IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    高效I/O并发处理:双缓冲和Exchanger

    smallnest发表于 2024-01-14 08:58:00
    love 0

    双缓冲(double buffering)是高效处理I/O操作的一种并发技术,它使用两个buffer,一个goroutine使用其中一个buffer进行写,而另一个goroutine使用另一个buffer进行读,然后进行交换。这样两个goroutine可能并发的执行,减少它们之间的等待和阻塞。

    本文还提供了一个类似Java的java.util.concurrent.Exchanger的Go并发原语,它可以用来在两个goroutine之间交换数据,快速实现双缓冲的模式。 这个并发原语可以在github.com/smallnest/exp/sync/Exchanger找到。

    double buffering 并发模式

    双缓冲(double buffering)设计方式虽然在一些领域中被广泛的应用,但是我还没有看到它在并发模式中专门列出了,或者专门列为一种模式。这里我们不妨把它称之为双缓存模式。

    这是一种在I/O处理领域广泛使用的用来提速的编程技术,它使用两个缓冲区来加速计算机,该计算机可以重叠 I/O 和处理。一个缓冲区中的数据正在处理,而下一组数据被读入另一个缓冲区。
    在流媒体应用程序中,一个缓冲区中的数据被发送到声音或图形卡,而另一个缓冲区则被来自源(Internet、本地服务器等)的更多数据填充。
    当视频显示在屏幕上时,一个缓冲区中的数据被填充,而另一个缓冲区中的数据正在显示。当在缓冲区之间移动数据的功能是在硬件电路中实现的,而不是由软件执行时,全动态视频的速度会加快,不但速度被加快,而且可以减少黑屏闪烁的可能。

    !

    在这个模式中,两个goroutine并发的执行,一个goroutine使用一个buffer进行写(不妨称为buffer1),而另一个goroutine使用另一个buffer进行读(不妨称为buffer2)。如图所示。
    当左边的writer写满它当前使用的buffer1后,它申请和右边的goroutine的buffer2进行交换,这会出现两种情况:

    • 右边的reader已经读完了它当前使用的buffer2,那么它会立即交换,这样左边的writer可以继续写buffer2,而右边的reader可以继续读buffer1。
    • 右边的reader还没有读完buffer2,那么左边的writer就会阻塞,直到右边的reader读完buffer2,然后交换。
      周而复始。

    同样右边的goroutine也是同样的处理,当它读完buffer2后,它会申请和左边的goroutine的buffer1进行交换,这会出现两种情况:

    • 左边的writer已经写完了它当前使用的buffer1,那么它会立即交换,这样右边的reader可以继续读buffer1,而左边的writer可以继续写buffer2。
    • 左边的writer还没有写完buffer1,那么右边的reader就会阻塞,直到左边的writer写完buffer1,然后交换。
      周而复始。

    这样两个goroutine就可以并发的执行,而不用等待对方的读写操作。这样可以提高并发处理的效率。

    不仅仅如此, double buffering其实可以应用于更多的场景, 不仅仅是buffer的场景,如Java的垃圾回收机制中,HotSpot JVM把年轻代分为了三部分:1个Eden区和2个Survivor区(分别叫from和to,或者s0和s1),在GC开始的时候,对象只会存在于Eden区和名为“From”的Survivor区,Survivor区“To”是空的。紧接着进行GC,Eden区中所有存活的对象都会被复制到“To”,而在“From”区中,仍存活的对象会根据他们的年龄值来决定去向。年龄达到一定值的对象会被移动到年老代中,没有达到阈值的对象会被复制到“To”区域。经过这次GC后,Eden区和From区已经被清空。这个时候,“From”和“To”会交换(exchange)他们的角色,也就是新的“To”就是上次GC前的“From”,新的“From”就是上次GC前的“To”。不管怎样,都会保证名为To的Survivor区域是空的。Minor GC会一直重复这样的过程,直到“To”区被填满,“To”区被填满之后,会将所有对象移动到年老代中。

    Exchanger的实现

    既然有这样的场景,有这样的需求,所以我们需要针对这样场景的一个同步原语。Java给我们做了一个很好的师范,接下来我们使用实现相应的Go,但是我们的实现和Java的实现完全不同,我们要基于Go既有的同步原语来实现。

    基于Java实现的Exchanger的功能,我们也实现一个Exchanger, 我们期望它的功能如下:

    • 只用作两个goroutine之间的数据交换,不支持多个goroutine之间的数据交换。
    • 可以重用。交换完之后还可以继续交换
    • 支持泛型,可以交换任意类型的数据
    • 如果对端还没有准备交换,就阻塞等待
    • 在交换完之前,阻塞的goroutine不可能调用Exchange方法两次
    • Go内存模型补充: 同一次交换, 一个goroutine在调用Exchange方法的完成,一定happens after另一个goroutine调用Exchange方法的开始。

    如果你非常熟悉Go的各种同步原语,你可以很快的组合出这样一个同步原语。如果你还不是那么熟悉,建议你阅读《深入理解Go并发编程》这本书,京东有售。
    下面是一个简单的实现,代码在Exchanger。
    我们只用left、right指代这两个goroutine, goroutine是Go语言中的并发单元,我们期望的就是这两个goroutine发生关系。

    为了跟踪这两个goroutine,我们需要使用goroutine id来标记这两个goroutine,这样避免了第三者插入。

    1
    2
    3
    4
    type Exchanger[T any] struct {
    leftGoID, rightGoID int64
    left, right chan T
    }

    你必须使用 NewExchanger 创建一个Exchanger,它会返回一个Exchanger的指针。
    初始化的时候我们把left和right的id都设置为-1,表示还没有goroutine使用它们,并且不会和所有的goroutine的id冲突。
    同时我们创建两个channel,一个用来左边的goroutine写,右边的goroutine读,另一个用来右边的goroutine写,左边的goroutine读。channel的buffer设置为1,这样可以避免死锁。

    1
    2
    3
    4
    5
    6
    7
    8
    func NewExchanger[T any]() *Exchanger[T] {
    return &Exchanger[T]{
    leftGoID: -1,
    rightGoID: -1,
    left: make(chan T, 1),
    right: make(chan T, 1),
    }
    }

    Exchange方法是核心方法,它用来交换数据,它的实现如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (e *Exchanger[T]) Exchange(value T) T {
    goid := goroutine.ID()
    // left goroutine
    isLeft := atomic.CompareAndSwapInt64(&e.leftGoID, -1, goid)
    if !isLeft {
    isLeft = atomic.LoadInt64(&e.leftGoID) == goid
    }
    if isLeft {
    e.right <- value // send value to right
    return <-e.left // wait for value from right
    }
    // right goroutine
    isRight := atomic.CompareAndSwapInt64(&e.rightGoID, -1, goid)
    if !isRight {
    isRight = atomic.LoadInt64(&e.rightGoID) == goid
    }
    if isRight {
    e.left <- value // send value to left
    return <-e.right // wait for value from left
    }
    // other goroutine
    panic("sync: exchange called from neither left nor right goroutine")
    }

    当一个goroutine调用的时候,首先我们尝试把它设置为left,如果成功,那么它就是left。
    如果不成功,我们就判断它是不是先前已经是left,如果是,那么它就是left。
    如果先前,或者此时left已经被另一个goroutine占用了,它还有机会成为right,同样的逻辑检查和设置right。

    如果既不是left也不是right,那么就是第三者插入了,我们需要panic,因为我们不希望第三者插足。

    如果它是left,那么它就会把数据发送到right,然后等待right发送数据过来。
    如果它是right,那么它就会把数据发送到left,然后等待left发送数据过来。

    这样就实现了数据的交换。

    Exchanger的使用

    我们使用一个简单的双缓冲例子来说明如何使用Exchanger,我们创建两个goroutine,一个goroutine负责写,另一个goroutine负责读,它们之间通过Exchanger来交换数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    buf1 := bytes.NewBuffer(make([]byte, 1024))
    buf2 := bytes.NewBuffer(make([]byte, 1024))
    exchanger := syncx.NewExchanger[*bytes.Buffer]()
    var wg sync.WaitGroup
    wg.Add(2)
    expect := 0
    go func() { // g1
    defer wg.Done()
    buf := buf1
    for i := 0; i < 10; i++ {
    for j := 0; j < 1024; j++ {
    buf.WriteByte(byte(j / 256))
    expect += j / 256
    }
    buf = exchanger.Exchange(buf)
    }
    }()
    var got int
    go func() { // g2
    defer wg.Done()
    buf := buf2
    for i := 0; i < 10; i++ {
    buf = exchanger.Exchange(buf)
    for _, b := range buf.Bytes() {
    got += int(b)
    }
    buf.Reset()
    }
    }()
    wg.Wait()
    fmt.Println(got)
    fmt.Println(expect == got)

    在这个例子中 g1负责写,每个buffer的容量是1024,写满就交给另外一个读g2,并从读g2中交换过来一个空的buffer继续写。
    交换10次之后,两个goroutine都退出了,我们检查写入的数据和读取的数据是否一致,如果一致,那么就说明我们的Exchanger实现是正确的。

    总结

    文本介绍了一种类似Java的Exchanger的同步原语的实现,这个同步原语可以在双缓冲的场景中使用,提高并发处理的性能。



沪ICP备19023445号-2号
友情链接