T O P

[资源分享]     简单的线程池

  • By - 楼主

  • 2021-11-25 18:00:47
  • 概要

    此线程池拥有一个被所有工作线程共享的任务队列。线程池用户提交的任务,被线程池保存在任务队列中,工作线程从任务队列中获取任务并执行。

    gist

    任务是可拥有返回值的、无参数的可调用(callable)对象,或者是经 std::bind 绑定了可调用对象及其参数后的调用包装器。具体而言可以是

    • 自由函数(也称为全局函数)
    • lambda
    • 函数对象(也称为函数符)
    • 类成员函数
    • 包装了上述类型的 std::function
    • bind 调用包装器

    该线程池异步地执行任务。当任务被提交进线程池后,用户不必等待任务执行和返回结果。

    实现

    以下代码给出了此线程池的实现。

    class Thread_Pool {
    
      private:
    
        struct Task_Wrapper { ...
    
        };
    
        atomic<bool> _done_;                    // #2
        Lockwise_Queue<Task_Wrapper> _queue_;        // #3
        unsigned _workersize_;
        thread* _workers_;                // #4
    
        void work() {
            while (!_done_.load(memory_order_acquire)) {
                Task_Wrapper task;
                if (_queue_.pop(task))
                    task();
                else
                    std::this_thread::yield();
            }
        }
    
      public:
        Thread_Pool() : _done_(false) {                // #1
            try {
                _workersize_ = thread::hardware_concurrency();   // #5
                _workers_ = new thread[_workersize_];
                for (unsigned i = 0; i < _workersize_; ++i) {
                    _workers_[i] = thread(&Thread_Pool::work, this);    // #6
                }
            } catch (...) {                                    // #7
                _done_.store(true, memory_order_release);
                for (unsigned i = 0; i < _workersize_; ++i) {
                    if (_workers_[i].joinable())
                        _workers_[i].join();
                }
                delete[] _workers_;
                throw;
            }
        }
        ~Thread_Pool() {
            _done_.store(true, memory_order_release);
            for (unsigned i = 0; i < _workersize_; ++i) {
                if (_workers_[i].joinable())
                    _workers_[i].join();
            }
            delete[] _workers_;
        }
    
        template<class Callable>
        future<typename std::result_of<Callable()>::type> submit(Callable c) {    // #8
            typedef typename std::result_of<Callable()>::type R;
            packaged_task<R()> task(c);
            future<R> r = task.get_future();
            _queue_.push(std::move(task));            // #9
            return r;                        // #10
        }
    
    };
    

    我们从构造 Thread_Pool 对象(#1)开始了解这个线程池。atomic<bool> 数据成员用于标志线程池是否结束,并强制同步内存顺序(#2);Task_Wrapper 具体化了线程安全的任务队列 Lockwise_Queue<>(#3);thread* 用于引用所有的工作线程对象(#4)。Task_Wrapper 和 Lockwise_Queue<> 稍后再做说明。

    线程池通过 thread::hardware_concurrency() 获取当前硬件支持的并发线程数量(#5),并依据此数量创建出工作线程。Thread_Pool 对象的成员函数 work() 作为所有工作线程的初始函数(#6),这使得线程池中的任务队列能被所有工作线程共享。创建 thread 对象和 new 操作可能失败并引发异常,因此用 try-catch 捕获潜在的异常。处理异常过程中,需要标志线程池结束,保证任何创建的线程都能正常的停止,并回收内存资源(#7)。线程池对象析构时的工作与此一致。

    Thread_Pool 对象构建完成后,任务通过 Thread_Pool::submit<>() 被提交进入线程池(#8)。为了支持任务的异步执行,任务先被封装在 std::packaged_task<> 中,再被放入线程安全的任务队列(#9)。任务执行结果被封装在返回的 std::future<> 对象中(#10),允许用户在未来需要结果时,等待任务结束并获取结果。


    因为每一个任务都是一个特定类型的 std::packaged_task<> 对象,为了实现任务队列的泛型化,需要设计一个通用的数据结构 Task_Wrapper,用于封装特定类型的 std::packaged_task<> 对象。

    struct Task_Wrapper {
    
        struct Task_Base {
            virtual ~Task_Base() {}
            virtual void call() = 0;
        };
        template<class T>
        struct Task : Task_Base {        // #5
            T _t_;
            Task(T&& t) : _t_(std::move(t)) {}        // #6
            void call() { _t_(); }            // #9
        };
    
        Task_Base* _ptr_;        // #7
    
        Task_Wrapper() : _ptr_(nullptr) {};
        template<class T>
        Task_Wrapper(T&& t) : _ptr_(new Task<T>(std::move(t))) {}        // #1
        // support move
        Task_Wrapper(Task_Wrapper&& other) {        // #2
            _ptr_ = other._ptr_;
            other._ptr_ = nullptr;
        }
        Task_Wrapper& operator=(Task_Wrapper&& other) {        // #3
            _ptr_ = other._ptr_;
            other._ptr_ = nullptr;
            return *this;
        }
        // no copy
        Task_Wrapper(Task_Wrapper&) = delete;
        Task_Wrapper& operator=(Task_Wrapper&) = delete;
        ~Task_Wrapper() {
            if (_ptr_) delete _ptr_;
        }
    
        void operator()() const {                                  // #4
            _ptr_->call();        // #8
        }
    
    };
    

    std::packaged_task<> 的实例只是可移动的,而不可复制。Task_Wrapper 必须能移动封装 std::packaged_task<R()> 对象(#1)。为了保持一致性,Task_Wrapper 也实现了移动构造(#2)和移动赋值(#3),同时实现了 operator()(#4)。ABC 的继承结构(#5)用于支持泛型化地封装和调用 std::packaged_task<> 对象。std::packaged_task<> 封装在派生类 Task<> 中(#6),由指向非泛型的抽象基类 Task_Base 的指针引用派生类对象(#7)。对 Task_Wrapper 对象的调用由虚调用(#8)委托给派生类并执行实际的任务(#9)。


    另一个关键的数据结构是线程安全的任务队列 Lockwise_Queue<>。

    template<class T>
    class Lockwise_Queue {
    
     private:
        struct Spinlock_Mutex {                        // #3
            atomic_flag _af_;
            Spinlock_Mutex() : _af_(false) {}
            void lock() {
                while (_af_.test_and_set(memory_order_acquire));
            }
            void unlock() {
                _af_.clear(memory_order_release);
            }
        } mutable _m_;                        // #2
        condition_variable _cv_;
        queue<T> _q_;                        // #1
    
     public:
        Lockwise_Queue() {}
    
        void push(const T& element) {
            lock_guard<Spinlock_Mutex> lk(_m_);
            _q_.push(std::move(element));
            _cv_.notify_one();
        }
    
        void push(T&& element) {                        // #4
            lock_guard<Spinlock_Mutex> lk(_m_);
            _q_.push(std::move(element));
            _cv_.notify_one();
        }
    
        bool pop(T& element) {                        // #5
            lock_guard<Spinlock_Mutex> lk(_m_);
            if (_q_.empty())
                return false;
            element = std::move(_q_.front());
            _q_.pop();
            return true;
        }
    
        bool empty() const {
            lock_guard<Spinlock_Mutex> lk(_m_);
            return _q_.empty();
        }
    
    };
    

    所有 Task_Wrapper 对象保存在 std::queue<> 中(#1)。互斥元和条件变量控制工作线程对任务队列的并发访问(#2)。为了提高并发程度,采用非阻塞自旋锁作为互斥元(#3)。任务的入队和出队操作,分别由支持移动语义的 push 函数(#4) 和 pop 函数(#5)完成。

    验证

    为了验证此线程池满足概要中描述的能力,设计了如下的各类可调用对象。

    void shoot() {
        std::printf("\n\t[Free Function] Let an arrow fly...\n");
    }
    
    
    bool shoot(long n) {
        std::printf("\n\t[Free Function] Let %ld arrows fly...\n", n);
        return false;
    }
    
    
    auto shootAnarrow = [] {
        std::printf("\n\t[Lambda] Let an arrow fly...\n");
    };
    
    
    auto shootNarrows = [](long n) -> bool {
        std::printf("\n\t[Lambda] Let %ld arrows fly...\n", n);
        return true;
    };
    
    
    class Archer {
    
      public:
        void operator()() {
            std::printf("\n\t[Functor] Let an arrow fly...\n");
        }
        bool operator()(long n) {
            std::printf("\n\t[Functor] Let %ld arrows fly...\n", n);
            return false;
        }
        void shoot() {
            std::printf("\n\t[Member Function] Let an arrow fly...\n");
        }
        bool shoot(long n) {
            std::printf("\n\t[Member Function] Let %ld arrows fly...\n", n);
            return true;
        }
    
    };
    

    对这些函数做好必要的参数封装,将其提交给线程池,

    atomic<bool> go(false);	
    time_point<steady_clock> start = steady_clock::now();
    minutes PERIOD(1);
    
    Thread_Pool pool;
    
    thread t1([&go, &pool, &PERIOD, start] {        // test free function of void() 
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        void (*task)() = shoot;
        for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) {
            pool.submit(task);
            //pool.submit(std::bind<void(*)()>(shoot));
            std::this_thread::yield();
        }
    });
    
    thread t2([&go, &pool, &PERIOD, start] {        // test free function of bool(long)
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        bool (*task)(long) = shoot;
        for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) {
            future<bool> r = pool.submit(std::bind(task, x));
            //future<bool> r = pool.submit(std::bind<bool(*)(long)>(shoot, x));
            std::this_thread::yield();
        }
    });
    
    thread t3([&go, &pool, &PERIOD, start] {        // test lambda of void()
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) {
            pool.submit(shootAnarrow);
            std::this_thread::yield();
        }
    });
    
    thread t4([&go, &pool, &PERIOD, start] {        // test lambda of bool(long)
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) {
            future<bool> r = pool.submit(std::bind(shootNarrows, x));
            std::this_thread::yield();
        }
    });
    
    thread t5([&go, &pool, &PERIOD, start] {        // test functor of void()
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        Archer hoyt;
        for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) {
            pool.submit(hoyt);
            std::this_thread::yield();
        }
    });
    
    thread t6([&go, &pool, &PERIOD, start] {        // test functor of bool(long)
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        Archer hoyt;
        for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) {
            future<bool> r = pool.submit(std::bind(hoyt, x));
            std::this_thread::yield();
        }
    });
    
    thread t7([&go, &pool, &PERIOD, start] {        // test member function of void()
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        Archer hoyt;
        for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) {
            pool.submit(std::bind<void(Archer::*)()>(&Archer::shoot, &hoyt));
            //pool.submit(std::bind(static_cast<void(Archer::*)()>(&Archer::shoot), &hoyt));
            std::this_thread::yield();
        }
    });
    
    thread t8([&go, &pool, &PERIOD, start] {        // test member function of bool(long)
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        Archer hoyt;
        for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) {
            future<bool> r = pool.submit(std::bind<bool(Archer::*)(long)>(&Archer::shoot, &hoyt, x));
            //future<bool> r = pool.submit(std::bind(static_cast<bool(Archer::*)(long)>(&Archer::shoot), &hoyt, x));
            std::this_thread::yield();
        }
    });
    
    thread t9([&go, &pool, &PERIOD, start] {        // test std::function<> of void()
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        std::function<void()> task = static_cast<void(*)()>(shoot);
        for (long x = 0; steady_clock::now() - start <= PERIOD; ++x) {
            pool.submit(task);
            std::this_thread::yield();
        }
    });
    
    thread t10([&go, &pool, &PERIOD, start] {        // test std::function<> of bool(long)
        while (!go.load(memory_order_acquire))
            std::this_thread::yield();
        std::function<bool(long)> task = static_cast<bool(*)(long)>(shoot);
        for (long x = 2; steady_clock::now() - start <= PERIOD; ++x) {
            future<bool> r = pool.submit(std::bind(task, x));
            std::this_thread::yield();
        }
    });
    

    编译代码 g++ -std=c++11 a_simple_thread_pool.cpp 成功后执行 ./a.out。以下是执行过程中的部分输出,

    ...
    
    [Functor] Let an arrow fly...
    
    [Free Function] Let 9224 arrows fly...
    
    [Free Function] Let 9445 arrows fly...
    
    [Member Function] Let 9375 arrows fly...
    
    [Lambda] Let 9449 arrows fly...
    
    [Free Function] Let an arrow fly...
    
    [Lambda] Let an arrow fly...
    
    [Member Function] Let an arrow fly...
    
    [Functor] Let 9469 arrows fly...
    
    ...
    

    最后

    完整示例请参考 [github] a_simple_thread_pool

    作者参考了 C++并发编程实战 / (美)威廉姆斯 (Williams, A.) 著; 周全等译. - 北京: 人民邮电出版社, 2015.6 (2016.4重印) 一书中的部分设计思路。借此机会对 Anthony Williams 及周全等译者表示感谢。

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册