本文出处:http://blog.csdn.net/chaijunkun/article/details/27361453,转载请注明。由于本人不定期会整理相关博文,会对相应内容作出完善。因此强烈建议在原始出处查看此文。
最近在做一套系统,其中要求若干个Worker服务器将心跳信息都上报给中央服务器。当一定时间中央服务器没有得到心跳信息时则认为该Worker失效了,发出告警。
满足这种需求的解决方法多种多样,我开始想到了memcache,上报一次心跳信息就刷新一次缓存,当缓存内心跳信息对象超时被删除,即认为对应的Worker失效。然而由于memcache的工作原理,删除都是被动的,我们无法及时判断数据是否过期,即便知道了数据过期,也没有一种机制来回调方法来执行自定义的处理动作。难道缓存架构就真的不行了吗?
答案是否定的。在Redis 2.8.0版本起,加入了“Keyspace notifications”(即“键空间通知”)的功能。如何理解该功能呢?我们来看下官方是怎么说的:
键空间通知,允许Redis客户端从“发布/订阅”通道中建立订阅关系,以便客户端能够在Redis中的数据因某种方式受到影响时收到相应事件。
可能接收到的事件举例如下:
影响一个给出的键的所有命令(会告诉你哪个键被执行了一个命令,这个命令是什么);
接收到了一个LPUSH操作的所有键(LPUSH命令:key v1 [v2 v3..]将指定的所有值从左到右进行压栈操作,形成一个栈,并将该栈命名为指定的key);
在数据库0中失效的所有键(不一定非得是数据库0,这里这样表述其实想表达可以知道影响的哪个数据库)。
看到这里我联想到,如果一条缓存数据失效了,通过订阅关系,客户端会收到消息,通过分析消息可以得知何种消息,分析消息内容可以知道是哪个key失效了。这样就可以间接实现开头所描述的功能。
接下来我们来看下实验的步骤:
K Keyspace events, published with __keyspace@我们配置为:notify-keyspace-events Ex,含义为:发布key事件,使用过期事件(当每一个key失效时,都会生成该事件)。__ prefix. E Keyevent events, published with __keyevent@ __ prefix. g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ... $ String commands l List commands s Set commands h Hash commands z Sorted set commands x Expired events (events generated every time a key expires) e Evicted events (events generated when a key is evicted for maxmemory) A Alias for g$lshzxe, so that the "AKE" string means all the events.
本文中使用的客户端是Jedis,版本为2.4.2,为了代码的通用性,我使用Spring来管理连接:
然后使用Spring Test和Junit来测试代码192.168.1.100 6379
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("/applicationContext*.xml") public class RedisSubscribeDemo { private static final Log Log= LogFactory.getLog(RedisSubscribeDemo.class); @Resource private JedisPool pool; @Test public void doTest() throws InterruptedException { for (int i = 0; i < 1; i++) { TestThread thread= new TestThread(pool); thread.start(); } Thread.sleep(50000L); Log.info("Test finish..."); } }
public class TestThread extends Thread { private Log log= LogFactory.getLog(TestThread.class); private JedisPool pool; public TestThread(JedisPool pool){ log.info("loading test thread"); this.pool= pool; } @Override public void run() { Jedis jedis= pool.getResource(); jedis.psubscribe(new MySubscribe(), "*"); try { Thread.sleep(10000L); } catch (InterruptedException e) { log.info("延时失败", e); } jedis.close(); log.info("Test run finished"); } }在测试线程中,我们将自定义的MySubscribe加入到了Jedis的模板订阅(即psubscribe,因为模板订阅的channel是支持星号'*'通配的,这样可以收集到多个通配通道的消息,而与之相反的还有一个subscribe,此订阅只能指定严格匹配的通道)中,同样为了测试过程能够将结果显示出来,在绑定了订阅后,对该线程进行了延时10秒。
public class MySubscribe extends JedisPubSub { private static final Log log= LogFactory.getLog(MySubscribe.class); // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { log.info(pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { log.info(pattern + "=" + channel + "=" + message); } ...其他未用到的重写方法忽略 }作为Jedis自定义订阅,必须继承redis.clients.jedis.JedisPubSub类,在psubscribe模式下,重点重写onPMessage方法,该方法为接收到模板订阅后处理事件的重要代码。pattern为在绑定订阅时使用的通配模板,channel为通配后符合条件的实际通道名称,message就不用多说了,就是事件消息内容。
127.0.0.1:6379> set chaijunkun 123 PX 100PX参数指定生命周期单位为毫秒,100即声明周期,即100毫秒。key为chaijunkun的数据,其值为123。
OK
*=__keyevent@0__:expired=chaijunkun从输出可以看出,之前指定的通配符为*,通配任何通道;之后是实际的通道名称:__keyevent@0__:expired,这里我们可以看到订阅收到了一个keyevent位于数据库0,事件类型为:expired,是一个过期事件;最后是chaijunkun,这个是过期数据的key。
127.0.0.1:6379> help publish PUBLISH channel message summary: Post a message to a channel since: 2.0.0 group: pubsub