IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [Zig]使用 std.Thread.Pool 线程池

    罗佳(博主)发表于 2024-10-29 17:08:17
    love 0

    我发现Zig的官方文档里似乎没有写这个`std.Thread.Pool`的具体用例,所以写一篇笔记给大家参考一下。

    这个Zig标准库中的线程池主要用到两个部分,一个是线程池本身的结构体`std.Thread.Pool`,一个是用于等待任务结束的`std.Thread.WaitGroup`(等待组)结构体。等待组的作用是把一组相关的任务放在一起,之后可以使用特定方法等待它们全部完成。

    流程:

    • 在线程池初始化好了之后在创建线程任务时给pool.spawnWg方法传入指定的WaitGroup和要执行的函数以及参数
    • 分配任务时线程池就会开始执行任务
    • →随时使用`pool.waitAndWork`方法阻塞当前线程,等待WaitGroup中所有任务完成
    • ↘或者在任何时候使用`wait_group.isDone()`方法检查任务是否全部完成,如果没有完成可以让主线程去做其他事情。

    以下直接写一个示例,内容依然是在我前面的笔记中已经出现了好几次的多线程数据竞争示例。

    const std = @import("std");
    const debug = std.debug;
    const Pool = std.Thread.Pool;
    const Thread = std.Thread;
    const WaitGroup = std.Thread.WaitGroup;
    
    const print = debug.print;
    
    const NUM_THREADS = 2;
    
    var sum: u64 = 0;//让几个线程分别把这个sum自增,完成后sum应当由于非加锁自增导致其值为一个随机数
    pub fn sumTask(idx: usize) void {
        const tid = Thread.getCurrentId(); // 获取线程真实的id
        print("Thread {} (idx:{}) : starting...\n", .{ tid, idx });
        var timer = std.time.Timer.start() catch unreachable; //获取一个计时器,用于之后计算线程执行时间
        const y = ∑
        for (0..50000000) |_| {
            y.* += 1;
        }
        print("Thread {} (idx:{}) : done in {}ms.\n", .{ tid, idx, timer.read() / 1000000 });
    }
    test "threadpool" {
        const tid = Thread.getCurrentId(); // 获取线程真实的id
        print("Main thread {} start\n", .{tid});
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        const allocator = gpa.allocator();
        var thread_pool: Pool = undefined;
        try thread_pool.init(.{ //初始化线程池
            .allocator = allocator,
            .n_jobs = NUM_THREADS, //线程数
        });
        defer thread_pool.deinit();
        var wait_group: WaitGroup = undefined;
        wait_group.reset(); //初始化等待组
        //分配10个任务,线程池内的空闲线程会分别完成这些任务
        for (0..10) |idx| {
            thread_pool.spawnWg(
                &wait_group,//传入等待组
                sumTask,//传入任务函数
                .{idx},//任务函数的参数元组
            );
        }
        thread_pool.waitAndWork(&wait_group); //等待以上分配的任务全部完成
        print("Main: sum = {}\n", .{sum});
        print("Main: program completed. Exiting.\n", .{});
    }
    

    需要注意的是,如果在分配完任务后立刻使用`thread_pool.waitAndWork`等待任务完成(像上面的示例中一样,也就是在任务没有全部完成时就开始等待),你会发现虽然设置的线程数是2,理论上任务应该两个两个一起执行,但实际上却是3个任务一起执行的,因为调用这个等待方法的线程也执行了任务。我当时也疑惑了一会儿,然后想明白了,既然等待线程已经等在那里了,那么与其干等不如一起把任务做完,这样的逻辑确实是合理的,但这样实际并行执行的线程就会多一个,这是要注意的地方。

    另一个要注意的是,目前的线程池实现中每一个线程启动之后就会开始从任务队列的开头取任务,但添加任务的时候也是添加到队列的开头的,所以虽然不严格保证顺序(因为你后续还可以继续添加任务),但任务队列大致上是后进先出的,所以不要连续不断地添加任务,至少要等线程池任务清空了才可以继续添加新的任务。(我觉得这样不太合理)



沪ICP备19023445号-2号
友情链接