IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [翻译] channel 独木难支

    mikespook发表于 2014-09-15 09:05:04
    love 0

    原文在此。遗憾的是文章只提出了问题,并没明确提供如何解决这些问题。但无论如何,对于这种可以引起反思的文章,是不能放过的。另外,我得承认,似乎高层次的分布式系统的抽象,用函数式语言的范式来表述更容易一些(实现上其实未必)。

    ————翻译分隔线————

    channel 独木难支
    或者说为什么流水线作业没那么容易

    勇敢和聪明的 Golang 并发模型。

    @kachayev 撰写

    概述

    Go 被设计用于更容易的构建并发系统,因此它有运行独立的计算的 goroutine 和用于它们之间通讯的 channel。我们之前都听过这个故事。所有的例子和指南看起来都挺好的:我们可以创建一个新的 channel,可以向这个 channel 发送数据,可以从 channel 读取,甚至还有漂亮和优雅的 select 语句(顺便提一下,为什么 21 世纪了我们还在用语句?),阻塞读和缓存……
    A magical unicorn

    主旨:99% 的情况下,我其实并不关心响应是由 channel 传递的,还是一只魔法独角兽从它的角上带来的。

    在为初学者撰写指南的时候这确实挺酷的!但是当你尝试实现大型的复杂系统的时候这就很痛苦了。channel 太原始了。它们是低级的构件,我相当怀疑你愿意在日常工作中天天和它们打交道。

    看看“高级模式”和“流水作业”。不是那么简单吧?有太多的东西要考虑,并且永远记得:什么时候、如何关闭 channel;如何传递错误;如何释放资源。我抱怨这些是因为我曾尝试实现一些东西,然后失败了。而我每天都在面对这些东西。

    你可能会说,对于初学者没必要理解所有细节。不过……描述一个模式真得很“高级”?不幸的是,答案是否定的。它们是基础和常识。

    更仔细的了解一下流水作业问题。这真得是流水作业?不,“…对于每个来自目录的路径计算 MD5 校验码,并将结果存入一个 map[string][string]…”。这只是一个 pmap(并行 map)。或具有池化执行器的、有限的并行化的 pmap。而 pmap 不应当需要我输入如此多行代码。想了解真正的流水作业——我将在文章的最后介绍一个(参阅“构建 Twitter 分析器”的段落)。

    那么模式如何呢?

    为了快速开发真实的应用,我们应当能够提炼出比原始的 channel 层面更高的抽象。它们只是传输层。我们需要应用层的抽象来编写程序(对比 OSI),否则你会发现你总是在低级的 channel 网络的细节上纠结,试图在生产环境中、偶发的、没有任何有效的方法重现的找到它不工作的原因。参阅 Erlang OTP 是如何针对性的解决类似的问题的:将你保护在低级的消息传递代码之外。

    低级的代码有什么问题?这里有一篇超棒的文章“爱德华 C++ 手(译注:借‘爱德华剪刀手’)”:

    手里有把剪刀并不一定总是那么糟糕。爱德华有许多天赋:例如,他能创造劲爆的狗狗的发型。别误会——它展示了许多劲爆的狗狗的发型(我是说优雅且简单的 C++ 代码),但是主要内容还是关于如何避免剪坏,以及在发生剪坏的情况下进行急救。

    在 Kyiv Go 聚会的时候,我经历了相同的情况:在一页幻灯上那 20 行干净可读的代码。一个不一般的竞态条件和一个可能出现的运行时错误。这对于所有听众来说很明显吗?不。至少一半人不明白。

    痛苦的缘由?

    好,让我们试着收集一些类似的模式。从工作的经验中、从书中、从其他语言中(是,伙计们,我知道这有点令人难以相信,不过还有许多其他语言同样也有并发的设计)。

    Rob Pike 讨论了 Fan-in、Fan-out。在许多情况下,这很有用,不过还是关于网络的 channel。而不是你的应用。在任何情况下,看看(无耻的从这里偷的)。
    Rob Pike talks about Fan-in, Fan-out. It’s useful in many ways, but still about the network of channels. Not about your application. In any case, let’s check (shamelessly stolen from here).

    func merge(cs ...<-chan int) <-chan int {
        var wg sync.WaitGroup
        out := make(chan int)
    
        // 为 cs 中每个输入的 channel 启动一个输出用的 goroutine。
        // 从 c 中复制值出来直到 c 被关闭,然后又调用 wg.Done。
        output := func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c)
        }
    
        // 一旦所有输出的 goroutine 完成的,就启动一个 goroutine 来关闭 out。
        // 这必须在 wg.Add 调用后启动。
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }
    

    呃…… <-chan int。在我的应用中重用起来没那么抽象(例如,迁移到库中)……并且在每次我需要的时候都重新实现也不是那么清晰。那么如何让其可以重用?<-chan interface{}?欢迎来到类型转换和运行时错误的领地。如果,希望实现一个高级的 fan-in(合并)就必须牺牲类型安全。同样的(不幸的)是其他模式也是一样。

    我真正想要的是:

    func merge[T](cs ...<-chan T) <-chan T
    

    是,我知道 Go 没有泛型,因为谁需要它们呢?

    现在天气如何?

    回到模式。让我们分析一个假设的项目,服务器端开发会与实际经验非常接近。我们需要一个服务器接收请求,输入一个美国的州,返回从 OpenWeatherMap 收集到的信息。例如这样:

    $ http localhost:4912/weather?q=CA
    HTTP/1.1 200 OK
    Access-Control-Allow-Credentials: true
    Access-Control-Allow-Methods: GET, POST
    Access-Control-Allow-Origin: *
    Connection: keep-alive
    Content-Type: application/json; charset=utf-8
    
    [{
        "clouds": {
            "all": 40
        },
        "id": 5391959,
        "main": {
            "temp": 288.89,
            "temp_max": 291.48,
            "temp_min": 286.15
        },
        "name": "San Francisco",
        "weather": [
            {
                "description": "mist",
                "icon": "50d",
                "id": 701,
                "main": "Mist"
            }
        ]
    }, {
        "clouds": {
            "all": 90
        },
        "id": 5368361,
        "main": {
            "temp": 292.83,
            "temp_max": 296.15,
            "temp_min": 289.15
        },
        "name": "Los Angeles",
        "weather": [
            {
                "description": "mist",
                "icon": "50d",
                "id": 701,
                "main": "Mist"
            }
        ]
    }]
    

    pmap

    让我们从一些我们已经知道的东西开始。那么,我们收到了请求 ?q=CA。我不想对从哪里得到相关城市的列表进行解释。我们可以用这个数据库,在内存中缓存以及其他什么合理的东西。假设我们有一个神奇的 findCities(state) 函数,返回 chan City(像通常 go 程序表现的延迟序列那样)。然后呢?每个城市我们都必须调用 OpenWeatherMap API 并解析结果到一个 map[City]Weather 中。我们已经讨论过这个模式了。这是个 pmap。我希望我的代码像这样:

    chanCities := findCities(state)
    resolver := func(name City) Weather { return openWeatherMap.AskFor(name) }
    weather := chanCities.Par.Map(resolver)
    

    或限制并发数:

    chanCities := findCities(state)
    pool := NewWorkers(20)
    resolver := func(w Worker, name City) Weather { return w.AskFor(name) }
    weather := chanCities.Par.BoundedMap(pool, resolver)
    

    我希望所有这些 <-done 同步和神圣的 select 完全被隐藏起来。

    Futures & Promises

    获取当前天气可能需要很长的时间,例如,你有一个很长的城市列表。当然,你不希望重复的 API 调用,因此应当可以用某种方法管理并行的请求:

    func collect(state string) Weather {
      calc, ok := calculations.get(state) // check if it's in progress
      if !ok {
          calc.calculations.run(state) // run otherwise
      }
      return calc.Wait() // wait until done
    }
    

    这也被叫做 future/promise 。Wiki 的解释:

    它们描述了一个对象对于结果扮演了代理的角色,而这在一开始是不可预知的,通常是由于它的值尚未完成计算造成的。

    我已经听过太多人说 go 的 future 很简单:

    f := make(chan int, 1)
    

    这是错误的,因为所有的等待者都应当得到结果(译注:实现 channel 的订阅与变化值的广播确实另人头大)。而这个版本也是错的:

    [C]
    f := make(chan int, 1)
    v <- f
    f <- v
    // 在这里使用 v
    [/C]

    由于不可能用这个方法管理资源。所以,当某个家伙在他的代码里丢了 f <- v 那部分,我希望你能幸运的发现这个 bug。

    将数据直接发给所有的等待者来实现 promise 没那么复杂(我不确定这段代码是不是有 bug):

    type PromiseDelivery chan interface{}
    type Promise struct {
        sync.RWMutex
        value interface{}
        waiters []PromiseDelivery
    }
    
    func (p *Promise) Deliver(value interface{}) {
        p.Lock()
        defer p.Unlock()
        p.value = value
        for _, w := range p.waiters {
            locW := w
            go func(){
                locW <- value
            }()
        }
    }
    
    func (p *Promise) Value() interface{} {
        if p.value != nil {
            return p.value
        }
    
        delivery := make(PromiseDelivery)
        p.waiters = append(p.waiters, delivery)
        return <-delivery
    }
    
    func NewPromise() *Promise {
        return &Promise;{
            value: nil,
            waiters: []PromiseDelivery{},
        }
    }
    

    如何使用他呢?

    p := NewPromise()
    go func(){
      p.Deliver(42)
    }()
    p.Value().(int) // 阻塞,当有值的时候返回 interface{} 
    

    不过这里有 interface{} 和类型转换。我实际上想要什么呢?

    // 在那些经过良好测试的库,甚至 stdlib 中
    type PromiseDelivery[T] chan T
    type Promise[T] struct {
        sync.RWMutex
        value T
        waiters []PromiseDelivery[T]
    }
    func (p *Promise[T]) Deliver(value T)
    func (p *Promise[T]) Value() T
    func NewPromise[T]() *Promise[T]
    
    // 我的代码:
    v := NewPromise[int]()
    go func(){
      v.Deliver("woooow!") // 错误
      v.Deliver(42)
    }()
    v.Value() // 阻塞并返回 42,而不是 interface{}
    

    是的,当然,没有人需要泛型。我在讨论什么鬼玩意啊?

    也可以通过使用 select 来避免 p.Lock() 来监听 deliver,并在一个 goroutine 中 wait 操作。还可以引入对最终用户极为有用的 .ValueWithTimeout 方法。还有许多许多其他“你可以……”。尽管我们实际上是在讨论一个 20 行的代码(它的长度可能在每次你发现 future/promise 交互更多细节的时候就开始增长了)。我真得需要知道(或想到) channel 为我传递值吗?不!

    pub/sub

    假设我们想要构建一个实时服务。那么我们的客户端现在可以开启一个 websocket 连接,传递 q=CA 请求,并即刻获得加利福尼亚的天气变化情况。它看起来应该像:

    // deliverer
    calculation.WhenDone(func(state string, w Weather) {
      broker.Publish("CA", w)
    })
    
    // client
    ch := broker.Subscribe("CA")
    for update := range ch {
      w.Write(update.Serialize())
    }
    

    这是一个典型的 pub/sub(译注:公告/订阅)。你可以从高级 Go 模式的演讲中学习它,甚至可以找到即刻可用的实现。问题是,它们全都基于接口的。

    有没有可能实现:

    broker := NewBroker[String, Weather]()
    // so that
    broker.Subs(42) // compilation failure
    // and
    broker.Subs("CA") // returns (chan Weather) not (chan interface{})
    

    当然!如果你能勇敢的在项目之间复制粘贴代码,并到处进行修改。

    map/filter

    假设希望给与我们的用户更多的弹性,从而引入了新的查询参数:show,它的值可以是 all|temp|wind|icon。

    可能你可以从基础开始:

    ch := broker.Subscribe("CA")
    for update := range ch {
      temps := []Temp
      for _, t := update.Temp {
        temps = append(temps, t)
      }
    
      w.Write(temps)
    }
    

    不过,在写了 10 个这样的方法之后,你会意识到它没那么模块化,并且也很无聊。可能你需要:

    ch := broker.Subscribe("CA").Map(func(w Weather) Temp { return w.Temp })
    for update := range ch {
      w.Write(update)
    }
    

    等等,我有提过 channel 是一个 functor(译注:函子)吗?跟 future/promise 一样。

    p := NewPromise().Map(func(w Weather) Temp { return w.Temp })
    go func(){
      p.Deliver(Weather{Temp{42}})
    }()
    p.Value().(Temp) // Temp, not Weather
    

    这意味着我重用了 future 的 channel 的相同代码。你也可以用想 transducers 这样的东西来完成它。我经常在 ClojureScript 的代码中使用的技巧:

    (->> (send url) ;; returns chan, put single value to it {:status 200 :result 42} when ready
         (async/filter< #(= 200 (:status %))) ;; check that :status is 200
         (async/map< :result)) ;; expose only 42 to end user
    ;; note, that it will close all channels (including implicit intermediate one) properly
    

    当我可以简单的进行 x.Map(transformation) 并得到相同类型的值的时候,我真得需要关心 x 是个 channel 还是个 future 吗?在这个例子里,为什么允许我创建 make(chan int) 而不能创建 make(Future int) 呢?

    Request/Reply

    假设我们的用户喜欢这个服务,并且频繁的使用它。那么就需要引入一些简单的 API 限制:每天、每个 IP 请求的数量。收集这个数量保存在一个 map[string]int 中很简单。Go 的文档说“不要通过共享内存来通讯,用通讯来共享内存”。好吧,听起来是个好主意。

    req := make(chan string)
    go func() { // wow, look here - it's an actor!
      m := map[string]int{}
      for r := range req {
        if v, ok := m[r]; !ok {
          m[r] = 1
        } else {
          m[r] = v + 1
        }
      } 
    }()
    
    go func() {
      req <- "127.0.0.2"
    }()
    
    go func() {
      req <- "127.0.0.1"
    }()
    

    这很容易。现在可以计算每个 IP 请求的数量了。不但如此……同时也可以要求执行请求需要权限。

    type Req struct {
      ip string
      resp chan int
    }
    
    func NewRequest(ip string) *Req {
      return &Req;{ip, make(chan int)}
    }
    
    requests := make(chan *Req)
    
    go func() {
      m := map[string]int{}
      for r := range requests {
        if v, ok := m[r.ip]; !ok {
          m[r.ip] = 1
        } else {
          m[r.ip] = v + 1
        }
        r.resp <- m[r.ip]
      } 
    }()
    
    go func() {
      r := NewRequest("127.0.0.2")
      requests <- r
      fmt.Println(<- r.resp)
    }()
    
    go func() {
      r := NewRequest("127.0.0.1")
      requests <- r
      fmt.Println(<- r.resp)
    }()
    

    我不会再问你要泛型的解决方案(没有写死的 string 和 int)。换而言之,我希望你检查一下这段代码中是不是都正确?真得这么简单吗?

    你确定 r.resp <- m[r.ip] 是个好办法?不,肯定不是!我希望有任何人等待那些很慢的客户端。是吗?而如果我有许多很慢的客户端的时候会怎么样呢?可能我需要对此进行一些处理。

    而 requests <- r 这部分简单吗?如果我的 actor(服务器)过载无法响应的时候呢?可能我需要在这里处理超时……

    时不时的我就需要特定的初始化和清理过程……都需要超时机制。并且需要能保持请求,直到初始化完成。

    那么调用的优先权呢?例如,当我需要为了分析系统实现 Dump 方法,但是又不想让所有的用户暂停来收集需要的数据。

    还有……看看 Erlang 中的 gen_server。为了保险起见,我希望它一被实现就可以是具有良好的文档的,经过高度覆盖测试的库。98% 的情况下,我不希望看到这样的介绍:make(chan int, ?) 而我不想思考到底我应该将 ? 替换成多少。

    99% 的情况下,我其实并不关心响应是由 channel 传递的,还是一只魔法独角兽从它的角上带来的。

    数不胜数

    还有许多其他常见的并发的情况。我想你已经明白了。

    苦难

    你可以说,这些模式都不常见。不过……我在我的项目中不得不实现它们中的大多数。每!一!次!可能我不怎么走运,而你的项目会跟写给初学者的指南一样简单。

    我知道,你们中的大多数会说“世界是艰辛的,编程是苦难的”。我会继续打击你:至少有一些语言展示了部分解决这些问题的示例。至少,在尝试解决它。Haskell 和 Scala 的类型系统提供了构建强大的高级抽象的能力,甚至自定义控制流来处理并发。而另一阵营的 Clojure 利用动态类型鼓励和共享高级的抽象。Rust 有 channel 和泛型。

    让它工作 -> 让它优雅 -> 让它可重用。

    现在,第一步已经完成。接下来呢?不要误会,go 是一个有远见的语言:channel 和 goroutine 比起例如 pthread 来说更好,不过是不是真得就停留在此?

    补充:构建 Twitter 分析器

    关于真实的流水作业。

    你可能已经看过 Twitter 的分析了,它真得很棒。假设它尚未出现,而我们需要自己的分析工具:提供一个用户名,来统计有多少用户看过(至少是理论上)他的 tweet。应该如何做呢?其实不难:读取用户的时间轴,过滤掉所有的 retweet 和回复,然后请求其他 tweet 的 retweeter,为每个 retweeter 请求 follower 的列表,合并所有 retweeter 的 follower 在一起,然后加上这个用户的 follower。对于这个步骤我想要的结果是:
    map[TweetId][]Username(retweeter)和 map[Username][]Username。这些用于构造一个向请求者展示的魔幻的表格是足够了。

    有一些技术细节你应当留意:

        Twitter API 需要每个调用都使用 OAuth,并且设定了很强的限制(每个用户每 15 分钟 450 次调用)。为了对付这个限制,我们将用预定义的一个 OAuth token 列表(例如 50 个)组织在一个池中供 worker 使用,每个 worker 在达到限制之前都可以让自己休息一会。
        大多数 Twitter API 调用通过 since_id 或 max_id 使用了结果分页。因此你不能依赖一个请求就可以获取完整的结果。

    一个粗糙的实现的例子。注意,你没必要理解这个文件中所有的内容。相反,如果你无法理解的话,这恰恰说明我们做对了。

    那么我们现在有什么?

    • 一些步骤的计算:TimelineReader -> RetweetersReader -> FollowersReader -> FinalReducer。
    • 自供消息。由于分页所有阶段都是递归的。这意味着每个步骤都会向下一个阶段和其本身发出消息。在这个情况下,很难处理取消的情况。甚至无法发现某个步骤的工作全部完成。
    • 尽早传播。至少有两种情况:首先为了通过 TweetId 来收集 []Username,我们需要将收集到的信息直接从 RetweetersReader 发送到 FinalReducer。然后,一开始我们就知道,需要获得初始用户的 follower,因此他的用户名应当从 TimelineReader 传递到
      RetweetersReader 步骤。
    • 中间收缩。FollowersReader 不只是一个管道。它会过滤我们已经见过的用户名(因为总不想重复工作吧)。
    • 持续工作的 worker。在许多情况下,你无法等待 worker 退出。例如,当你实现了一个服务,它会同时响应许多客户端的时候。


沪ICP备19023445号-2号
友情链接