我们希望设计开发一个小的框架,能够获取接口调用的各种统计信息,比如,响应时间的最大值(max)、最小值(min)、平均值(avg)、百分位值(percentile)、接口调用次数(count)、频率(tps) 等,并且支持将统计结果以各种显示格式(比如:JSON 格式、网页格式、自定义显示格式等)输出到各种终端(Console 命令行、HTTP 网页、Email、日志文件、自定义输出终端等),以方便查看。
作为可被复用的框架,除了功能需求,非功能需求也很重要。
接口统计信息:包括接口响应时间的统计信息,以及接口调用次数的统计信息等。
统计信息的类型:max、min、avg、percentile、count、tps 等。
统计信息显示格式:Json、Html、自定义显示格式。
统计信息显示终端:Console、Email、HTTP 网页、日志、自定义显示终端。
除了罗列这些, 还可以通过线框图,把最终数据的显示样式画出来,会更加一目了然。
实际上,从线框图我们还能挖掘出下面几个隐藏需求。
统计触发方式:主动和被动。
统计时间区间:可以自定义
统计时间间隔
Feign feign = Feign.builder() .logger(new CustomizedLogger()) .encoder(new FormEncoder(new JacksonEncoder())) .decoder(new JacksonDecoder()) .errorDecoder(new ResponseErrorDecoder()) .requestInterceptor(new RequestHeadersInterceptor()).build(); public class RequestHeadersInterceptor implements RequestInterceptor { @Override public void apply(RequestTemplate template) { template.header("appId", "..."); template.header("version", "..."); template.header("timestamp", "..."); template.header("token", "..."); template.header("idempotent-token", "..."); template.header("sequence-id", "..."); } public class CustomizedLogger extends feign.Logger { //... } public class ResponseErrorDecoder implements ErrorDecoder { @Override public Exception decode(String methodKey, Response response) { //... } }
//应用场景:统计下面两个接口(注册和登录)的响应时间和访问次数 public class UserController { public void register(UserVo user) { //... } public UserVo login(String telephone, String password) { //... } }
public class Metrics { // Map的key是接口名称,value对应接口请求的响应时间或时间戳; private Map<String, List<Double>> responseTimes = new HashMap<>(); private Map<String, List<Double>> timestamps = new HashMap<>(); private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); public void recordResponseTime(String apiName, double responseTime) { responseTimes.putIfAbsent(apiName, new ArrayList<>()); responseTimes.get(apiName).add(responseTime); } public void recordTimestamp(String apiName, double timestamp) { timestamps.putIfAbsent(apiName, new ArrayList<>()); timestamps.get(apiName).add(timestamp); } public void startRepeatedReport(long period, TimeUnit unit){ executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { Gson gson = new Gson(); Map<String, Map<String, Double>> stats = new HashMap<>(); for (Map.Entry<String, List<Double>> entry : responseTimes.entrySet()) { String apiName = entry.getKey(); List<Double> apiRespTimes = entry.getValue(); stats.putIfAbsent(apiName, new HashMap<>()); stats.get(apiName).put("max", max(apiRespTimes)); stats.get(apiName).put("avg", avg(apiRespTimes)); } for (Map.Entry<String, List<Double>> entry : timestamps.entrySet()) { String apiName = entry.getKey(); List<Double> apiTimestamps = entry.getValue(); stats.putIfAbsent(apiName, new HashMap<>()); stats.get(apiName).put("count", (double)apiTimestamps.size()); } System.out.println(gson.toJson(stats)); } }, 0, period, unit); } private double max(List<Double> dataset) {//省略代码实现} private double avg(List<Double> dataset) {//省略代码实现} }
具体调用:
//应用场景:统计下面两个接口(注册和登录)的响应时间和访问次数 public class UserController { private Metrics metrics = new Metrics(); public UserController() { metrics.startRepeatedReport(60, TimeUnit.SECONDS); } public void register(UserVo user) { long startTimestamp = System.currentTimeMillis(); metrics.recordTimestamp("regsiter", startTimestamp); //... long respTime = System.currentTimeMillis() - startTimestamp; metrics.recordResponseTime("register", respTime); } public UserVo login(String telephone, String password) { long startTimestamp = System.currentTimeMillis(); metrics.recordTimestamp("login", startTimestamp); //... long respTime = System.currentTimeMillis() - startTimestamp; metrics.recordResponseTime("login", respTime); } }
所以我们大概知道这个业务的系统设计模型:
所以我们将框架分成:数据采集,存储、聚合统计和显示四个模块。
先去实现基本功能和思路。再去逐步迭代更优版本。
MetricsCollector 类负责提供 API,来采集接口请求的原始数据。我们可以为 MetricsCollector 抽象出一个接口,但这并不是必须的,因为暂时我们只能想到一个 MetricsCollector 的实现方式。
MetricsStorage 接口负责原始数据存储,RedisMetricsStorage 类实现 MetricsStorage 接口。这样做是为了今后灵活地扩展新的存储方法,比如用 HBase 来存储。
public class MetricsCollector { private MetricsStorage metricsStorage;//基于接口而非实现编程 //依赖注入 public MetricsCollector(MetricsStorage metricsStorage) { this.metricsStorage = metricsStorage; } //用一个函数代替了最小原型中的两个函数 public void recordRequest(RequestInfo requestInfo) { if (requestInfo == null || StringUtils.isBlank(requestInfo.getApiName())) { return; } metricsStorage.saveRequestInfo(requestInfo); } } public class RequestInfo { private String apiName; private double responseTime; private long timestamp; //...省略constructor/getter/setter方法... }
MetricsStorage 类和 RedisMetricsStorage 类的属性和方法也比较明确。具体的代码实现如下所示。注意,一次性取太长时间区间的数据,可能会导致拉取太多的数据到内存中,有可能会撑爆内存。对于 Java 来说,就有可能会触发 OOM(Out Of Memory)。而且,即便不出现 OOM,内存还够用,但也会因为内存吃紧,导致频繁的 Full GC,进而导致系统接口请求处理变慢,甚至超时。
可以采用分页请求的方式?或者异步请求。
public interface MetricsStorage { void saveRequestInfo(RequestInfo requestInfo); List<RequestInfo> getRequestInfos(String apiName, long startTimeInMillis, long endTimeInMillis); Map<String, List<RequestInfo>> getRequestInfos(long startTimeInMillis, long endTimeInMillis); } public class RedisMetricsStorage implements MetricsStorage { //...省略属性和构造函数等... @Override public void saveRequestInfo(RequestInfo requestInfo) { //... } @Override public List<RequestInfo> getRequestInfos(String apiName, long startTimestamp, long endTimestamp) { //... } @Override public Map<String, List<RequestInfo>> getRequestInfos(long startTimestamp, long endTimestamp) { //... } }
public class Aggregator { public static RequestStat aggregate(List<RequestInfo> requestInfos, long durationInMillis) { double maxRespTime = Double.MIN_VALUE; double minRespTime = Double.MAX_VALUE; double avgRespTime = -1; double p999RespTime = -1; double p99RespTime = -1; double sumRespTime = 0; long count = 0; for (RequestInfo requestInfo : requestInfos) { ++count; double respTime = requestInfo.getResponseTime(); if (maxRespTime < respTime) { maxRespTime = respTime; } if (minRespTime > respTime) { minRespTime = respTime; } sumRespTime += respTime; } if (count != 0) { avgRespTime = sumRespTime / count; } long tps = (long)(count / durationInMillis * 1000); Collections.sort(requestInfos, new Comparator<RequestInfo>() { @Override public int compare(RequestInfo o1, RequestInfo o2) { double diff = o1.getResponseTime() - o2.getResponseTime(); if (diff < 0.0) { return -1; } else if (diff > 0.0) { return 1; } else { return 0; } } }); int idx999 = (int)(count * 0.999); int idx99 = (int)(count * 0.99); if (count != 0) { p999RespTime = requestInfos.get(idx999).getResponseTime(); p99RespTime = requestInfos.get(idx99).getResponseTime(); } RequestStat requestStat = new RequestStat(); requestStat.setMaxResponseTime(maxRespTime); requestStat.setMinResponseTime(minRespTime); requestStat.setAvgResponseTime(avgRespTime); requestStat.setP999ResponseTime(p999RespTime); requestStat.setP99ResponseTime(p99RespTime); requestStat.setCount(count); requestStat.setTps(tps); return requestStat; } } public class RequestStat { private double maxResponseTime; private double minResponseTime; private double avgResponseTime; private double p999ResponseTime; private double p99ResponseTime; private long count; private long tps; //...省略getter/setter方法... }
public class ConsoleReporter { private MetricsStorage metricsStorage; private ScheduledExecutorService executor; public ConsoleReporter(MetricsStorage metricsStorage) { this.metricsStorage = metricsStorage; this.executor = Executors.newSingleThreadScheduledExecutor(); } // 第4个代码逻辑:定时触发第1、2、3代码逻辑的执行; public void startRepeatedReport(long periodInSeconds, long durationInSeconds) { executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 第1个代码逻辑:根据给定的时间区间,从数据库中拉取数据; long durationInMillis = durationInSeconds * 1000; long endTimeInMillis = System.currentTimeMillis(); long startTimeInMillis = endTimeInMillis - durationInMillis; Map<String, List<RequestInfo>> requestInfos = metricsStorage.getRequestInfos(startTimeInMillis, endTimeInMillis); Map<String, RequestStat> stats = new HashMap<>(); for (Map.Entry<String, List<RequestInfo>> entry : requestInfos.entrySet()) { String apiName = entry.getKey(); List<RequestInfo> requestInfosPerApi = entry.getValue(); // 第2个代码逻辑:根据原始数据,计算得到统计数据; RequestStat requestStat = Aggregator.aggregate(requestInfosPerApi, durationInMillis); stats.put(apiName, requestStat); } // 第3个代码逻辑:将统计数据显示到终端(命令行或邮件); System.out.println("Time Span: [" + startTimeInMillis + ", " + endTimeInMillis + "]"); Gson gson = new Gson(); System.out.println(gson.toJson(stats)); } }, 0, periodInSeconds, TimeUnit.SECONDS); } } public class EmailReporter { private static final Long DAY_HOURS_IN_SECONDS = 86400L; private MetricsStorage metricsStorage; private EmailSender emailSender; private List<String> toAddresses = new ArrayList<>(); public EmailReporter(MetricsStorage metricsStorage) { this(metricsStorage, new EmailSender(/*省略参数*/)); } public EmailReporter(MetricsStorage metricsStorage, EmailSender emailSender) { this.metricsStorage = metricsStorage; this.emailSender = emailSender; } public void addToAddress(String address) { toAddresses.add(address); } public void startDailyReport() { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.DATE, 1); calendar.set(Calendar.HOUR_OF_DAY, 0); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); Date firstTime = calendar.getTime(); Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { long durationInMillis = DAY_HOURS_IN_SECONDS * 1000; long endTimeInMillis = System.currentTimeMillis(); long startTimeInMillis = endTimeInMillis - durationInMillis; Map<String, List<RequestInfo>> requestInfos = metricsStorage.getRequestInfos(startTimeInMillis, endTimeInMillis); Map<String, RequestStat> stats = new HashMap<>(); for (Map.Entry<String, List<RequestInfo>> entry : requestInfos.entrySet()) { String apiName = entry.getKey(); List<RequestInfo> requestInfosPerApi = entry.getValue(); RequestStat requestStat = Aggregator.aggregate(requestInfosPerApi, durationInMillis); stats.put(apiName, requestStat); } // TODO: 格式化为html格式,并且发送邮件 } }, firstTime, DAY_HOURS_IN_SECONDS * 1000); } }
public class Demo { public static void main(String[] args) { MetricsStorage storage = new RedisMetricsStorage(); ConsoleReporter consoleReporter = new ConsoleReporter(storage); consoleReporter.startRepeatedReport(60, 60); EmailReporter emailReporter = new EmailReporter(storage); emailReporter.addToAddress("wangzheng@xzg.com"); emailReporter.startDailyReport(); MetricsCollector collector = new MetricsCollector(storage); collector.recordRequest(new RequestInfo("register", 123, 10234)); collector.recordRequest(new RequestInfo("register", 223, 11234)); collector.recordRequest(new RequestInfo("register", 323, 12334)); collector.recordRequest(new RequestInfo("login", 23, 12434)); collector.recordRequest(new RequestInfo("login", 1223, 14234)); try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } } }
MetricsCollector
MetricsCollector 负责采集和存储数据,职责相对来说还算比较单一。它基于接口而非实现编程,通过依赖注入的方式来传递 MetricsStorage 对象,可以在不需要修改代码的情况下,灵活地替换不同的存储方式,满足开闭原则。
MetricsStorage、RedisMetricsStorage
MetricsStorage 和 RedisMetricsStorage 的设计比较简单。当我们需要实现新的存储方式的时候,只需要实现 MetricsStorage 接口即可。因为所有用到 MetricsStorage 和 RedisMetricsStorage 的地方,都是基于相同的接口函数来编程的,所以,除了在组装类的地方有所改动(从 RedisMetricsStorage 改为新的存储实现类),其他接口函数调用的地方都不需要改动,满足开闭原则。
Aggregator
Aggregator 类是一个工具类,里面只有一个静态函数,有 50 行左右的代码量,负责各种统计数据的计算。当需要扩展新的统计功能的时候,需要修改 aggregate() 函数代码,并且一旦越来越多的统计功能添加进来之后,这个函数的代码量会持续增加,可读性、可维护性就变差了。所以,从刚刚的分析来看,这个类的设计可能存在职责不够单一、不易扩展等问题,需要在之后的版本中,对其结构做优化。
ConsoleReporter、EmailReporter
ConsoleReporter 和 EmailReporter 中存在代码重复问题。在这两个类中,从数据库中取数据、做统计的逻辑都是相同的,可以抽取出来复用,否则就违反了 DRY 原则。而且整个类负责的事情比较多,职责不是太单一。特别是显示部分的代码,可能会比较复杂(比如 Email 的展示方式),最好是将显示部分的代码逻辑拆分成独立的类。除此之外,因为代码中涉及线程操作,并且调用了 Aggregator 的静态函数,所以代码的可测试性不好。