IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    java中使用akka手记三 cluster详例

    五四陈科学院发表于 2014-04-17 20:11:07
    love 0

    以下内容由[五四陈科学院]提供

    一个例子

    • 同样是typesafe的经典例子。
    • 例子提供的服务是传输文本。当文本发给frontend节点,它会委派backend节点,backend执行转化任务,把结果返回给原来的客户端。
    • 新的backend节点和frontend节点,都可以动态地在cluster上增减。

    message

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    public interface TransformationMessages {
    
      public static class TransformationJob implements Serializable {
        private final String text;
    //......
      }
    
      public static class TransformationResult implements Serializable {
        private final String text;
    //.....
      }
    
      public static class JobFailed implements Serializable {
        private final String reason;
        private final TransformationJob job;
    //....
      }
    
      public static final String BACKEND_REGISTRATION = "BackendRegistration";
    
    }

    backend处理逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    
    public class TransformationBackend extends UntypedActor {
    
      Cluster cluster = Cluster.get(getContext().system());
    //...  
      @Override
      public void onReceive(Object message) {
        if (message instanceof TransformationJob) {
          TransformationJob job = (TransformationJob) message;
          getSender().tell(new TransformationResult(job.getText().toUpperCase()),
              getSelf());
    
        } else if (message instanceof CurrentClusterState) {
          CurrentClusterState state = (CurrentClusterState) message;
          for (Member member : state.getMembers()) {
            if (member.status().equals(MemberStatus.up())) {
              register(member);
            }
          }
    
        } else if (message instanceof MemberUp) {
          MemberUp mUp = (MemberUp) message;
          register(mUp.member());
    
        } else {
          unhandled(message);
        }
      }
    
      void register(Member member) {
        if (member.hasRole("frontend"))
          getContext().actorSelection(member.address() + "/user/frontend").tell(
              BACKEND_REGISTRATION, getSelf());
      }
    }
    • backend订阅了cluster的事件,检测frontend节点,还会发一条消息告诉fontend可以使用了。
    • frontend节点接收用户的任务,扔给注册好的backend节点。

    frontend节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    public class TransformationFrontend extends UntypedActor {
      List backends = new ArrayList();
      int jobCounter = 0;
      @Override
      public void onReceive(Object message) {
        if ((message instanceof TransformationJob) && backends.isEmpty()) {
          TransformationJob job = (TransformationJob) message;
          getSender().tell(
              new JobFailed("Service unavailable, try again later", job),
              getSender());
    
        } else if (message instanceof TransformationJob) {
          TransformationJob job = (TransformationJob) message;
          jobCounter++;
          backends.get(jobCounter % backends.size())
              .forward(job, getContext());
    
        } else if (message.equals(BACKEND_REGISTRATION)) {
          getContext().watch(getSender());
          backends.add(getSender());
    
        } else if (message instanceof Terminated) {
          Terminated terminated = (Terminated) message;
          backends.remove(terminated.getActor());
    
        } else {
          unhandled(message);
        }
      }
    }
    • frontend用List保存了backend的actor位置,有需要的时候就轮循发给backend。
    • getSender 本次收到消息的上游,一般用来回复消息。
    • getContext 本actor的上下文。
    • getContext().watch DeathWatch,相当于watch了谁,谁有啥公开动作就会告诉我,包括挂了之类的。
    • ActorRef.forward与tell、ask的区别,性能最好的是tell,发完就走。ask是发完等Future,要等的话性能是个问题。forward用于从一个actor转发消息给另一个actor,原始的sender信息会被保留,在做路由、负载均衡、备份时非常有用。

    运行TransformationApp

    • sample.cluster.transformation.TransformationApp 启动三个backend 2551 2552 0为一个cluster,启动一个fronend。
    • frontend每5秒会收到一次任务,接收成功后print代码,代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    system.scheduler().schedule(interval, interval, new Runnable() {
          public void run() {
            ask(frontend,
                new TransformationJob("hello-" + counter.incrementAndGet()),
                timeout).onSuccess(new OnSuccess() {
              public void onSuccess(Object result) {
                System.out.println(result);
              }
            }, ec);
          }
    
        }, ec);
    • frontend节点中,收到job的时候会去检查backend注册数是否可用了,如果有可用的就forward任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    public void onReceive(Object message) {
        if ((message instanceof TransformationJob) && backends.isEmpty()) {
          TransformationJob job = (TransformationJob) message;
          getSender().tell(
              new JobFailed("Service unavailable, try again later", job),
              getSender());
    
        } else if (message instanceof TransformationJob) {
          TransformationJob job = (TransformationJob) message;
          jobCounter++;
          backends.get(jobCounter % backends.size())
                  .forward(job, getContext());
    
        } else if (message.equals(BACKEND_REGISTRATION)) {
          getContext().watch(getSender());
          backends.add(getSender());
    
        } else if (message instanceof Terminated) {
          Terminated terminated = (Terminated) message;
          backends.remove(terminated.getActor());
    
        } else {
          unhandled(message);
        }
      }
    • 在backend中有一句代码如下:
    1
    2
    3
    4
    5
    
    void register(Member member) {
        if (member.hasRole("frontend"))
          getContext().actorSelection(member.address() + "/user/frontend").tell(
              BACKEND_REGISTRATION, getSelf());
      }
    • 解析:backend订阅了memberUp事件,所以在cluster中如果有memberUp了,都会执行上述代码。
    • actorSelection是根据地址进行lookup,返回一个ActorSelection,可以当成本地的actor一样tell。

    代码

    • 本文提及代码在 https://github.com/54chen/akka_cluster_learn

    想快点找到作者也可以到Twitter上留言: @54chen
    或者你懒得带梯子上墙,请到新浪微博:@54chen


沪ICP备19023445号-2号
友情链接