说明:本线程池项目旨在学习c++多线程编程,为后续其他项目打基础。
github:NieQuan-kzzx/ThreadPool: c++手写线程池
项目描述:
- 基于可变参模板编程和引用折叠原理,实现线程池submitTask接口,支持任意任务函数和任意参数的传递
- 使用future类型定制submitTask提交任务的返回值
- 使用map和queue容器管理线程对象和任务
- 基于条件变量condition_variable和互斥锁mutex实现任务提交线程和任务执行线程间的通信机制
- 支持fixed和cached模式的线程池定制
些许基本概念:
并发:单核上,多个线程占用不同的CPU时间片,物理上还是串行执行的,但是由于每个线程占用的CPU时间片的时间非常短,看起来就像是多个线程都在共同执行一样,这样的场景称作并发。
并行:在多核或者多CPU上,多个线程是在真正的同时执行,这样的场景称作并行。
IO密集型:指的是程序里面指令的执行,涉及一些IO操作,比如设备、文件、网络操作(等待客户端的连接),对于IO密集型的程序是更加适合设计成多线程模式的。
CPU密集型:指的是程序里面的指令都是做计算用的,对于CPU密集型的程序来说,多核的情况下设计成多线程程序好,但是对于单核来说,设计成多线程程序就会有额外的花费,主要是在线程的调度上。
线程的上下文切换:当前线程调度完成,调度下一个线程。
线程的消耗
为了完成任务,创建很多的线程可以吗?线程真的越多越好吗?
- 线程的创建和销毁都是非常“重”的操作
- 线程栈本身占用大量内存
- 线程的上下文切换要占用大量时间
- 大量线程同时唤醒会使系统经常出现锯齿状负载或者瞬间负载量很大导致宕机
创建多少个线程好:IO复用 + 多线程 (有几个线程呢? 一般都是按照当前CPU的核心数量来确定的)
线程池
线程池的优势就是,在服务进程启动之初,就事先创建好线程池里面的线程,当业务流量到来时需要分配线程,直接从线程池中获取一个空闲线程执行task任务即可,task执行完成后,也不用释放线程,而是把线程归还到线程池中继续给后续的task提供服务。
本项目设计的线程池一共有两种模式:fixed模式和cached模式
fixed模式线程池:线程池里面的线程个数是固定不变的,一般情况下,线程池在创建的时候会根据当前机器的CPU核心数来确定创建的线程个数。
cached模式线程池:线程池里面的线程个数是可以动态增长的,根据任务的数量动态的增加线程的数量,通常在设计之初会设定线程数量的上限阈值,新创建的线程在处理完任务之后,在一定的时间里,如果没有再次调用便会关闭,从而保持线程池中的线程数量保持在最初数量。
线程同步
线程互斥:互斥锁mutex和atomic原子类型
线程通信:
- 条件变量 condition_variable
- 它允许一个线程在某个特定条件尚未满足时“挂起”,直到另一个线程把这个条件改变并“唤醒”它。
- 条件变量的核心要素:
- wait 等待:
- 线程先获取互斥锁
- 检查条件
- 如果条件不满足,调用wait,关键点:wait会自动做两件事:释放锁 + 让线程进入休眠
- signal / notify 通知
- 另一个线程改变了条件
- 调用notify,唤醒一个正在等待该条件的线程
- broadcast / notify all 广播
- 唤醒所有正在等待该条件的线程
- wait 等待:
- unique_lock, condition_variable
- 在c++中,最简单的锁管理工具是lock_guard,他利用RAII机制,构造时加锁,析构时解锁,但是有局限性
- 太死板:一旦加锁,直到作用域结束才能解锁,你不能在中途手动释放,也不能稍后再重新上锁
- 功能单一:它不支持配合条件变量
- unique_lock的核心超能力
- 随时手动控制(灵活开关),可以根据需要再代码中间手动调用.lock()或unlock()。
- 延迟加锁,可以先创建对象,但不立即加锁,等后面合适的时机再加锁。、
- 在c++中,最简单的锁管理工具是lock_guard,他利用RAII机制,构造时加锁,析构时解锁,但是有局限性
- 线程抢占锁都是随机,但是当加入条件变量之后可以通过等待和通知等操作,可以精确控制线程操作,释放锁…
- 互斥锁+条件变量,可以实现对线程通信的精确控制
- 生产者消费者模型,先生产后消费
- 信号量 semaphore
- 信号量的操作通常被称为P操作和V操作
- P操作(wait)尝试获取一个资源,如果计数器>0,则计数器减1,继续执行。如果计数器=0,则线程阻塞(挂起),直到资源可用
- V操作(signal)释放一个资源,计数器加1,如果有线程在排队等待,唤醒其中一个
- P和V操作本身都是原子的,你不需要担心多个线程同时去修改计数器而产生错乱
- 信号量的操作通常被称为P操作和V操作
线程池threadpool.h
# ifndef THREADPOOL_H
# define THREADPOOL_H
#include <iostream>
#include <vector>
#include <queue>
#include <memory>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <unordered_map>
#include <future>
#include <thread>
const int TASK_MAX_THRESHHOLE = INT32_MAX; // 任务队列上限
const int THREAD_MAX_THRESHHOLD = 1024; // 线程上限数量
const int THREAD_MAX_IDLE_TIME = 60; // 单位:秒
// 线程池支持的模式
enum class PoolMode
{
MODE_FIXED, // 固定数量的线程
MODE_CACHED, // 变化数量的线程
};
// 线程类型
class Thread
{
public:
// 线程函数对象类型
using ThreadFunc = std::function<void(int)>;
// 线程构造
Thread(ThreadFunc func)
: func_(func)
, threadId_(generateId_++)
{}
// 线程析构
~Thread() = default;
// 启动线程
void start()
{
// 创建一个线程来执行一个线程函数
std::thread t(func_, threadId_);
t.detach(); // 设置分离线程
}
// 获取线程ID
int getId()const
{
return threadId_;
}
private:
ThreadFunc func_;
static int generateId_;
int threadId_; // 保存线程id
};
int Thread::generateId_ = 0;
// 线程池类型
class ThreadPool
{
public:
// 线程池构造函数
// 线程池构造函数
ThreadPool()
: initThreadSize_(4)
, taskSize_(0)
, idleThreadSize_(0)
, curThreadSize_(0)
, taskQueMaxThreshHold_(TASK_MAX_THRESHHOLE)
, threadSizeThreshHold_(THREAD_MAX_THRESHHOLD)
, poolMode_(PoolMode::MODE_FIXED)
, isPoolRunning_(false)
{}
// 线程池析构函数
~ThreadPool()
{
isPoolRunning_ = false;
// 等待线程池里面所有的线程返回 有两种状态:1. 阻塞;2. 正在执行任务中
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all();
exitCond_.wait(lock, [&]()->bool {return threads_.size() == 0; });
}
// 设置线程池的工作模式
void setMode(PoolMode mode)
{
if (checkRunningState())
return;
poolMode_ = mode;
}
// 设置task任务队列上限阈值
void setTakQueMaxThreshHold(int threshhold)
{
if (checkRunningState())
return;
taskQueMaxThreshHold_ = threshhold;
}
// 设置线程池cached模式下线程阈值
void setThreadSizeThreshHold(int threshhold)
{
if (checkRunningState())
return;
if (poolMode_ == PoolMode::MODE_CACHED)
{
threadSizeThreshHold_ = threshhold;
}
}
// 给线程池提交任务
// 使用可变参模板编程,让submitTask可以接收任意任务函数和任意数量的参数
// 返回值future<>
template<typename Func, typename... Args>
auto submitTask(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>
{
// 打包任务,放入任务队列
using RType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<RType> result = task->get_future();
// 获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
if (!notFull_.wait_for(lock, std::chrono::seconds(1),
[&]()->bool { return taskQue_.size() < (size_t)taskQueMaxThreshHold_; }))
{
// 表示notFull_等待1s,条件依然没有满足
std::cerr << "task queue is full, submit task fail." << std::endl;
auto task = std::make_shared<std::packaged_task<RType()>>(
[]()->RType { return RType(); });
//std::async(std::launch::async, []() -> ReturnType { return ReturnType(); });
(*task)();
return task->get_future(); //
}
// 如果有空余,把任务放入任务队列中
//taskQue_.emplace(sp);
// using Task = std::function<void()>;
taskQue_.emplace([task](){(*task)();});
taskSize_++;
// 因为新放了任务,任务队列肯定不空,notEmpty_通知,赶快分配线程执行任务
notEmpty_.notify_all();
// chched模式 任务处理比较紧急 场景:小而块的任务,需要根据任务数量和空闲线程的数量,判断是否需要创建新的线程出来
if (poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& initThreadSize_ < threadSizeThreshHold_)
{
std::cout << "create new thread..." << std::endl;
// 创建新线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// 启动线程
threads_[threadId]->start();
// 修改线程个数相关变量
curThreadSize_++;
idleThreadSize_++;
}
// 返回任务的Result对象
return result;
}
// 开启线程池
void start(int initThreadSize = std::thread::hardware_concurrency())
{
// 设置线程池的运行状态
isPoolRunning_ = true;
// 记录初始线程个数
initThreadSize_ = initThreadSize;
curThreadSize_ = initThreadSize;
// 创建线程对象
for (int i = 0; i < initThreadSize_; i++)
{
// 创建thread线程对象的时候,把线程函数给到thread线程对象
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
int threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr));
// 说明:emplace_back是vector中一个非常高效的成员函数,用于在容器末尾添加元素
// 虽然它和push_back的功能看起来是一样的,但是其底层运作机制有显著的区别,简单来说,emplace_back的核心优势是:原地构造
}
// 启动所有线程 std::vector<Thread*> threads_;
for (int i = 0; i < initThreadSize_; i++)
{
threads_[i]->start(); // 需要去执行一个线程函数
idleThreadSize_++; // 记录初始空闲线程的数量
}
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
private:
// 定义线程函数
void threadFunc(int threadid)
{
auto lastTime = std::chrono::high_resolution_clock().now();
// 所有任务必须执行完成,线程池才可以回收所有线程资源
for (;;)
{
Task task;
{
// 先获取锁,智能指针出了作用域就会将锁释放掉
std::unique_lock<std::mutex> lock(taskQueMtx_);
std::cout << "tid: " << std::this_thread::get_id()
<< "尝试获取任务..." << std::endl;
// cached模式下,有可能已经创建了很多线程,但是空闲时间超过60s,应该把多余的线程给回收
// 结束回收掉(超过initThreadSize_数量的线程要进行回收)
// 当前时间 - 上一次线程执行时间 > 60s
// 每一秒钟返回一次 怎么区分超时返回?还是有任务待执行返回
// 锁 + 双重判断
while (taskQue_.size() == 0)
{
// 线程池要结束,回收线程资源
if (!isPoolRunning_)
{
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id() << "exit!"
<< std::endl;
exitCond_.notify_all();
return;
}
if (poolMode_ == PoolMode::MODE_CACHED)
{
// 条件变量,超时返回
if (std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1)))
{
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if (dur.count() >= THREAD_MAX_IDLE_TIME
&& curThreadSize_ > initThreadSize_)
{
// 开始回收当前线程
// 记录线程数量的相关变量的值修改
// 把线程对象从线程列表容器中删除 没有办法 threadFunc 匹配 哪一个thread对象
// threadid -> thread对象 -> 删除
threads_.erase(threadid);
curThreadSize_--;
idleThreadSize_--;
std::cout << "threadid:" << std::this_thread::get_id() << "exit!"
<< std::endl;
return;
}
}
}
else
{
// 等待notEmpty任务
notEmpty_.wait(lock);
}
}
idleThreadSize_--;
std::cout << "tid: " << std::this_thread::get_id()
<< "获取任务成功..." << std::endl;
// 从任务队列中取一个任务出来
task = taskQue_.front();
taskQue_.pop();
taskSize_--;
// 如果依然有剩余任务,继续通知其他线程执行任务
if (taskQue_.size() > 0)
{
notEmpty_.notify_all();
}
// 执行完一个任务,进行通知,通知可以提交生产任务
notFull_.notify_all();
} // 及时将锁释放掉
// 当前线程负责执行这个任务
if (task != nullptr)
{
task();
}
idleThreadSize_++;
lastTime = std::chrono::high_resolution_clock().now();// 更新线程执行完任务时间
}
// 释放资源
threads_.erase(threadid);
std::cout << "threadid:" << std::this_thread::get_id() << "exit!"
<< std::endl;
exitCond_.notify_all();
}
// 检查pool的运行状态
bool checkRunningState() const
{
return isPoolRunning_;
}
private:
// std::vector<std::unique_ptr<Thread>> threads_; // 线程列表
std::unordered_map<int, std::unique_ptr<Thread>> threads_; // 线程列表
int initThreadSize_; // 初始线程数量
int threadSizeThreshHold_;// 线程数量上限阈值
std::atomic_int curThreadSize_;// 记录当前线程池里面线程的总数量
std::atomic_int idleThreadSize_; // 记录空闲线程的数量
// Task任务 -> 函数对象
using Task = std::function<void()>;
std::queue <Task> taskQue_; // 任务队列
std::atomic_int taskSize_; // 任务数量
int taskQueMaxThreshHold_; // 任务队列数量上限阈值f
std::mutex taskQueMtx_; // 保证任务队列的线程安全
std::condition_variable notFull_; // 表示任务队列不满
std::condition_variable notEmpty_; // 表示任务队列不空
std::condition_variable exitCond_; // 等到线程资源全部回收
PoolMode poolMode_; // 当前线程池的工作模式
std::atomic_bool isPoolRunning_; // 表示当前线程池的启动状态
};
# endif
线程池使用说明:
- 实例化线程对象,ThreadPool pool;
- 设置线程池模式,pool.setMode(PoolMode::MODE_CACHED); 默认是fixed模式,如若要设置成cached模式,需手动设置
- 启动线程池,pool.start(4);启动线程池的时候可以同步设置创建线程的数量
- 待处理任务,int sum(int a,int b){return a+b;} void print(){std::cout<<“Hello World” << std::endl;}
- 向线程池提交任务,future<int> f1 = pool.submitTask(sum,10,20); || future<void> f2 = pool.submitTask(print);
注意:在linux环境下运行需要做个小修改
// Thread类中修改
void start() {
// 创建线程执行 func_,不再 detach
t_ = std::thread(func_, threadId_);
}
// 增加一个成员变量保存线程对象
private:
std::thread t_;










