2024-08-26

池化思想与C++线程池

醍醐灌顶全方位击破C++线程池及异步处理 - 知乎 (zhihu.com)

基本概念

池化思想,又称为资源池化,是一种在计算机科学和系统工程中广泛应用的设计理念。其核心思想是将资源统一管理和分配 ,以提高资源的使用效率,降低资源消耗,并增强系统的稳定性和响应速度。

池化思想通过将资源(如内存、数据库连接、线程等)预先分配并存储在一个资源池中,当系统需要这些资源时,直接从池中获取,而不是每次需要时都重新创建。这种机制不仅可以减少资源的创建和销毁开销,还能避免资源竞争和过度消耗。

资源池(Resource Pooling):

  • 池化思想的核心概念是资源池,它是一组可重复使用 的资源,如数据库连接、线程、对象实例、网络连接等。

  • 资源池中的资源可以被多个任务或线程共享,并且可以通过请求和释放的方式来管理。

例子 - 为何要用池?

先举一个简单的使用篮球 例子,我们有多种策略使用篮球,并且使用篮球之后会产生一定的代价,主观上认为我们倾向于将代价最小化。

策略1:(一次性使用)

这是一种比较笨的策略,每次都买一个新的篮球用于使用,使用之后丢掉。于是我们可以得到如下代价公式:

总代价=(买篮球代价+用篮球代价)∗使用次数

策略2:(重复使用)

在该策略中,认为篮球是可以多次使用的。于是我们可以得到如下公式::

总代价=买篮球代价∗篮球个数+重复代价∗使用次数+用篮球代价∗使用次数

策略的选择

上面列举了两种策略,事实上还有很多其他的策略。
两种策略本身是没有绝对的好坏的,而是视场景而定的。但就现实情况而言(比如,使用次数很多),篮球这个例子中符合如下的规律:

买篮球的代价∗篮球个数+复用的代价∗使用次数<买篮球的代价∗使用次数

这意味着,复用的总代价 小于 不复用的代价,即复用的策略更适合篮球这个例子

线程池的组成

  1. 线程池管理器:初始化和创建线程,启动和停止线程,调配任务;管理线程池

  2. 工作线程:线程池中等待并执行分配的任务

  3. 任务接口:添加任务的接口,以提供工作线程调度任务的执行。

  4. 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面

对象池的优势

通过对象创建的例子,可看出对象创建是一个复杂的过程,少数的对象的创建并不会影响程序的太多的性能,但是如果达到了数以万计,就应该考虑复用同类对象的分配了。
【通俗理解】只是替换某个已存在对象的状态(填充原对象结构中的变量),复用已存在对象分配的内存 (节省了寻找空闲堆区等方面的时间)。

相当于给内存重新换了身衣服
这里认为,重新创建对象的代价 远远大于 更换已存在对象中相关的状态变量

线程池工作的四种情况

1.3.1 没有任务要执行,缓冲队列为空

1.3.2 队列中任务数量,小于等于线程池中线程任务数量

1.3.3 任务数量大于线程池数量,缓冲队列未满

1.3.4 任务数量大于线程池数量,缓冲队列已满

Thread Safe Queue Requirement

  • How many producer are there for this queue? How many threads will be “ pushing to it”?

  • Will there be many threads “ popping ” from the queue?

  • Do we always need the “pop” operation to return something? Can it block a thread?

  • Does the queue need to be atomic- no mutex locking allowed?

Simple C++ Implementation for Thread Safe Queue

#include <queue>

#include #include template class Queue_Safe { private: std::queue q; // Underlying queue to store elements std::condition_variable cv; // Condition variable for synchronization std::mutex mtx; // Mutex for exclusive access to the queue public: // Pushes an element onto the queue void push(T const& val) { std::lock_guardstd::mutex lock(mtx); q.push(val);
cv.notify_one(); // Notify one waiting thread that data is available } // Pops and returns the front element of the queue T pop() { std::unique_lockstd::mutex uLock(mtx);
cv.wait(uLock, [&] { return !q.empty(); }); // Wait until the queue is not empty T front = q.front();
q.pop();
return front;
}
};

We can pass arguments to the start-function when creating a new thread via std::thread(startFunction, args). Those arguments are passed by value from the thread creator function because the std::thread constructor copies or moves the creator's arguments before passing them to the start-function.

最简单的线程池的实现(基于c++11)

#ifndef THREAD_POOL_H

#define THREAD_POOL_H #include #include #include #include #include #include #include #include #include class ThreadPool { public: ThreadPool(size_t); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function<void()> > tasks;

// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;

};

// the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads) : stop(false) { for(size_t i = 0;i<threads;++i) workers.emplace_back( [this] { for(;;) { std::function<void()> task; { std::unique_lockstd::mutex lock(this->queue_mutex); this->condition.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); } task(); } } ); } // add new work item to the pool template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward(f), std::forward(args)...) );

std::future<return_type> res = task->get_future();
{
    std::unique_lock<std::mutex> lock(queue_mutex);
    // don't allow enqueueing after stopping the pool
    if(stop)
        throw std::runtime_error("enqueue on stopped ThreadPool");
    tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;

} // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lockstd::mutex lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker: workers) worker.join(); } #endif