configs: - appId: app-1 limits: - api: /v1/user limit: 100 - api: /v1/order limit: 50 - appId: app-2 limits: - api: /v1/user limit: 50 - api: /v1/order limit: 50
String appId = "app-1"; // 调用方APP-ID String url = "http://www.eudemon.com/v1/user/12345";// 请求url RateLimiter ratelimiter = new RateLimiter(); boolean passed = ratelimiter.limit(appId, url); if (passed) { // 放行接口请求,继续后续的处理。 } else { // 接口请求被限流。 }
非功能性需求
易用性:规则的配置、编程接口的使用都很简单;易于集成的Spring(根据业务需求)。
扩展性、灵活性: 灵活的扩展限流算法;支持不同格式的配置。
性能:低延迟-因为每个请求都会被检查。
容错性:限流服务的出错不能影响正常的应用功能请求-因为本身就是一个辅助增强的功能,应该是尽量减少他对原来应用的影响。
低耦合
也可以参照MyBatis的类库思想(MyBatis-Spring类库方便Spring开发),设计一个RateLimiter-Spring类库。
com.xzg.ratelimiter --RateLimiter com.xzg.ratelimiter.rule --ApiLimit --RuleConfig --RateLimitRule com.xzg.ratelimiter.alg --RateLimitAlg
串联:解析ruleConfig
执行limit
public class RateLimiter { private static final Logger log = LoggerFactory.getLogger(RateLimiter.class); // 为每个api在内存中存储限流计数器 private ConcurrentHashMap<String, RateLimitAlg> counters = new ConcurrentHashMap<>(); private RateLimitRule rule; public RateLimiter() { // 将限流规则配置文件ratelimiter-rule.yaml中的内容读取到RuleConfig中 InputStream in = null; RuleConfig ruleConfig = null; try { in = this.getClass().getResourceAsStream("/ratelimiter-rule.yaml"); if (in != null) { Yaml yaml = new Yaml(); ruleConfig = yaml.loadAs(in, RuleConfig.class); } } finally { if (in != null) { try { in.close(); } catch (IOException e) { log.error("close file error:{}", e); } } } // 将限流规则构建成支持快速查找的数据结构RateLimitRule this.rule = new RateLimitRule(ruleConfig); } public boolean limit(String appId, String url) throws InternalErrorException { ApiLimit apiLimit = rule.getLimit(appId, url); if (apiLimit == null) { return true; } // 获取api对应在内存中的限流计数器(rateLimitCounter) String counterKey = appId + ":" + apiLimit.getApi(); RateLimitAlg rateLimitCounter = counters.get(counterKey); if (rateLimitCounter == null) { RateLimitAlg newRateLimitCounter = new RateLimitAlg(apiLimit.getLimit()); rateLimitCounter = counters.putIfAbsent(counterKey, newRateLimitCounter); if (rateLimitCounter == null) { rateLimitCounter = newRateLimitCounter; } } // 判断是否限流 return rateLimitCounter.tryAcquire(); } }
从代码中,我们可以看出来,RuleConfig 类嵌套了另外两个类 AppRuleConfig 和 ApiLimit。这三个类跟配置文件的三层嵌套结构完全对应。我把对应关系标注在了下面的示例中,你可以对照着代码看下。
public class RuleConfig { private List<AppRuleConfig> configs; public List<AppRuleConfig> getConfigs() { return configs; } public void setConfigs(List<AppRuleConfig> configs) { this.configs = configs; } public static class AppRuleConfig { private String appId; private List<ApiLimit> limits; public AppRuleConfig() {} public AppRuleConfig(String appId, List<ApiLimit> limits) { this.appId = appId; this.limits = limits; } //...省略getter、setter方法... } } public class ApiLimit { private static final int DEFAULT_TIME_UNIT = 1; // 1 second private String api; private int limit; private int unit = DEFAULT_TIME_UNIT; public ApiLimit() {} public ApiLimit(String api, int limit) { this(api, limit, DEFAULT_TIME_UNIT); } public ApiLimit(String api, int limit, int unit) { this.api = api; this.limit = limit; this.unit = unit; } // ...省略getter、setter方法... }
configs: <!--对应RuleConfig--> - appId: app-1 <!--对应AppRuleConfig--> limits: - api: /v1/user <!--对应ApiLimit--> limit: 100 unit:60 - api: /v1/order limit: 50 - appId: app-2 limits: - api: /v1/user limit: 50 - api: /v1/order limit: 50
限流过程会频繁查询限流规则,为了提升速度将限流规则组织成为一种支持按照URL快速查询的数据结构(Trie)。
public class RateLimitRule { public RateLimitRule(RuleConfig ruleConfig) { //... } public ApiLimit getLimit(String appId, String api) { //... } }
这个类是限流算法实现类。它实现了最简单的固定时间窗口限流算法。每个接口都要在内存中对应一个 RateLimitAlg 对象,记录在当前时间窗口内已经被访问的次数。
模块划分相对合理
RateLimiter类主要做组装,最好不要有过多的业务逻辑。
RateLimitAlg和RateLimitRule都没有基于接口去实现,可扩展性差一些;
RateLimiter中yaml都是硬编码。
// 重构前: com.xzg.ratelimiter --RateLimiter com.xzg.ratelimiter.rule --ApiLimit --RuleConfig --RateLimitRule com.xzg.ratelimiter.alg --RateLimitAlg // 重构后: com.xzg.ratelimiter --RateLimiter(有所修改) com.xzg.ratelimiter.rule --ApiLimit(不变) --RuleConfig(不变) --RateLimitRule(抽象接口) --TrieRateLimitRule(实现类,就是重构前的RateLimitRule) com.xzg.ratelimiter.rule.parser --RuleConfigParser(抽象接口) --YamlRuleConfigParser(Yaml格式配置文件解析类) --JsonRuleConfigParser(Json格式配置文件解析类) com.xzg.ratelimiter.rule.datasource --RuleConfigSource(抽象接口) --FileRuleConfigSource(基于本地文件的配置类) com.xzg.ratelimiter.alg --RateLimitAlg(抽象接口) --FixedTimeWinRateLimitAlg(实现类,就是重构前的RateLimitAlg)
RateLimiter的重构集中在构造函数
public class RateLimiter { private static final Logger log = LoggerFactory.getLogger(RateLimiter.class); // 为每个api在内存中存储限流计数器 private ConcurrentHashMap<String, RateLimitAlg> counters = new ConcurrentHashMap<>(); private RateLimitRule rule; public RateLimiter() { //改动主要在这里:调用RuleConfigSource类来实现配置加载 RuleConfigSource configSource = new FileRuleConfigSource(); RuleConfig ruleConfig = configSource.load(); this.rule = new TrieRateLimitRule(ruleConfig); } public boolean limit(String appId, String url) throws InternalErrorException, InvalidUrlException { //...代码不变... } }
修改后的代码(读取和解析的逻辑拆出来),有点策略模式的影子(根据文件格式选取不同的Parser):
com.xzg.ratelimiter.rule.parser --RuleConfigParser(抽象接口) --YamlRuleConfigParser(Yaml格式配置文件解析类) --JsonRuleConfigParser(Json格式配置文件解析类) com.xzg.ratelimiter.rule.datasource --RuleConfigSource(抽象接口) --FileRuleConfigSource(基于本地文件的配置类) public interface RuleConfigParser { RuleConfig parse(String configText); RuleConfig parse(InputStream in); } public interface RuleConfigSource { RuleConfig load(); } public class FileRuleConfigSource implements RuleConfigSource { private static final Logger log = LoggerFactory.getLogger(FileRuleConfigSource.class); public static final String API_LIMIT_CONFIG_NAME = "ratelimiter-rule"; public static final String YAML_EXTENSION = "yaml"; public static final String YML_EXTENSION = "yml"; public static final String JSON_EXTENSION = "json"; private static final String[] SUPPORT_EXTENSIONS = new String[] {YAML_EXTENSION, YML_EXTENSION, JSON_EXTENSION}; private static final Map<String, RuleConfigParser> PARSER_MAP = new HashMap<>(); static { PARSER_MAP.put(YAML_EXTENSION, new YamlRuleConfigParser()); PARSER_MAP.put(YML_EXTENSION, new YamlRuleConfigParser()); PARSER_MAP.put(JSON_EXTENSION, new JsonRuleConfigParser()); } @Override public RuleConfig load() { for (String extension : SUPPORT_EXTENSIONS) { InputStream in = null; try { in = this.getClass().getResourceAsStream("/" + getFileNameByExt(extension)); if (in != null) { RuleConfigParser parser = PARSER_MAP.get(extension); return parser.parse(in); } } finally { if (in != null) { try { in.close(); } catch (IOException e) { log.error("close file error:{}", e); } } } } return null; } private String getFileNameByExt(String extension) { return API_LIMIT_CONFIG_NAME + "." + extension; } }