线程池C++98
作者:
半醒的狐狸
,
2023-03-19 02:59:05
,
所有人可见
,
阅读 45
/*
线程池 pthread_create
pthread_cond_wait // 通过条件变量让线程睡眠,等待任务到来
pthread_cond_signal // 唤醒线程
线程创建 互斥锁 条件变量 消息队列
同一时刻只能由一个人来操作从队头取出,所以要加互斥锁;
而使用条件变量实现休眠/唤醒线程
*/
#include <iostream>
#include <queue>
#include <pthread.h>
#include <condition_variable>
#include <cstring>
using namespace std;
struct Task {
String name;
String type;
}
// 消息队列类
class TaskQueue
{
public:
TaskQueue() {
pthread_mutex_init(&m, NULL);
}
~TaskQueue() {
pthread_mutex_destory(&m);
}
// 添加任务
void addTask(Task task) {
pthread_mutex_lock(&m);
mq.push(task);
pthread_mutex_unlock(&m);
}
// 取出任务
Task getTask() {
Task t;
if (mq.size()) {
pthread_mutex_lock(&m);
t = mq.front();
mq.pop();
pthread_mutex_unlock(&m);
}
return t;
}
// 查询当前任务个数
int getTaskNums() {
return mq.size();
}
private:
queue<Task> mq;
pthread_mutex_t m;
condition_variable cv;
};
// 线程池结构体: 分为两个类,一个线程相关属性类,一个任务队列类
struct ThreadPool
{
public:
// 创建线程池并初始化
ThreadPool(int min, int max) {
// 实例化任务队列和IDs
taskQ = new TaskQueue;
// 初始化线程池参数
do {
threadIDs = new pthread_t[max];
if (threadIDs == nullptr) {
cout << "malloc threadIDs fail..." << endl;
break;
}
memset(threadIDs, 0, sizeof(pthread_t) * max); // 申请的内存初始化为0
minNum = min;
maxNum = max;
busyNum = 0;
liveNum = min;
exitNum = 0;
// 互斥锁和条件变量初始化判定
if (pthread_mutex_init(&mPool) != 0 || pthread_cond_init(¬Empty) != 0) {
cout << "mutex or condition init fail..." << endl;
break;
}
shutdown = false;
//创建线程:管理线程,和最小存活线程
pthread_create(&managerID, NULL, manager, this);
for (int i = 0; i < min; i ++ ) {
pthread_create(&threadIDs[i], NULL, worker, this);
}
return ;
} while (0);
// 释放资源
if (threadIDs) delete[] threadIDs;
if (taskQ) delete taskQ;
}
// 给线程池添加任务(就是往消息队列里面加任务)
void addTask(Task task) {
if (shutdown) return ;
taskQ->addTask(task);
pthread_cond_signal(¬Empty);
}
// 获得当前池中工作线程数
int getBusyNums() {
pthread_mutex_lock(&mPool);
int BusyNums = this->busyNum;
pthread_mutex_unlock(&mPool);
return BusyNums;
}
// 获得当前池中存活线程数
int getLiveNums() {
pthread_mutex_lock(&mPool);
int AliveNums = this->liveNum;
pthread_mutex_unlock(&mPool);
return AliveNums;
}
// 销毁线程池
~ThreadPool() {
shutdown = true;
pthread_join(managerID, NULL);
for (int i = 0; i < liveNum; i ++ ) {
pthread_cond_signal(¬Empty);
}
if (taskQ) delete taskQ;
if (threadIDs) delete[] threadIDs;
pthread_mutex_destory(&mPool);
pthread_cond_destory(¬Empty);
}
private:
// 工作线程(消费者线程)任务函数
static void* worker(void* arg);
// 管理者线程数
static void* manager(void* arg);
// 单线程退出
void threadExit() {
pthread_t tid = pthread_self();
for (int i = 0; i < maxNum; i ++ ) {
if (threadIDs[i] == tid) {
threadIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}
private:
TaskQueue *taskQ; // 实例化消息队列
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作线程IDs
int minNum; // 最小线程数
int maxNum; // 最大线程数
int liveNum; // 存活线程数
int busyNum; // 工作线程数
int exitNum; // 准备销毁的线程数
pthread_mutex_t mPool; // 锁整个线程池
pthread_cond_t notEmpty; // 消息队列是否为空
bool shutdown; // 是否关闭线程池
};
int main()
{
return 0;
}