IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    java中使用akka手记二 cluster

    五四陈科学院发表于 2014-04-16 15:12:32
    love 0

    以下内容由[五四陈科学院]提供

    基础知识

    • 2.3.2的cluster还有些想实现的东西没有实现,包括:actor分区(负载均衡) actor handoff(临时故障时的actor处理) actor重新平衡(增减节点时有用) actor状态复制机制(类似做M/S作用)

    • 2.3.2的cluster已经有的能力有:节点-集群-leader节点; membership; gossip协议同步状态; VECTOR CLOCKS保障顺序; 失败检测-节点不可达算法; seed节点-新节点加入时可以向这些节点发消息,但也是可以向任意成员发的; membership生命周期有joining up leaving exiting removed unreachable几种状态。

    依赖

    maven中添加akka-cluster包:

    1
    2
    3
    4
    5
    
    
                com.typesafe.akka
                akka-cluster_2.10
                2.3.1
            

    配置

    下面的配置启用了Cluster。application.conf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://ClusterSystem@127.0.0.1:2551",
          "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    
        auto-down-unreachable-after = 10s
      }
    }

    这里面定义的seed节点,用来作为cluster的初始化和加入点。要跨机器的话,就要修改这里的127.0.0.1了。

    代码

    下面是一个使用cluster的actor实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    public class SimpleClusterListener extends UntypedActor {
      LoggingAdapter log = Logging.getLogger(getContext().system(), this);
      Cluster cluster = Cluster.get(getContext().system());
    
      //subscribe to cluster changes
      @Override
      public void preStart() {
        //#subscribe
        cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), 
            MemberEvent.class, UnreachableMember.class);
        //#subscribe
      }
    
      //re-subscribe when restart
      @Override
      public void postStop() {
        cluster.unsubscribe(getSelf());
      }
    
      @Override
      public void onReceive(Object message) {
    • 这个actor把自己注册了一个cluster事件。当cluster上有风吹草动时,都会收到消息。

    • 运行这段代码只需要运行后面代码中的SimpleClusterApp。

    加入种子节点

    • 一开始能够预料的节点们被叫做种子节点(seed nodes),有节点加入的时候,会等种子节点的返回确认才算是加入成功。

    • 两种方式指定seed nodes的位置,一种是在application.xml中,另一种是在jvm的参数里。

    自动和手动down机

    • 被失败检测出来不可达的节点,会被leader进行处理,也可以手动搞下来。
    • akka.cluster.atuo-down-unreadchable-after=10s 10秒不可达就自动关
    • 也可以写代码 Cluster.get(system).down(address)

    • 网络分裂时,这个自动down有可能会出现脑裂。

    cluster的事件

    • ClusterEvent.MemberUp
    • ClusterEvent.MemberExited
    • ClusterEvent.MemberRemoved
    • ClusterEvent.UnreachableMember
    • ClusterEvent.ReachableMember

    代码

    • sample.cluster.simple.main启动了system。
    • 一共三个actorSystem被启动,端口为2551 2552 0,0的时候会是随机最大端口。
    • application.conf里定义了2551与2552为seed nodes,所以2551与2552先组成了cluster,0加入的时候会收到2551和2552的确认。
    • gossip协议的功劳让一个状态值可能会被重复传递。

    • 本文提及代码在 https://github.com/54chen/akka_cluster_learn


    想快点找到作者也可以到Twitter上留言: @54chen
    或者你懒得带梯子上墙,请到新浪微博:@54chen


沪ICP备19023445号-2号
友情链接