文章

C++实现线程池

线程池

C++实现线程池

线程池

线程池(Thread Pool) 用于管理多个线程,这些线程 并发 或 并行执行。

线程池实际上是在内部构建了一个生产者-消费者模型(一个非常经典的多线程并发协作的模型)。 生产者产生任务并插入任务队列,消费者消耗任务(从任务队列取走任务)并完成。

基础版本

ThreadPool封装了 线程数组、任务队列、任务入队列函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>


class ThreadPool
{
private:
	unsigned int m_threads; // number of threads
	std::vector<std::thread> m_threadPool; // thread array
	
	std::queue<std::function<void()>> m_tasks; // task queue
	std::mutex m_mutex;
	std::condition_variable m_condition;

	bool m_stop; // stop flag for thread

public:
    ThreadPool(unsigned int thread_num)
    : m_threads(thread_num), m_stop(false)
    {
        unsigned int threads = std::thread::hardware_concurrency(); // get the number of CPU cores
        if (m_threads > threads && threads != 0) {
            m_threads = threads;  // 防止线程数超过CPU核心数
        }  // 根据具体情况,当前行及以上三行(共四行代码)可以保留,也可 删除

        for (int i = 0; i < m_threads; i++)
        {
            m_threadPool.emplace_back([this]()
            {
                while (true)
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(m_mutex);
                        m_condition.wait(lock, [this]()
                            {
                                return m_stop || !m_tasks.empty();
                            });
                        if (m_stop && m_tasks.empty())
                            return;
                        task = std::move(m_tasks.front());
                        m_tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_condition.notify_all();
        for (auto& thread : m_threadPool)
        {
            thread.join();
        }
    }

    // add task to the queue
    template <typename F, typename... Args>
    void enqueue(F&& f, Args&&... args)
    {
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_tasks.emplace(task);
        }
    }
};

测试上面的线程池ThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <iostream>
#include <string>

int main() 
{
    ThreadPool t_pool(4);
    for (int i = 0; i < 8; i++) {
        t_pool.enqueue([](int i, int j) {
            std::string ss = std::string("[task: ") + std::to_string(i) + " start...] ";
            std::cout << ss << '\n';
            std::this_thread::sleep_for(std::chrono::seconds(2));
            std::string sss = std::string("[task: ") + std::to_string(i) + " end] ";
            std::cout << sss << "\n";
        }, i, i + 1);
    }
    return 0;
}

单例版本

ThreadPool改造为单例模式

1
本文由作者按照 CC BY 4.0 进行授权