IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Go运行时的并发原语

    smallnest发表于 2024-01-07 11:32:15
    love 0

    这篇文章我们来了解一下隐藏在Go运行时中的一些并发原语, 因为运行时是底座和包循环依赖等原因,运行时中很少使用标准库中的并发原语,它有自己的并发原语。

    mutex

    在runtime/runtime2.go 定义了一个互斥锁,它的定义如下:

    1
    2
    3
    4
    type mutex struct {
    lockRankStruct
    key uintptr
    }

    它可是运行时中的大红人了,在很多数据结构中都被广泛的使用,凡事涉及到并发访问的地方都会用到它,你在runtime2.go文件中就能看到多处使用它的地方,因为很多地方都在使用它,我就不一一列举了在runtime这个文件夹中搜mutex这个关键子就都搜出来了。

    举一个大家常用来底层分析的数据结构channel为例,channel的数据结构定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    type hchan struct {
    qcount uint
    dataqsiz uint
    buf unsafe.Pointer
    elemsize uint16
    closed uint32
    elemtype *_type
    sendx uint
    recvx uint
    recvq waitq
    sendq waitq
    lock mutex
    }

    最后哪个字段lock mutex就是使用的这个互斥锁。因为一个通道在发送和接收的时候都会涉及到对通道的修改,在多发送者或者接收者情况下,需要使用互斥锁来保护。

    这个互斥锁的使用需要调用几个函数。

    • lockInit: 需要初始化这个锁,比如在channel的实现中,有如下的初始化代码:lockInit(&c.lock, lockRankHchan), 它将lock初始化(lockInit)时设置锁的等级(rank)。如果不明确去初始化一个锁,那么可以在调用lock自身的时候通过lockWithRank指定这个锁的等级。这个等级在启用GOEXPERIMENT=staticlockranking用来加强锁的静态分析。
    • lock: 加锁,在不同的操作系统下有不同的实现。如channel使用这个代码进行加锁:lock(&c.lock)
    • unlock: 解锁,在不同的操作系统下有不同的实现。如channel使用这个代码进行解锁:unlock(&c.lock)

    我在Go运行时中的 Mutex中详细介绍了它,这里就不再赘述了。

    rwmutex

    运行时中还实现了读写锁rwmutex。
    这个读写锁完全是从sync.RWMutex中拷贝过来的,只是将sync.RWMutex中的sync包替换成了runtime包,因为sync包依赖了runtime包,所以不能直接使用。

    你看它的数据结构定义和sync.RWMutex几乎是一样的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    type rwmutex struct {
    rLock mutex // protects readers, readerPass, writer
    readers muintptr // list of pending readers
    readerPass uint32 // number of pending readers to skip readers list
    wLock mutex // serializes writers
    writer muintptr // pending writer waiting for completing readers
    readerCount atomic.Int32 // number of pending readers
    readerWait atomic.Int32 // number of departing readers
    readRank lockRank // semantic lock rank for read locking
    writeRank lockRank // semantic lock rank for write locking
    }

    mutex和rwmutex会直接阻塞M。

    gopark/goready

    在其它编程语言中,会直接提供park和unpark的功能,比如rust,提供对并发单元的更底层的控制。

    park就是停止一会,很形象,就是暂时让并发单元阻塞,不再参与调度,直到unpark它,它才会重新参与调度。

    Go运行时并没有直接提供park和unpark的功能,它提供了gopark和goready的功能,它们的实现在runtime/proc.go。

    gopark会将goroutine放到等待队列中,从调度器的运行队列中移出去,等待被唤醒。
    goready会将goroutine放到可运行队列中,加入到调度器的运行队列,等待被调度。

    note

    note实现一次性的通知机制。

    note的数据结构如下:

    1
    2
    3
    type note struct {
    key uintptr
    }

    可以使用notesleep和notewakeup进行休眠和唤醒。

    就像mutex一样,notesleep会阻塞M,notewakeup会唤醒一个M,并且不会重新调度G和P,而notetsleepg就像一个阻塞的系统调用一样,允许P选择另外一个G运行。
    noteclear用来重置note

    总结一下, 上面几种同步原语阻塞的角色如下:






    阻塞角色
    同步原语GMP
    mutex/rwmutexYYY
    noteYYY/N
    parkYNN

    filelock

    "filelock"(文件锁)通常是指在计算机系统中使用的一种机制,用于确保对文件的独占性访问,以防止多个进程或线程同时修改文件而导致数据不一致或损坏。

    一些应用程序经常利用文件锁,来控制只有一个实例在运行,在linux环境下非常常见,比如mysql等。

    在不同的操作系统和编程语言中,文件锁的实现方式可能会有所不同。一般而言,文件锁可以分为两种主要类型:

    • 共享锁(Shared Lock): 多个进程或线程可以同时获取共享锁,允许它们同时读取文件,但阻止其他进程或线程获取独占锁进行写操作。
    • 独占锁(Exclusive Lock): 只允许一个进程或线程获取独占锁,阻止其他进程或线程同时进行读或写操作。

    文件锁的代码在cmd/go/internal/lockedfile中,我们以Linux为例,看看它的实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    type lockType int16
    const (
    readLock lockType = syscall.LOCK_SH
    writeLock lockType = syscall.LOCK_EX
    )
    func lock(f File, lt lockType) (err error) {
    for {
    err = syscall.Flock(int(f.Fd()), int(lt))
    if err != syscall.EINTR {
    break
    }
    }
    if err != nil {
    return &fs.PathError{
    Op: lt.String(),
    Path: f.Name(),
    Err: err,
    }
    }
    return nil
    }
    func unlock(f File) error {
    return lock(f, syscall.LOCK_UN)
    }

    可以看到它实际是调用系统调用syscall.Flock实现的。

    这不属于运行时内定义的同步原语,但是它给我们提供了一个实现文件锁的思路,它甚至还封装了一个Mutex供我们使用。如果有类似的需求,我们可以参考它的实现。

    sema

    不太清楚Go为啥不在运行时或者标准库sync中实现信号量,而是在扩展包中去实现,信号量可以说是一个非常广泛使用的同步原语了。

    虽然没有在运行时中没有明确实现,但是运行时中的runtime/sema.go提供了与信号量相近功能,而且sync.Mutex严重依赖它。

    这个实现旨在提供一个可以在其他同步原语争用的情况下使用的睡眠和唤醒原语,因此,它的目标与Linux的futex相同,但语义要简单得多。
    Go团队说你不要将这些视为信号量,而是将它们视为一种实现睡眠和唤醒的方式,以确保每个睡眠都与单个唤醒配对,
    这是有历史原因,这些从贝尔实验室出来的大佬,对于先前他们在Plan 9中的一些想法一脉相承的继承下来,这个设计可以参见 Mullender 和 Cox 的Plan 9中的信号量。

    比如sync.Mutex睡眠和唤醒的函数其实就是这里实现的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
    func sync_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
    }
    //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
    func poll_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
    }
    //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
    func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
    }
    //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
    func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes, waitReasonSyncMutexLock)
    }
    // RWMutex使用的一些函数
    ...

    atomic

    atomic 提供原子操作,独立于sync/atomic,仅供运行时使用。

    在大多数平台上,编译器能够识别此包中定义的函数,并用平台特定的内部函数替换它们。在其他平台上,提供了通用的实现。
    除非另有说明,在此包中定义的操作在处理它们所操作的值时对线程是有序一致的(sequentially consistent)。更具体地说,在一个线程上按特定顺序发生的操作,将始终被另一个线程观察到以完全相同的顺序发生。

    因为和特定的CPU架构有关,它的实现针对不同的CPU架构,由不同的指令实现而成,而且基本使用汇编实现,比如AMD64下的Cas实现,使用了LOCK + CMPXCHGL指令:

    1
    2
    3
    4
    5
    6
    7
    8
    TEXT ·Cas(SB),NOSPLIT,$0-17
    MOVQ ptr+0(FP), BX
    MOVL old+8(FP), AX
    MOVL new+12(FP), CX
    LOCK
    CMPXCHGL CX, 0(BX)
    SETEQ ret+16(FP)
    RET

    其实sync/atomic下的实现,也是调用这里的实现,否则维护两套代码就太麻烦了,而且可能出现不一致的现象。你看sync/atomic/asm.s:

    1
    2
    3
    4
    5
    6
    7
    ...
    TEXT ·CompareAndSwapInt64(SB),NOSPLIT,$0
    JMP runtime∕internal∕atomic·Cas64(SB)
    TEXT ·CompareAndSwapUint64(SB),NOSPLIT,$0
    JMP runtime∕internal∕atomic·Cas64(SB)
    ...

    它也是调用untime∕internal∕atomic下对应的函数。

    singleflight

    singleflight特别适合大并发情况下许多请求做同一件事情的场景,这个时候只处理一个请求就可以了,其它请求等待那一个请求的结果,这样对下游的压力大大减少,比如在读取cache的时候。

    因为它在特定场景下很有用,Go的扩展库中也同样实现了它。

    它没有定义在运行时中,而是定义在internal/singleflight中。

    比如在包net中,我们查找一台主机的IP地址时,如果并发的请求,对资源是很大的浪费,这个时候我们只让一个请求处理就好了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    type Resolver struct {
    ...
    // lookupGroup merges LookupIPAddr calls together for lookups for the same
    // host. The lookupGroup key is the LookupIPAddr.host argument.
    // The return values are ([]IPAddr, error).
    lookupGroup singleflight.Group
    }
    func (r *Resolver) lookupIPAddr(ctx context.Context, network, host string) ([]IPAddr, error) {
    ...
    ch := r.getLookupGroup().DoChan(lookupKey, func() (any, error) {
    return testHookLookupIP(lookupGroupCtx, resolverFunc, network, host)
    })
    ...
    }


沪ICP备19023445号-2号
友情链接