2024-08-26
池化思想与C++线程池
醍醐灌顶全方位击破C++线程池及异步处理 - 知乎 (zhihu.com)
基本概念
池化思想,又称为资源池化,是一种在计算机科学和系统工程中广泛应用的设计理念。其核心思想是将资源统一管理和分配 ,以提高资源的使用效率,降低资源消耗,并增强系统的稳定性和响应速度。
池化思想通过将资源(如内存、数据库连接、线程等)预先分配并存储在一个资源池中,当系统需要这些资源时,直接从池中获取,而不是每次需要时都重新创建。这种机制不仅可以减少资源的创建和销毁开销,还能避免资源竞争和过度消耗。
资源池(Resource Pooling):
池化思想的核心概念是资源池,它是一组可重复使用 的资源,如数据库连接、线程、对象实例、网络连接等。
资源池中的资源可以被多个任务或线程共享,并且可以通过请求和释放的方式来管理。
例子 - 为何要用池?
先举一个简单的使用篮球 例子,我们有多种策略使用篮球,并且使用篮球之后会产生一定的代价,主观上认为我们倾向于将代价最小化。
策略1:(一次性使用)
这是一种比较笨的策略,每次都买一个新的篮球用于使用,使用之后丢掉。于是我们可以得到如下代价公式:
总代价=(买篮球代价+用篮球代价)∗使用次数
策略2:(重复使用)
在该策略中,认为篮球是可以多次使用的。于是我们可以得到如下公式::
总代价=买篮球代价∗篮球个数+重复代价∗使用次数+用篮球代价∗使用次数
策略的选择
上面列举了两种策略,事实上还有很多其他的策略。
两种策略本身是没有绝对的好坏的,而是视场景而定的。但就现实情况而言(比如,使用次数很多),篮球这个例子中符合如下的规律:
买篮球的代价∗篮球个数+复用的代价∗使用次数<买篮球的代价∗使用次数
这意味着,复用的总代价 小于 不复用的代价,即复用的策略更适合篮球这个例子 。
线程池的组成
线程池管理器:初始化和创建线程,启动和停止线程,调配任务;管理线程池
工作线程:线程池中等待并执行分配的任务
任务接口:添加任务的接口,以提供工作线程调度任务的执行。
任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面
对象池的优势
通过对象创建的例子,可看出对象创建是一个复杂的过程,少数的对象的创建并不会影响程序的太多的性能,但是如果达到了数以万计,就应该考虑复用同类对象的分配了。
【通俗理解】只是替换某个已存在对象的状态(填充原对象结构中的变量),复用已存在对象分配的内存 (节省了寻找空闲堆区等方面的时间)。
(相当于给内存重新换了身衣服 )
这里认为,重新创建对象的代价 远远大于 更换已存在对象中相关的状态变量 。
线程池工作的四种情况
1.3.1 没有任务要执行,缓冲队列为空
1.3.2 队列中任务数量,小于等于线程池中线程任务数量
1.3.3 任务数量大于线程池数量,缓冲队列未满
1.3.4 任务数量大于线程池数量,缓冲队列已满
Thread Safe Queue Requirement
How many producer are there for this queue? How many threads will be “ pushing to it”?
Will there be many threads “ popping ” from the queue?
Do we always need the “pop” operation to return something? Can it block a thread?
Does the queue need to be atomic- no mutex locking allowed?
Simple C++ Implementation for Thread Safe Queue
#include <queue>
#include
cv.notify_one(); // Notify one waiting thread that data is available
}
// Pops and returns the front element of the queue
T pop() {
std::unique_lockstd::mutex uLock(mtx);
cv.wait(uLock, [&] { return !q.empty(); }); // Wait until the queue is not empty
T front = q.front();
q.pop();
return front;
}
};
We can pass arguments to the start-function when creating a new thread via std::thread(startFunction, args). Those arguments are passed by value from the thread creator function because the std::thread constructor copies or moves the creator's arguments before passing them to the start-function.
最简单的线程池的实现(基于c++11)
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lockstd::mutex lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
} // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lockstd::mutex lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker: workers) worker.join(); } #endif



