模板
namespace lb {
/*
* Vector 创建一个自定义维度的 vector
* 参数的个数是维度
* 第 n 个参数是第 n 个维度的大小
* 例如创建一个三维 n, m, k 的 dp 数组:
* lb::Vector(n, m, k).vec;
*/
template<class...Args>
struct Vector;
template<>
struct Vector<> {
using type = int;
type vec = 0;
};
template<class T, class...Args>
struct Vector<T, Args...> {
using type = std::vector<typename Vector<Args...>::type>;
template<class U, class...Rest>
constexpr explicit Vector(U && u, Rest && ...rest) :
vec(std::forward<U>(u), Vector<Args...>(std::forward<Rest>(rest)...).vec) {}
type vec;
};
template<class T, class...Args>
Vector(T, Args...) -> Vector<T, Args...>;
}
简单版 $shared\_ptr$
#include <iostream>
#include <atomic>
template<class T>
struct ControlBlock {
T *ptr;
std::atomic<int> ref_count;
ControlBlock(T *p) : ptr(p), ref_count(1) {}
~ControlBlock() {
delete ptr; // 销毁对象
}
};
template<class T>
class Shared_ptr {
public:
explicit Shared_ptr(T *ptr = nullptr) {
if (ptr) {
controlBlock = new ControlBlock<T>(ptr);
} else {
controlBlock = nullptr;
}
}
Shared_ptr(const Shared_ptr &other) {
controlBlock = other.controlBlock;
if (controlBlock) {
controlBlock->ref_count++; // 增加引用计数
}
}
Shared_ptr& operator=(const Shared_ptr &other) {
if (this != &other) {
if (controlBlock && --controlBlock->ref_count == 0) {
delete controlBlock;
}
controlBlock = other.controlBlock;
if (controlBlock) {
controlBlock->ref_count++; // 增加引用计数
}
}
return *this;
}
~Shared_ptr() {
if (controlBlock && --controlBlock->ref_count == 0) {
delete controlBlock; // 引用计数为 0 时销毁对象
}
}
void reset() {
if (controlBlock && --controlBlock->ref_count == 0) {
delete controlBlock;
}
controlBlock = nullptr;
}
T* operator->() const {
return controlBlock ? controlBlock->ptr : nullptr;
}
T& operator*() const {
return *controlBlock->ptr;
}
int use_count() const {
return controlBlock ? controlBlock->ref_count.load() : 0;
}
private:
ControlBlock<T> *controlBlock;
};
class A {
public:
A() : x(100) {}
void print() {
std::cout << x << "\n";
}
int x;
};
int main() {
Shared_ptr<A> ptr(new A());
ptr->print(); // 调用 print 方法
return 0;
}
条件变量
- 严格来说,条件变量只能阻塞线程,并不能保证线程同步,需要配合互斥锁使用
-
cond
里面记录了哪些被阻塞的线程,当唤醒的时候可以基于cond
里面记录的信息来唤醒线程 -
生产者
while (true) {
// 临界资源,
mutex.lock();
q.push();
mutex.unlock();
// 唤醒cond中记录的被阻塞的线程
cond.broadcast();
}
- 消费者
/* 生成队列为空,一直阻塞。
使用while的原因:
生产者线程可能只生产了一个资源,但cond会唤醒多个消费线程,当前消费线程就可能没抢到资源,只能继续阻塞
*/
while (true) {
// 临界资源,
mutex.lock();
/*
生成队列为空,一直阻塞。
生产者线程可能只生产了一个资源,但cond会唤醒多个消费线程,当前消费线程就可能没抢到资源,只能继续阻塞
*/
while (q.empty()) {
// 被条件变量在此处阻塞线程,被唤醒后在此处继续执行,可能会没抢到资源,下一轮循环继续阻塞
cond.wait(mutex); // 如果mutex上锁,会自动解锁,让生产者可以生产,回到该线程后又会重新上锁
}
/*
两种情况:
1.第一个被唤醒的消费线程还没取走资源,但是重新上了锁,其他被唤醒消费线程因为没有锁的使用权继续阻塞
2.生产者唤醒多个线程,第一个被唤醒消费线程取走了资源,其他被唤醒的消费线程因为没有资源while继续阻塞
*/
// 消费
q.pop();
mutex.unlock();
}
生产者消费者模型
#include <mutex>
#include <vector>
#include <thread>
#include <iostream>
#include <condition_variable>
class TaskQueue {
public:
TaskQueue(int _size) : size(_size) {}
void push(const int& task) {
{
std::unique_lock<std::mutex> locker(_mutex);
// while (taskQueue.size() == this->size) {
// notFull.wait(locker);
// }
notFull.wait(locker, [&] {
return taskQueue.size() != this->size;
});
taskQueue.emplace_back(task);
}
notEmpty.notify_one();
}
void pop() {
{
std::unique_lock<std::mutex> locker(_mutex);
// while (taskQueue.empty()) {
// notEmpty.wait(locker);
// }
notEmpty.wait(locker, [&]() {
return !taskQueue.empty();
});
int t = taskQueue.back();
taskQueue.pop_back();
std::cout << "pop: " << t << "\n";
}
notFull.notify_one();
}
bool empty() {
std::unique_lock<std::mutex> locker(_mutex);
return taskQueue.empty();
}
bool full() {
std::unique_lock<std::mutex> locker(_mutex);
return taskQueue.size() == this->size;
}
private:
int size;
std::mutex _mutex;
std::vector<int> taskQueue;
std::condition_variable notFull, notEmpty;
};
int main() {
TaskQueue tq(100);
constexpr int N = 10;
std::thread t1[N], t2[N];
for (int i = 0; i < N; i ++) {
t1[i] = std::thread(&TaskQueue::push, &tq, i + 1);
t2[i] = std::thread(&TaskQueue::pop, &tq);
}
for (int i = 0; i < N; i ++) {
t1[i].join();
t2[i].join();
}
}
信号量
-
生产者 $x$ 个,消费者 $y$ 个。则存放资源空闲数量最多有 $x$ 个。生产者想生产,则必须保证有空闲位置,否则阻塞。消费者想消费,则得保证有资源,否则阻塞。
-
生产者的信号量 $sem\_p$。消费者的信号量 $sem\_c$
-
wait
相当于让信号量数量减 $1$,pos
加 $1$ -
生产者
while (true) {
sem_p.wait(); // 没有空闲位置则阻塞
mutex.lock();
//生产
mutex.unlock();
sem_c.post(); // 生产完有资源了,通知消费者消费。
}
- 消费者
while (true) {
sem_c.wait(); // 没有资源则阻塞
mutex.lock();
// 消费
mutex.unlock();
sem_p.post(); // 消费完有空闲位置了,通知生产者生产
}
$C++11$ 原子变量 $atomic$
- 如一个加法操作在中最少对应三条指令,从磁盘将数据拷贝到$cpu$,在$cpu$计算,将结果拷回到磁盘。
- 在多线程中,如果在执行第二条指令时,线程的时间片用完了,切换到另一个线程,又对该数值进行加法操作,而第一次计算的结果还没拷回来,所以用的旧数据加法,那么两次加法相当于只加了一次,数值混乱。
- 为了不产生这种错误,要求这三步操作时不可被中断,不可被拆分。只有三步做完以后才可以失去时间片。
- 这种不可被中断的操作称之为
原子操作
。 C++11
提供的原子类型std::atomic<T>
- 只支持
int char bool long 指针
,不支持复合类型和浮点类型
设计一个线程池
- 管理者线程 -> $1$ 个子线程
- 调节工作线程的数量
- 当工作的线程数量比较多,并且任务队列中没有任务,工作线程处理阻塞状态,管理者进行削减
- 任务多,工作线程少则适当添加
- 若干工作线程 -> $n$ 个子线程
- 从任务队列取任务完成任务
- 任务队列为空需要阻塞(
条件变量
) - 线程同步(
互斥锁
) - 当前工作线程数量,空闲的数量
- 【线程的最小数量,最大数量】
- 任务队列 ->
queue
- 互斥锁
- 条件变量
- 线程池开关 ——> bool