搜索平台化中预热的需求描述为:当线上集群需要水平扩容时,新部署实例的机器需要用已存在的日志进行预查询,其目的是将用户的常用查询加载到扩容机内存中,最终对外开放时以提高缓存命中率,减少搜索RT
比如现在要用A机器上的日志对B机器进行预热,实现很简单:
步骤一、拿到A机器上的日志,%TOMCAT_HOME%/logs/catalina.2014-06-21.log
默认取昨天生成好的日志文件,不存在就向前推,直到找不到报错。然后对日志文件过滤
awk -F'{|}' '{/param/; if (count>100000) exit; if ($2 != "") {count++; print $2 > "preload.log";};}' catalina.2014-06-21.log
其中用关键字“param”过滤行文本,再以分隔符“{”或“}”解析后将第二个字段重定向到preload.log文件下,取前10万行处理结果得到preload.log如下
步骤二、将A机器上生成的preload.log文件scp到平台机,通过平台机读取文件并向B机器查询,代码如
private List readPreloadFile(File preloadLog) {
List ret = Lists.newArrayList();
BufferedReader br = null;
String s = null;
try {
br = new BufferedReader(new FileReader(preloadLog));
while((s = br.readLine()) != null) {
ret.add(s);
}
} catch (IOException e) {
} finally {
if (br != null) {
IOUtils.closeQuietly(br);
}
}
return ret;
}
考虑到通过NIO中的MappedByteBuffer直接将整个文件映射到内存可以避免一条条read的io开销,尝试如下
private List readPreloadFile(File preloadLog) {
List logs = Lists.newArrayList();
FileChannel fc = null;
MappedByteBuffer fout = null;
byte[] ret = null;
try {
fc = new RandomAccessFile(preloadLog, "r").getChannel();
fout = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
ret = new byte[(int)fc.size()];
fout.get(ret);
} catch (FileNotFoundException e) {
} catch (IOException e) {
} finally {
fout = null;
if (fc != null) {
IOUtils.closeQuietly(fc);
}
}
logs = Arrays.asList(StringUtils.split(new String(ret), '\n'));
return logs;
}
实际对10万数据测试结果显示后者452ms,比前者305ms还差,发现主要还是耗在将byte数组转化为字符串(243ms)和split动作(164)ms
注意,以上两种方式都是一次性将日志文件的内容读到内存,现在10万条线上日志大约40M,考虑到后续会加大预热量以及同时预热多台机器,这样可直接导致OOM,所以更佳的方式应该从文件中读一条预热一条
将第一步返回的结果分解给多个线程去查询
1)定义callable类型的线程,返回该线程总共处理的查询次数
class PreloadHandler implements Callable {
private String name;
private String solrPath;
private List subQueryLogs;
private ThreadLocal localDoneNum = new ThreadLocal (){
@Override
protected Integer initialValue() {
return 0;
}
};
private ThreadLocal localErrNum = new ThreadLocal (){
@Override
protected Integer initialValue() {
return 0;
}
};
PreloadHandler(String name, String solrPath, List subQueryLogs) {
this.name = name;
this.solrPath = solrPath;
this.subQueryLogs = subQueryLogs;
}
public Integer call() throws Exception {
for (String queryLog : subQueryLogs) {
if (!isRunning) {
break;
}
String url = solrPath + queryLog;
try {
HttpUtil.httpGet(url); //真正查询处理
} catch (Exception e) {
errNum.getAndIncrement();
int _errNum = localErrNum.get();
localErrNum.set(++_errNum);
}
doneNum.getAndIncrement();
int _doneNum = localDoneNum.get();
localDoneNum.set(++_doneNum);
}
return localDoneNum.get();
}
构造函数中传入三个参数,分别是线程名称,如“搜索实例名_机器B_handler_线程序号”;查询路径solrPath,如“http://机器B的ip:端口号port/solr/搜索实例名/select?”;分解后的子日志队列
localErrNum和localDoneNum是线程处理失败和完成的查询计数;errNum和doneNum是多个线程总的处理失败和完成的查询计数。前者定义为threadlocal,后者必须是atomic类型的
2)主线程中进行分解和子任务提交
public void run() {
isRunning = true;
String solrPath = String.format("http://%s:%s/solr/%s/select?",host.getInnerIp(),
port, instance.getInstanceName());
List> subList = Lists.partition(queryLogs,
queryLogs.size() / handlers);
futures = Lists.newArrayList();
for (int i = 0; i < handlers; i++) {
futures.add(threadPool.submit(
new PreloadHandler(Joiner.on("_").join(instance.getInstanceName(),
host.getHostName(), "handler", (i + 1)), solrPath, subList.get(i))));
}
}
3)因为异步的,页面触发action后就直接返回了。所以主线程中需要初始化一些状态信息并保存下来以供ajax查询
private static Map statCache = Maps.newConcurrentMap();
public static void registor(String key, Preload preload) {
statCache.put(key, preload);
}public static void unRegistor(String key) {
statCache.remove(key);
}public void init() throws IOException {
queryLogs = readPreloadFile(preloadLog);
this.threadPool = Executors.newFixedThreadPool(handlers);
totalNum = new AtomicInteger(queryLogs.size());
doneNum = new AtomicInteger(0);
errNum = new AtomicInteger(0);
registor(
Joiner.on("_").join(instance.getInstanceName(), host.getHostName()), this);
}
并提供取消正在执行任务的能力
public void cancel() { isRunning = false; for (Future f : futures) { f.cancel(true); } }
public void close() {
isRunning = false;
if (threadPool != null) {
threadPool.shutdown();
}
}
主线程类和“搜索实例+预热机器”进行绑定,外界通过如下查询预热状态
public static Preload getPreload(String instanceName, String hostName) {
return statCache.get(Joiner.on("_").join(instanceName, hostName));
}
综上所述便是整个Preload过程,下面给出预热一台机器的演示结果:
总共50万条日志,5个线程并发预热,结果显示了每个线程处理的日志和错误数以及总的处理进度