C++ 语言实现动态变化的线程池
线程池
Job
Job 作为任务的类型
1 |
|
其中定义两个变量,data
,和 func
。
func
用来保存需要调用的方法,当执行任务时,调用此函数即可。考虑到需要传递参数的可能,所以定义参数为一个指针,而另一个变量 data 则为需要传递给 func 的参数指针
函数的实现为
1 |
|
线程池核心代码
1 |
|
- 首先通过 init 函数初始化核心线程数
- 通过 submit 的函数提交任务,必须是一个 job 指针,且必须是单独 new 出来的,线程池会自动清理已经完成的任务
- 可以随时通过 getAccumulation 来获取到当前堆积的任务,使得可以手动调整线程池数量
- 使用 upateCore 来调整核心线程数量
建议通过 wait 来实现终止线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98ThreadPool::ThreadPool(int core) : maxCore(core), killed(false), needKill(0) {
for (int i = 0; i < core; ++i) addThread();
}
Job *ThreadPool::takeJob() {
Job *cur = nullptr;
enqueue.run([&](queue<Job *> &q) {
if (q.empty()) return;
cur = q.front();
q.pop();
});
return cur;
}
void ThreadPool::addThread() {
auto work = [&]() {
while (true) {
Job *cur = takeJob();
if (cur != nullptr) {
cur->exec();
delete cur;
} else {
bool dead = false;
needKill.run([&](int &count) {
if (count <= 0) return;
dead = true;
count--;
});
if (dead) break;
clean();
unique_lock<mutex> lk(noTaskCvMutex);
noTaskCv.wait(lk);
}
}
threadPool.run([&](map<pthread_t, thread *> &data) {
auto iter = data.find(pthread_self());
deathThread.run([&](vector<thread *> &data) {
data.push_back(iter->second);
});
data.erase(iter);
});
};
auto *newThread = new thread(work);
threadPool.run([&](map<pthread_t, thread *> &data) {
data.insert({newThread->native_handle(), newThread});
});
}
void ThreadPool::clean() {
if (deathThread.get().empty()) return;
deathThread.run([&](vector<thread *> &data) {
for (auto &item: data) delete item;
});
}
void ThreadPool::submit(Job *job) {
if (killed) return;
enqueue.run([&](queue<Job *> &q) {
q.push(job);
noTaskCv.notify_one();
});
}
int ThreadPool::getAccumulation() {
return (int) enqueue.get().size();
}
void ThreadPool::updateCore(int newCount) {
if (killed) return;
needKill.run([&](int &cleaned) {
if (newCount > this->maxCore)
for (int i = this->maxCore; i < newCount; ++i)
addThread();
else {
cleaned += this->maxCore - newCount;
noTaskCv.notify_all();
}
});
}
void ThreadPool::wait() {
updateCore(0);
killed = true;
map<pthread_t, thread *> tmp = threadPool.get();
for (auto &item: tmp)
item.second->join();
}
void ThreadPool::close() {
killed = true;
map<pthread_t, thread *> tmp = threadPool.get();
for (auto &item: tmp) {
pthread_kill(item.first, SIGKILL);
delete item.second;
}
}
线程任务流程
- 尝试获取一个任务
- 若有任务
- 执行任务
- 删除任务
- 若无任务
- 检查是否有需要杀死的线程
- 若有需要杀死的线程
- 将当前线程添加进入已经结束线程组
- 将当前线程从线程池中移除
- 若无需要杀死的线程
- 清理需要删除的任务
- 进入等待状态
C++ 语言实现动态变化的线程池
https://blog.mauve.icu/2021/10/13/cpp/thread-pool/