IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Go中秘而不宣的数据结构 spmc, 10倍性能于 channel

    smallnest发表于 2024-10-20 04:19:52
    love 0

    Go 标准库和运行中中,有一些专门针对特定场景优化的数据结构,这些数据结构并没有暴露出来,这个系列就是逐一介绍这些数据结构。

    这一次给大家介绍的就是一个 lock-free、高性能的单生产者多消费者的队列:PoolDequeue 和 PoolChain。
    到底是一个还是两个呢?
    主要是 PoolDequeue, 它是一个固定尺寸,使用 ringbuffer (环形队列) 方式实现的队列。
    PoolChain 是在它的基础上上,实现的一个动态尺寸的队列。

    生产者消费者模式是常见的一种并发模式,根据生产者的数量和消费者的数量,可以分为四种情况:

    • 单生产者-单消费者模式: spsc
    • 单生产者-多消费者模式: spmc
    • 多生产者-单消费者模式: mpsc
    • 多生产者-多消费者模式: mpmc

    Channel 基本上可以看做是一种多生产者多消费者模式的队列。可以同时允许多个生产者发送数据,有可以允许多个消费者消费数据,它也可以应用在其他模式的场景,比如 rpc 包中的 oneshot 模式、通知情况下的的单生产者多消费者模式、rpc 和服务端单连接通讯时的消息处理,就是多生产者单消费者模式。

    但是 Go 标准库的 sync 包下,有一个针对单生产者多消费者的数据结构,它是一个 lock-free 的数据结构,针对这个场景做了优化,被使用在 sync.Pool 中。

    sync.Pool 采用了一种类似 Go 运行时调度的机制,针对每个 p 有一个 private 的数据,同时还有一个 shared 的数据,如果在本地 private、shared 中没有数据,就去其他 P 对应的 shared 去偷取。难么同时可能有多个 P 偷取同一个 shared, 这是多消费者。

    同时对 shared 的写只有它隶属的 p 执行 Put 的时候才会发生:

    1
    2
    3
    4
    5
    6
    7
    l, _ := p.pin()
    if l.private == nil {
    l.private = x
    } else {
    l.shared.pushHead(x)
    }
    runtime_procUnpin()

    这有属于单生产者模式。sync.Pool 使用了 PoolDequeue 和 PoolChain 来做优化。

    首先我们先来了解 poolDequeue。

    poolDequeue

    poolDequeue 是一个 lock-free 的数据结构,必然会使用 atomic, 同时它要求必须使用单生产者,否则会有并发问题。消费者可以是并发多个,当然你用一个也没问题。

    其中,生产者可以使用下面的方法:

    • pushHead: 在队列头部新增加一个数据。如果队列满了,增加失败
    • popHead: 在队列头部弹出一个数据。生产者总是弹出新增加的数据,除非队列为空

    消费者可以使用下面的一个方法:

    • popTail: 从队尾处弹出一个数据,除非队列为空。所以消费者总是消费最老的数据,这也正好符合大部分的场景

    接下来就是分析代码了,有点枯燥,你可以跳过。

    代码分析

    首先我们看这个struct的定义:

    1
    2
    3
    4
    type poolDequeue struct {
    headTail atomic.Uint64
    vals []eface
    }

    这里有两个重要的字段:

    • headTail: 一个 atomic.Uint64 类型的字段,它的高 32 位是 head,低 32 位是 tail。head 是下一个要填充的位置,tail 是最老的数据的位置。
    • vals: 一个 eface 类型的切片,它是一个环形队列,大小必须是 2 的幂次方。

    生产者增加数据的逻辑如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    func (d *poolDequeue) pushHead(val any) bool {
    ptrs := d.headTail.Load()
    head, tail := d.unpack(ptrs)
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
    // 队列满
    return false
    }
    slot := &d.vals[head&uint32(len(d.vals)-1)]
    // 检查 head slot 是否被 popTail 释放
    typ := atomic.LoadPointer(&slot.typ)
    if typ != nil {
    // 另一个 goroutine 正在清理 tail,所以队列还是满的
    return false
    }
    // 如果值为空,那么设置一个特殊值
    if val == nil {
    val = dequeueNil(nil)
    }
    // 队列头是空的,将数据写入 slot
    *(*any)(unsafe.Pointer(slot)) = val // ①
    // 增加 head,这样 popTail 就可以消费这个 slot 了
    // 同时也是一个 store barrier,保证了 slot 的写入
    d.headTail.Add(1 << dequeueBits)
    return true
    }

    ① 处会有并发问题吗?万一有两个 goroutine 同时执行到这里,会不会有问题?这里没有问题,因为要求只有一个生产者,不会有另外一个goroutine同时写这个槽位。

    注意它还实现了pack和unpack方法,用于将 head 和 tail 打包到一个 uint64 中,或者从 uint64 中解包出 head 和 tail。

    消费者消费数据的逻辑如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    func (d *poolDequeue) popTail() (any, bool) {
    var slot *eface
    for { // ②
    ptrs := d.headTail.Load()
    head, tail := d.unpack(ptrs)
    if tail == head {
    // 队列为空
    return nil, false
    }
    // 确认头部和尾部(用于我们之前的推测性检查),并递增尾部。如果成功,那么我们就拥有了尾部的插槽。
    ptrs2 := d.pack(head, tail+1)
    if d.headTail.CompareAndSwap(ptrs, ptrs2) {
    // 成功读取了一个 slot
    slot = &d.vals[tail&uint32(len(d.vals)-1)]
    break
    }
    }
    // 剩下来就是读取槽位的值
    val := *(*any)(unsafe.Pointer(slot))
    if val == dequeueNil(nil) { // 如果本身就存储的nil
    val = nil
    }
    // 释放 slot,这样 pushHead 就可以继续写入这个 slot 了
    slot.val = nil // ③
    atomic.StorePointer(&slot.typ, nil) // ④
    return val, true
    }

    ② 处是一个 for 循环,这是一个自旋的过程,直到成功读取到一个 slot 为止。在有大量的goroutine的时候,这里可能会是一个瓶颈点,但是少量的消费者应该还不算大问题。

    ③ 和 ④ 处是释放 slot 的过程,这样生产者就可以继续写入这个 slot 了。

    生产者还可以调用popHead方法,用来弹出刚刚压入还没有消费的数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    func (d *poolDequeue) popHead() (any, bool) {
    var slot *eface
    for {
    ptrs := d.headTail.Load()
    head, tail := d.unpack(ptrs)
    if tail == head {
    // 队列为空
    return nil, false
    }
    // 确认头部和尾部(用于我们之前的推测性检查),并递减头部。如果成功,那么我们就拥有了头部的插槽。
    head--
    ptrs2 := d.pack(head, tail)
    if d.headTail.CompareAndSwap(ptrs, ptrs2) {
    // 成功取回了一个 slot
    slot = &d.vals[head&uint32(len(d.vals)-1)]
    break
    }
    }
    val := *(*any)(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
    val = nil
    }
    // 释放 slot,这样 pushHead 就可以继续写入这个 slot 了
    *slot = eface{}
    return val, true
    }

    这是一个固定大小的队列,如果队列满了,生产者就会失败。这个队列的大小是 2 的幂次方,这样可以用 & 来取模,而不用 %,这样可以提高性能。

    PoolChain

    PoolChain 是在 PoolDequeue 的基础上实现的一个动态尺寸的队列,它的实现和 PoolDequeue 类似,只是增加了一个 headTail 的链表,用于存储多个 PoolDequeue。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    type poolChain struct {
    // head 是生产者用来push的 poolDequeue。只有生产者访问,所以不需要同步
    head *poolChainElt
    // tail 是消费者用来pop的 poolDequeue。消费者访问,所以需要原子操作
    tail atomic.Pointer[poolChainElt]
    }
    type poolChainElt struct {
    poolDequeue
    // next由生产者原子写入,消费者原子读取。它只能从nil转换为非nil。
    // prev由消费者原子写入,生产者原子读取。它只能从非nil转换为nil。
    next, prev atomic.Pointer[poolChainElt]
    }

    考虑到文章中代码过多,大家就会感觉很枯燥了,我就不具体展示代码了,你可以在 https://github.com/golang/go/blob/master/src/sync/poolqueue.go#L220-L302 查看具体的实现。
    整体的思想就是将多个poolDequeue串联起来,生产者在head处增加数据,消费者在tail处消费数据,当tail的poolDequeue为空时,就从head处获取一个poolDequeue。
    当head满了的时候,就增加一个新的poolDequeue。
    这样就实现了动态尺寸的队列。

    sync.Pool中就是使用的PoolChain来实现的,它是一个单生产者多消费者的队列,可以同时有多个消费者消费数据,但是只有一个生产者生产数据。

    为了能将这个数据结构暴露出来使用,我把相关的代码复制到 https://github.com/smallnest/exp/blob/master/gods/poolqueue.go , 增加了单元测试和性能测试的代码。

    你可以学到这个方法,使用类似的技术,创建一个 look-free 无线长度的 byte buffer。在一些 Go 的网络优化库中就使用这种方法,避免频繁的 grow 和 copy 既有数据。

    与channel的性能比较

    我们来看一下poolDequeue、PoolChain和channel的性能对比。
    我们使用一个goroutine进行写入,10个goroutine进行读取:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    package gods
    import (
    "sync"
    "testing"
    )
    func BenchmarkPoolDequeue(b *testing.B) {
    const size = 1024
    pd := NewPoolDequeue(size)
    var wg sync.WaitGroup
    // Producer
    go func() {
    for i := 0; i < b.N; i++ {
    pd.PushHead(i)
    }
    wg.Done()
    }()
    // Consumers
    numConsumers := 10
    wg.Add(numConsumers + 1)
    for i := 0; i < numConsumers; i++ {
    go func() {
    for {
    if _, ok := pd.PopTail(); !ok {
    break
    }
    }
    wg.Done()
    }()
    }
    wg.Wait()
    }
    func BenchmarkPoolChain(b *testing.B) {
    pc := NewPoolChain()
    var wg sync.WaitGroup
    // Producer
    go func() {
    for i := 0; i < b.N; i++ {
    pc.PushHead(i)
    }
    wg.Done()
    }()
    // Consumers
    numConsumers := 10
    wg.Add(numConsumers + 1)
    for i := 0; i < numConsumers; i++ {
    go func() {
    for {
    if _, ok := pc.PopTail(); !ok {
    break
    }
    }
    wg.Done()
    }()
    }
    wg.Wait()
    }
    func BenchmarkChannel(b *testing.B) {
    ch := make(chan interface{}, 1024)
    var wg sync.WaitGroup
    // Producer
    go func() {
    for i := 0; i < b.N; i++ {
    ch <- i
    }
    close(ch)
    wg.Done()
    }()
    // Consumers
    numConsumers := 10
    wg.Add(numConsumers + 1)
    for i := 0; i < numConsumers; i++ {
    go func() {
    for range ch {
    }
    wg.Done()
    }()
    }
    wg.Wait()
    }

    运行这个benchmark,我们可以看到poolDequeue和PoolChain的性能要比channel高很多,大约是channel的10倍。
    poolDequeue 比 PoolChain 要好一些,性能是后者的两倍。



沪ICP备19023445号-2号
友情链接