IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Akka入门编程实践

    Yanjun发表于 2015-08-20 06:34:28
    love 0

    Akka是使用Scala语言开发一个编程库,基于事件驱动的架构实现异步处理,它能够简化编写分布式应用程序。Akka中最核心的概念是Actor模型,它为编写分布式/并行计算应用程序提供了高层次抽象,在实际编程实践中,开发人员可以从对复杂网络通信细节的处理、多线程应用场景下对锁的管理中解脱出来。
    Akka能够给应用程序带来的几个重要的特性是:

    • 容错性
    • 可伸缩性
    • 异步性
    • 事件驱动架构(EDA)
    • 远程透明性

    Actor是Akka中最核心的组件,以至于我们在编写基于Akka的应用程序时,大部分时间都会和Actor打交道,那么Actor到底是怎样的一种抽象呢?一个Actor对象封装了状态和行为,但是它不和外界其它的Actor共享状态,如果一个Actor想要和另一个Actor交互,能且只能通过发送消息来达到信息交换的目的。可见,一个Actor能够很好地保护其内部状态的安全。

    与本地Actor通信

    下面,我们从最简单的Actor编程来体验Akka的功能。首先,先定义几种类型的消息,后面会基于这些消息来进行通信,代码如下所示:

    package org.shirdrn.scala.akka
    
    object Start extends Serializable
    object Stop extends Serializable
    
    trait Message {
      val id: String
    }
    
    case class Shutdown(waitSecs: Int) extends Serializable
    case class Heartbeat(id: String, magic:Int) extends Message with Serializable
    case class Header(id: String, len: Int, encrypted: Boolean) extends Message with Serializable
    case class Packet(id: String, seq: Long, content: String) extends Message with Serializable
    

    要实现一个Actor,需要继承自特质akka.actor.Actor,然后需要实现Actor特质声明的receive方法即可。另外,可选地可以混入另一个特质akka.actor.ActorLogging,提供记录日志的功能。我们首先实现的是一个Actor对象,然后拿到该Actor的一个引用(ActorRef),通过发送消息来与其进行交互,实现的Actor类为LocalActor ,代码如下所示:

    class LocalActor extends Actor with ActorLogging {
    
      def receive = {
        case Start => log.info("start")
        case Stop => log.info("stop")
        case Heartbeat(id, magic) => log.info("Heartbeat" + (id, magic))
        case Header(id, len, encrypted) => log.info("Header" + (id, len, encrypted))
        case Packet(id, seq, content) => log.info("Packet" + (id, seq, content))
        case _ =>
      }
    }
    

    然后,实现一个带有main方法的类来与上面的LocalActor对象:

    object LocalClient extends App {
      // Local actor
      val system= ActorSystem("local-system") // 创建一个ActorSystem对象,用来管理Actor实例
      println(system  )
      val localActorRef =system.actorOf(Props(new LocalServer()), name="local-actor") // 通过ActorSystem对象,获取到一个Actor的引用
      println(localActorRef)
    
      localActorRef ! Start // 向LocalActor发送Start消息
      localActorRef ! Heartbeat("3099100", 0xabcd) // 向LocalActor发送Heartbeat消息
    
       // 创建一个JSON类型的消息Packet
      val content = new JSONObject()
      content.put("name", "Stone")
      content.put("empid", 51082001)
      content.put("score", 89.36581)
      localActorRef ! Packet("3000001", System.currentTimeMillis(), content.toString) // 向LocalActor发送Packet消息
    
      localActorRef ! Stop // 停止LocalActor实例
      system  shutdown // 终止ActorSystem对象,释放资源
    }
    

    虽然,我们只实现了一个本地Actor,但是这也非常有用,比如,我们在同一个JVM中有多个模块之间需要通过消息通信,完全可以实现多个本地Actor,他们之间进行通信,完成复杂的处理逻辑。

    与远程Actor通信

    在分布式应用场景中,通常需要跨节点进行通信,或者说交换消息,那么在使用Akka实现的时候就被抽象为在不同节点之上的多个Actor之间的交互。因为Akka提供的高层次抽象,所以在使用Akka编写分布式应用程序的时候,和编写本地应用程序一样简单。下面,我们实现一个伪分布式应用程序,使Actor在不同的JVM之内进行通信,实现上和在不同的节点上是一样的。
    我们使用配置文件application.conf来指定通信处理过程中相关Actor的配置,包括远程Actor的主机名(或IP地址)和端口,包括本地Actor的基本配置。然后,只需要将该文件放在CLASSPATH之下即可,Akka会使用typesafe提供的配置解析工具ConfigFactory类来进行处理,配置文件application.conf中配置内容如下所示:

    MyRemoteServerSideActor {
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2552
          }
        }
      }
    }
    
    MyRemoteClientSideActor {
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
      }
    }
    

    上面,MyRemoteServerSideActor指定了远程Actor的配置内容,Actor的provider配置为akka.remote.RemoteActorRefProvider,TPC通信配置的主机名为127.0.0.1,端口为2552;MyRemoteClientSideActor指定了本地Actor的配置,Actor的provider配置为akka.remote.RemoteActorRefProvider,下面看看代码实现。

    • 远程Actor实现

    实现远程Actor和实现一个本地Actor的方式是一样的,继承自特质Actor,并实现receive方法。我们实现的RemoteActor的代码如下所示:

    class RemoteActor extends Actor with ActorLogging {
      
      // 模拟处理结果状态,发送给消息的发送方
      val SUCCESS = "SUCCESS"
      val FAILURE = "FAILURE"
    
      def receive = {
        case Start => { // 处理Start消息
          log.info("RECV event: " + Start)
        }
        case Stop => { // 处理Stop消息
          log.info("RECV event: " + Stop)
        }
        case Shutdown(waitSecs) => { // 处理Shutdown消息
          log.info("Wait to shutdown: waitSecs=" + waitSecs)
          Thread.sleep(waitSecs)
          log.info("Shutdown this system.")
          context.system.shutdown // 停止当前ActorSystem系统
        }
        case Heartbeat(id, magic) => log.info("RECV heartbeat: " + (id, magic))  // 处理Heartbeat消息
        case Header(id, len, encrypted) => log.info("RECV header: " + (id, len, encrypted)) // 处理Header消息
        case Packet(id, seq, content) => { // 处理Packet消息
          val originalSender = sender // 获取到当前发送方的Actor引用
          log.info("RECV packet: " + (id, seq, content))
          originalSender ! (seq, SUCCESS) // 响应给发送方消息处理结果,类似发送一个ACK
        }
        case _ =>
      }
    }
    

    上面的Actor实现了接收多种类型的消息:Start、Stop、Shutdown、Heartbeat、Header、Packet,其中一个Shutdown消息是可以将当前远程ActorSystem系统终止的,终止后就无法再处理任何请求,而Packet消息则会给发送方一个返回,告知处理结果。
    一个Actor可以在自己内部终止自己,需要通过执行context.system.shutdown就可以实现。
    启动我们实现的远程Actor系统,等待接收并处理消息,如下所示:

    object AkkaServerApplication extends App {
    
      val system = ActorSystem("remote-system", ConfigFactory.load().getConfig("MyRemoteServerSideActor")) // 创建名称为remote-system的ActorSystem:从配置文件application.conf中获取该Actor的配置内容
      val log = system.log
      log.info("Remote server actor started: " + system)
    
      system.actorOf(Props[RemoteActor], "remoteActor") // 创建一个名称为remoteActor的Actor,返回一个ActorRef,这里我们不需要使用这个返回值
    
    }
    

    这里是程序的主入口,启动改程序可以看到控制台输出如下内容:

    [INFO] [08/14/2015 11:52:45.747] [main] [Remoting] Starting remoting
    [INFO] [08/14/2015 11:52:46.230] [main] [Remoting] Remoting started; listening on addresses :[akka.tcp://remote-system@127.0.0.1:2552]
    [INFO] [08/14/2015 11:52:46.232] [main] [Remoting] Remoting now listens on addresses: [akka.tcp://remote-system@127.0.0.1:2552]
    [INFO] [08/14/2015 11:52:46.239] [main] [ActorSystem(remote-system)] Remote server actor started: akka://remote-system
    

    可以看出,这与我们在配置文件,以及在代码中配置的内容相一致:ActorSystem系统名称为remote-system,通信端口为127.0.0.1:2552。

    • 客户端Actor实现

    我们再看本地将要与远程Actor通信的客户端Actor的实现,如下所示:

    class ClientActor extends Actor with ActorLogging {
    
      // akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
      val path = "akka.tcp://remote-system@127.0.0.1:2552/user/remoteActor" // 远程Actor的路径,通过该路径能够获取到远程Actor的一个引用
      val remoteServerRef = context.actorSelection(path) // 获取到远程Actor的一个引用,通过该引用可以向远程Actor发送消息
    
      @volatile var connected = false
      @volatile var stop = false
    
      def receive = {
        case Start => { // 发送Start消息表示要与远程Actor进行后续业务逻辑处理的通信,可以指示远程Actor初始化一些满足业务处理的操作或数据
          send(Start)
          if(!connected) {
            connected = true
            log.info("Actor connected: " + this)
          }
        }
        case Stop => {
          send(Stop)
          stop = true
          connected = false
        }
        case header: Header => send(header)
        case hb: Heartbeat => sendWithCheck(hb)
        case pkt: Packet => sendWithCheck(pkt)
        case cmd: Shutdown => send(cmd)
    
        case (seq, result) => log.info("RESULT: seq=" + seq + ", result=" + result) // 用于接收远程Actor处理一个Packet消息的结果
        case m => log.info("Unknown message: " + m)
      }
    
      private def sendWithCheck(cmd: Serializable): Unit = {
        while(!connected) {
          Thread.sleep(100)
          log.info("Wait to be connected...")
        }
        if(!stop) {
          send(cmd)
        } else {
          log.warning("Actor has stopped!")
        }
      }
    
      private def send(cmd: Serializable): Unit = {
        log.info("Send command to server: " + cmd)
        try {
          remoteServerRef ! cmd // 发送一个消息到远程Actor,消息必须是可序列化的,因为消息对象要经过网络传输
        } catch {
          case e: Exception => {
            connected = false
            log.info("Try to connect by sending Start command...")
            send(Start)
          }
        }
      }
    
    }
    

    本地Actor会接收处理本地(当前JVM中)发送过来的消息,一个简单的check,然后进行转发,发送到远程Actor;也用来接收来自远程Actor响应的处理结果。接收并转发本地消息,包括如下类型消息:Start、Stop、Shutdown、Header、Heartbeat、Packet。其中,我们会在本地客户端创建一个单独的线程去周期性地发送心跳消息Heartbeat到远程Actor,同时将大量的Packet消息发送到远程Actor去处理。接收到的远程Actor响应的消息是一个Tuple类型,可以提取出seq和result数据,查看某个消息处理结果。下面是本地客户端的实现逻辑,如下所示:

    object AkkaClientApplication extends App {
    
      val system = ActorSystem("client-system", ConfigFactory.load().getConfig("MyRemoteClientSideActor")) // 通过配置文件application.conf配置创建ActorSystem系统
      val log = system.log
      val clientActor = system.actorOf(Props[ClientActor], "clientActor") // 获取到ClientActor的一个引用
      @volatile var running = true
      val hbInterval = 1000
    
      lazy val hbWorker = createHBWorker
    
      /**
       * create heartbeat worker thread
       */
      def createHBWorker: Thread = { // 心跳发送线程
        new Thread("HB-WORKER") {
          override def run(): Unit = {
            while(running) {
              clientActor ! Heartbeat("HB", 39264)
              Thread.sleep(hbInterval)
            }
          }
        }
      }
    
      def format(timestamp: Long, format: String): String = {
        val df = new SimpleDateFormat(format)
        df.format(new Date(timestamp))
      }
    
      def createPacket(packet: Map[String, _]): JSONObject = {
        val pkt = new JSONObject()
        packet.foreach(p => pkt.put(p._1, p._2))
        pkt
      }
    
      val ID = new AtomicLong(90760000)
      def nextTxID: Long = {
        ID.incrementAndGet()
      }
    
      clientActor ! Start // 发送一个Start消息,第一次与远程Actor握手(通过本地ClientActor进行转发)
      Thread.sleep(2000)
    
      clientActor ! Header("HEADER", 20, encrypted=false) // 发送一个Header消息到远程Actor(通过本地ClientActor进行转发)
      Thread.sleep(2000)
    
      hbWorker.start // 启动心跳线程
    
      // send some packets
      val DT_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"
      val r = Random
      val packetCount = 100
      val serviceProviders = Seq("CMCC", "AKBBC", "OLE")
      val payServiceProvicers = Seq("PayPal", "CMB", "ICBC", "ZMB", "XXB")
    
      def nextProvider(seq: Seq[String]): String = {
        seq(r.nextInt(seq.size))
      }
    
      val startWhen = System.currentTimeMillis()
      for(i <- 0 until packetCount) { // 持续发送packetCount个Packet消息
        val pkt = createPacket(Map[String, Any](
          "txid" -> nextTxID,
          "pvid" -> nextProvider(serviceProviders),
          "txtm" -> format(System.currentTimeMillis(), DT_FORMAT),
          "payp" -> nextProvider(payServiceProvicers),
          "amount" -> 1000 * r.nextFloat()))
        clientActor ! Packet("PKT", System.currentTimeMillis, pkt.toString)
      }
      val finishWhen = System.currentTimeMillis()
      log.info("FINISH: timeTaken=" + (finishWhen - startWhen) + ", avg=" + packetCount/(finishWhen - startWhen))
    
      Thread.sleep(2000)
    
      // ask remote actor to shutdown
      val waitSecs = hbInterval
      clientActor ! Shutdown(waitSecs) // 发送Packet消息完成,通知远程Actor终止服务
    
      running = false
      while(hbWorker.isAlive) { // 终止心跳线程
        log.info("Wait heartbeat worker to exit...")
        Thread.sleep(300)
      }
      system.shutdown // 终止本地ActorSystem系统
    }
    

    上面代码中有详细注释,可以了解具体实现。

    使用Akka Future实例

    前面的两种情况,我们模拟了Actor如果在本地/远程的上下文中进行通信处理,Akka很好地屏蔽了底层网络通信细节。接下来我们看看看Akka的Future功能,尤其是Future所支持异步Callback特性。
    我们基于Akka实现的例子,如下图所示:
    akka-future
    上图模拟了一个简易的有趣的爬虫系统,而且在这上面为了演示Akka的使用,我们在各个Actor之间增加了好多消息通信,可以根据上图中箭线上的编号来理解整个实例系统的执行流程。
    存储网页链接,以及一个指定网页的出链接(Outlink)信息,我们使用MySQL数据库,创建了2个数据表,数据库及其表定义如下所示:

    GRANT ALL ON *.* TO 'web'@'%' IDENTIFIED BY 'web';
    
    CREATE DATABASE `page_db` DEFAULT CHARACTER SET utf8;
    -- 用来存储一个链接,以及该链接对应的页面的相关信息
    CREATE TABLE `web_link` (
      `link` CHAR(128) NOT NULL UNIQUE,
      `domain` VARCHAR(64) NOT NULL,
      `encoding` VARCHAR(11) NOT NULL DEFAULT 'utf-8',
      `content_length` INT NOT NULL DEFAULT 0,
      `create_time` VARCHAR(20) NOT NULL,
      PRIMARY KEY (`link`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- 用来存储一个链接的outlink信息
    CREATE TABLE `web_outlink` (
      `link` CHAR(128) NOT NULL,
      `outlink` VARCHAR(128) NOT NULL,
      `create_time` VARCHAR(20) NOT NULL,
      PRIMARY KEY (`link`, `outlink`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    另外,Akka中Actor之间通过发送消息进行通信,所以我们首先定义几个case class,如下所示:

    case class WebUrl(link: String)
    case class ScheduledWebUrl(link: String, config: Map[String, Any])
    case class CrawledWeb(link: String, domain: String, encoding: String, contentLength: Int, outlinks: Set[String])
    case class Stored(link: String, outlinkCount: Int)
    

    还有,操作MySQL以及日期时间转换操作,我们实现了两个工具类,如下所示:

    object MySQLUtils {
    
      val driverClass = "com.mysql.jdbc.Driver"
      val jdbcUrl = "jdbc:mysql://10.10.4.130:3306/page_db"
      val user = "web"
      val password = "web"
    
      try {
        Class.forName(driverClass)
      } catch {
        case e: ClassNotFoundException => throw e
        case e: Exception => throw e
      }
    
      @throws(classOf[SQLException])
      def getConnection: Connection = {
        DriverManager.getConnection(jdbcUrl, user, password)
      }
    
      @throws(classOf[SQLException])
      def doTrancation(transactions: Set[String]) : Unit = {
        val connection = getConnection
        connection.setAutoCommit(false)
        transactions.foreach {
          connection.createStatement.execute(_)
        }
        connection.commit
        connection.close 
      }
    }
    
    object DatetimeUtils {
    
      val DEFAULT_DT_FORMAT = "yyyy-MM-dd HH:mm:ss"
    
      def format(timestamp: Long, format: String): String = {
        val df = new SimpleDateFormat(format)
        df.format(new Date(timestamp))
      }
    
      def format(timestamp: Long): String = {
        val df = new SimpleDateFormat(DEFAULT_DT_FORMAT)
        df.format(new Date(timestamp))
      }
    
    }
    

    下面详细上图中各个组件的实现:

    • 程序入口AkkaCrawlApp

    AkkaCrawlApp是程序入口,读取Seed链接信息,然后发送给CrawlActor,使整个系统开始运行起来。AkkaCrawlApp代码实现如下所示:

    object AkkaCrawlApp  {
    
      def main(args: Array[String]) {
        val system = ActorSystem("crawler-system") // 创建一个ActorSystem
        system.log.info(system.toString)
    
        val scheduleActorRef = system.actorOf(Props[ScheduleActor], name="schedule-actor") // 创建ScheduleActor 
        val storeActorRef = system.actorOf(Props[PageStoreActor], name="store-actor") // 创建PageStoreActor 
    
        val crawlActorRef = system.actorOf(Props[CrawlActor], name="crawl-actor") // 创建CrawlActor 
    
        val links =
          """
            |http://apache.org
            |http://csdn.net
            |http://hadoop.apache.org
            |http://spark.apache.org
            |http://nutch.apache.org
            |http://storm.apache.org
            |http://mahout.apache.org
            |http://flink.apache.org
            |http://fdajlkdjfakfjlkadjflkajflakjf.com
          """.stripMargin
        val seeds: Seq[String] = links.split("\\s+").toSeq
        ScheduleActor.sendFeeds(crawlActorRef, seeds) // 调用ScheduleActor的伴生对象的sendFeeds,将爬虫入口seed链接发送给CrawlActor
    
      }
    }
    

    我们在这里创建了一个ActorSystem实例,然后该系统有3个Actor实例:ScheduleActor 、PageStoreActor、CrawlActor。如果是在该ActorSystem上下文中,可以通过Actor的名称检索到该Actor的引用ActorRef实例,然后通过向该引用发送消息来进行通信。

    • ScheduleActor伴生对象

    ScheduleActor伴生对象类中,通过传入一个CrawlActor的引用,将传入的Seed链接发送给该系统中的CrawlActor。ScheduleActor伴生对象类代码如下所示:

    object ScheduleActor {
    
      def sendFeeds(crawlerActorRef: ActorRef, seeds: Seq[String]): Unit = {
        seeds.foreach(crawlerActorRef ! _)
      }
    }
    

    上面代码比较简单,遍历seed链接集合,向CrawlActor发送传入的每一个Seed链接。

    • CrawlActor实现

    CrawlActor是一个Actor,它用来处理链接信息,同时也负责下载网页数据并抽取出链接(Outlink),代码实现如下所示:

    class CrawlActor extends Actor with ActorLogging {
    
      // 获取到ScheduleActor和PageStoreActor的引用
      private val scheduleActor = context.actorOf(Props[ScheduleActor], "schedule_actor")
      private val storeActor = context.actorOf(Props[PageStoreActor], "store_actor")
      private val q = new LinkedBlockingQueue[String]()
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
    
      def receive = {
        case link: String => { // 系统启动时,ScheduleActor伴生对象将入口seed链接字符串发过来
          if(link != null && link.startsWith("http://")) { // 简单验证链接合法性
            log.info("Checked: " + link)
            scheduleActor ! WebUrl(link) // 将字符串链接link包装成为WebUrl发送回ScheduleActor,等待进一步调度
          }
        }
        case ScheduledWebUrl(link, _) => { // ScheduleActor将包含调度信息和链接信息的链接任务发过来,指派CrawlActor下载网页内容
          var crawledWeb: CrawledWeb = null
          val crawlFuture = Future { // 创建一个Future,里面的逻辑包含了下载网页的代码
            try {
              var encoding = "utf-8"
              var outlinks: Set[String] = Set[String]()
              val u = new URL(link)
              val domain = u.getHost
              val uc = u.openConnection().asInstanceOf[HttpURLConnection]
              uc.setConnectTimeout(5000)
              uc.connect()
              if (uc.getResponseCode == 200) {
                // page encoding
                if (uc.getContentEncoding != null) {
                  encoding = uc.getContentEncoding
                }
                // page content
                if (uc.getContentLength > 0) {
                  val in = uc.getInputStream
                  val buffer = Array.fill[Byte](512)(0)
                  val baos = new ByteArrayOutputStream
                  var bytesRead = in.read(buffer)
                  while (bytesRead > -1) {
                    baos.write(buffer, 0, bytesRead)
                    bytesRead = in.read(buffer)
                  }
                  outlinks = extractOutlinks(link, baos.toString(encoding)) // 抽取网页中的出链接
                  baos.close
                }
                log.info("Page: link=" + link + ", encoding=" + encoding + ", outlinks=" + outlinks)
                CrawledWeb(link, domain, encoding, uc.getContentLength, outlinks)
              }
            } catch {
              case e: Throwable => {
                log.error("Crawl error: " + e.toString)
                e
              }
            }
          }
    
          // 这里设置了一个回调,当下载网页的Future完成后,会调用这里的回调方法
          crawlFuture.onSuccess { // 下载成功,则Future返回CrawledWeb对象
            case crawledWeb: CrawledWeb => {
              log.info("Succeed to crawl: link=" + link + ", crawledWeb=" + crawledWeb)
              if(crawledWeb != null) {
                storeActor ! crawledWeb // 通知PageStoreActor保存网页相关信息
                log.info("Sent crawled data to store actor.")
                q add link // 将链接缓存到队列中
              }
            }
          }
          crawlFuture.onFailure { // 下载失败,则Future返回Throwable异常对象,这里简单打印出异常内容
            case exception: Throwable => log.error("Fail to crawl: " + exception.toString)
          }
        }
        case Stored(link, count) => { // 当PageStoreActor保存网页信息成功,会给一个Stored通知
          q.remove(link)
          scheduleActor ! (link, count) // 向ScheduleActor汇总统计结果
        }
      }
    
      def extractOutlinks(parentUrl: String, content: String): Set[String] = { // 使用正则表达式抽取页面上的链接
        val outlinks = "href\\s*=\\s*\"([^\"]+)\"".r.findAllMatchIn(content).map { m =>
          var url = m.group(1)
          if(!url.startsWith("http")) {
            url = new URL(new URL(parentUrl), url).toExternalForm
          }
          url
        }.toSet
    
        // 只保留页面上以html和htm结尾的链接
        outlinks.filter( url => !url.isEmpty && (url.endsWith("html") || url.endsWith("htm")))
      }
    }
    

    这里需要说明的是,在使用Future的回调功能时,有两种方式:
    一种是使用onSuccess和onFailure方法,这就要在Future执行代码段中设置好处理逻辑执行成功时,返回值是什么类型,可能有多重成功的情况,如下载页面成功、由于没使用代理无法下载也算成功结束,然后在onSuccess方法中通过case来分别处理;如果Future中代码处理失败,则在onFailure方法中处理异常,例如如果是因为没使用代理无法访问页面,可以在这里选择一个代理地址,然后重新发回给ScheduleActor进行调度下载。
    另一种是使用onComplete方法来处理回调,这里面能够同时处理Future代码执行成功或者失败的情况,根据自己的习惯选择使用。
    上面代码中,我们使用了下面一个implicit定义:

    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
    

    这里指定了一个隐式ExecutionContext变量ec,它提供了一个线程池来执行Future中定义的任务,也可以看看Future的onSuccess方法,它是一个柯里化(Currying)函数,需要传入一个ExecutionContext,如下所示:

      def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
        case Success(v) =>
          pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError
        case _ =>
      }
    

    如果每次调用都显式传入一个ExecutionContext实例,代码看起来会非常丑陋,所以通过定义隐式ExecutionContext,使代码看起来更直观清晰。通过上面的onSuccess方法的定义,我们也能看到,它内部实际上调用了onComplete方法。

    • PageStoreActor实现

    PageStoreActor主要用来将数据进行持久化,它会直接操作MySQL数据库,代码如下所示:

    class PageStoreActor extends Actor with ActorLogging {
    
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
      var crawlerRef = context.actorOf(Props[CrawlActor], name="crawl-actor")
    
      def receive = {
        case CrawledWeb(link, domain, encoding, contentLength, outlinks) => {
          val future = Future {
            var sqls = Set[String]()
            try {
              val createTime = DatetimeUtils.format(System.currentTimeMillis)
              val sql = "INSERT INTO web_link VALUES ('" + link + "','" + domain + "','" + encoding + "'," + contentLength + ",'" + createTime + "')"
              log.info("Link SQL: " + sql)
              sqls += sql
              var outlinksSql = "INSERT INTO web_outlink VALUES "
              outlinksSql += outlinks.map("('" + link + "','" + _ + "','" + createTime + "')").mkString(",")
              log.info("Outlinks SQL: " + outlinksSql)
              sqls += outlinksSql
    
              // 使用了事务操作
              MySQLUtils.doTrancation(sqls)
              (link, outlinks.size)
            } catch {
              case e: Throwable => throw e
            }
          }
    
          // 这里也使用了Future的回调功能
          future.onSuccess {
            case (link: String, outlinkCount: Int) => {
              log.info("SUCCESS: link=" + link + ", outlinkCount=" + outlinkCount)
              crawlerRef ! Stored(link, outlinkCount) // 将持久化后的链接及其outlink数量回馈给CrawlActor
            }
          }
          future.onFailure {
            case e: Throwable  => throw e
          }
        }
      }
    }
    

    上面代码比较容易理解。

    • ScheduleActor实现

    ScheduleActor负责调度任务,将带有调度信息的链接发送给CrawlActor去下载链接对应的页面。同时,页面下载、保存完成以后,ScheduleActor会收集结果信息,保存在内部的一个ConcurrentHashMap中,实现代码如下所示:

    class ScheduleActor extends Actor with ActorLogging {
    
      val config = Map(
        "domain.black.list" -> Seq("google.com", "facebook.com", "twitter.com"),
        "crawl.retry.times" -> 3,
        "filter.page.url.suffixes" -> Seq(".zip", ".avi", ".mkv")
      )
      val counter = new ConcurrentHashMap[String, Int]()
    
      def receive = {
        case WebUrl(url) => {
          sender ! ScheduledWebUrl(url, config)
        }
        case (link: String, count: Int) => {
          counter.put(link, count)
          log.info("Counter: " + counter.toString)
        }
      }
    
    }
    
    • Akka应用扩展

    上面的例子,我们是在单机上运行的,起始可以很容易将其扩展到多机环境。结合上面我们的Akka Remoting实践,可以将各个Actor分别运行在不同的进程之内(单机多进程,或跨机器多进程),通过配置文件的方式,单独搞一个object工具类用来解析配置文件,根据实际需要创建所需要通信的Actor(ActorRef或ActorSelection),其他其他代码的处理几乎不需要做改动,就能运行。
    另外,也可以使用Akka提供的内置Routing策略,来实现消息的路由。



沪ICP备19023445号-2号
友情链接