1. Overview of multithreaded programming
1.1 What is multithreading?
A thread is the smallest unit of program execution, and a process can contain multiple threads. Multithreaded programming means running multiple threads at the same time within one program in order to improve execution efficiency and responsiveness.
Before C++11, the C++ standard library had no built-in thread support, so developers had to use platform-specific APIs (such as CreateThread on Windows or POSIX pthreads) to create and manage threads. C++11 introduced the <thread> header, which provides cross-platform thread support.
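1.2 Advantages of multithreading
- Better performance: takes full advantage of multi-core CPUs by running multiple tasks in parallel.
- Better user experience: runs time-consuming operations in the background so the UI stays responsive.
- Simpler program design: code for different functions can be separated into different threads, which makes the program structure clearer.
- Resource sharing: threads can share memory easily, which reduces data-copying overhead.
1.3 Challenges of multithreading
- Thread synchronization: access to shared resources by multiple threads must be synchronized to avoid data races.
- Deadlock: two or more threads each wait for the other to release a resource, so the program hangs.
- Race conditions: the program's result depends on the order in which threads run, so the result is nondeterministic.
- Resource consumption: creating and managing threads consumes system resources.
- Difficult debugging: the behavior of a multithreaded program is hard to predict, which makes debugging harder.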
2. Creating and managing threads
2.1 Creating threads
In C++11, threads are created with the std::thread class:
2.1.1 Creating a thread from a function
#include <iostream>
#include <thread>
using namespace std;

void printHello() {
    cout << "Hello from thread!" << endl;
}

int main() {
    // Create the thread
    thread t(printHello);
    // Wait for the thread to finish
    t.join();
    cout << "Hello from main!" << endl;
    return 0;
}
2.1.2 Creating a thread from a lambda expression
#include <iostream>
#include <thread>
using namespace std;

int main() {
    // Create a thread from a lambda expression
    thread t([]() {
        cout << "Hello from lambda thread!" << endl;
    });
    t.join();
    cout << "Hello from main!" << endl;
    return 0;
}
2.1.3 Passing arguments to a thread function
#include <iostream>
#include <thread>
#include <string>
using namespace std;

void printMessage(const string& message, int count) {
    for (int i = 0; i < count; i++) {
        cout << message << " " << i << endl;
    }
}

int main() {
    // Pass arguments to the thread function
    thread t(printMessage, "Hello", 5);
    t.join();
    return 0;
}
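std::thread copies (or moves) its arguments into the new thread, so a function that takes a non-const reference needs std::ref to see the caller's object. The following is a minimal sketch of that behavior; the names sumRange and sumInThread are hypothetical and are not part of the example above.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: adds every element of `values` into `total`.
void sumRange(const std::vector<int>& values, long long& total) {
    for (int v : values) {
        total += v;
    }
}

// Runs sumRange on a separate thread and returns the accumulated sum.
long long sumInThread(const std::vector<int>& values) {
    long long total = 0;
    // std::cref / std::ref pass references into the thread. Without std::ref,
    // the non-const reference parameter would not compile, because the thread
    // would try to hand sumRange a temporary copy.
    std::thread t(sumRange, std::cref(values), std::ref(total));
    t.join();  // total is only safe to read after the join
    assert(!t.joinable());
    return total;
}
```

Calling sumInThread({1, 2, 3, 4}) returns 10. The referenced objects must outlive the thread, which is why the sketch joins before returning.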
2.2 Managing threads
2.2.1 Waiting for a thread to finish (join)
join() blocks the calling thread until the thread it is called on has finished executing:
thread t(printHello);
t.join(); // Wait for thread t to finish
2.2.2 Detaching a thread (detach)
detach() separates the thread of execution from its thread object. The thread keeps running in the background, and its resources are reclaimed automatically when it finishes:
thread t(printHello);
t.detach(); // Detach the thread
// The main thread can continue with other tasks
2.2.3 Checking whether a thread is joinable
joinable() checks whether a thread is joinable, that is, whether join() may be called on it:
thread t(printHello);
if (t.joinable()) {
    t.join();
}
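A std::thread that is still joinable when it is destroyed calls std::terminate, so the joinable() check is often wrapped in a small RAII helper. The sketch below assumes a hypothetical class named JoinGuard; it is one possible way to do this and is not part of the standard library.

```cpp
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical RAII wrapper: joins the owned thread on destruction
// if it is still joinable.
class JoinGuard {
public:
    explicit JoinGuard(std::thread t) : thread_(std::move(t)) {}
    ~JoinGuard() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }
    JoinGuard(const JoinGuard&) = delete;
    JoinGuard& operator=(const JoinGuard&) = delete;

private:
    std::thread thread_;
};

// Example use: the worker has finished by the time the guard leaves scope.
int runGuarded() {
    int result = 0;
    {
        JoinGuard guard(std::thread([&result] { result = 42; }));
    }  // guard's destructor joins here
    assert(result == 42);
    return result;
}
```

The guard also joins if an exception unwinds the scope, which a manual join() at the end of the function would miss.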
2.3 Thread properties
2.3.1 Getting the thread ID
Use get_id() to obtain a thread's unique identifier:
#include <iostream>
#include <thread>
using namespace std;

void printThreadId() {
    cout << "Thread ID: " << this_thread::get_id() << endl;
}

int main() {
    thread t1(printThreadId);
    thread t2(printThreadId);
    cout << "Main thread ID: " << this_thread::get_id() << endl;
    t1.join();
    t2.join();
    return 0;
}
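2.3.2 Getting the number of hardware threads
Use thread::hardware_concurrency() to get the number of concurrent threads the system supports. The value is only a hint and may be 0 if it cannot be determined:
#include <iostream>
#include <thread>
using namespace std;

int main() {
    unsigned int concurrency = thread::hardware_concurrency();
    cout << "Hardware threads: " << concurrency << endl;
    return 0;
}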
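Because hardware_concurrency() is allowed to return 0, code that sizes a group of worker threads from it usually needs a fallback. A minimal sketch, assuming a hypothetical helper named workerCount and a default fallback of 1:

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: number of worker threads to start.
// Falls back to `fallback` when the hardware hint is unavailable (0).
unsigned int workerCount(unsigned int fallback = 1) {
    assert(fallback > 0);
    unsigned int n = std::thread::hardware_concurrency();
    return n != 0 ? n : fallback;
}
```

The result is always at least 1, so it can safely be used as a divisor when splitting work into chunks.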
3. Thread synchronization
3.1 Mutexes (mutex)
A mutex is the most basic thread synchronization mechanism. It protects a shared resource by ensuring that only one thread can access it at any given time:
3.1.1 Basic usage
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

mutex mtx; // Mutex protecting sharedCounter
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        mtx.lock();   // Lock
        sharedCounter++;
        mtx.unlock(); // Unlock
    }
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    t1.join();
    t2.join();
    cout << "Final counter value: " << sharedCounter << endl;
    return 0;
}
3.1.2 Using lock_guard
std::lock_guard is an RAII-style lock management class. It locks the mutex on construction and unlocks it on destruction, which avoids deadlocks caused by forgetting to unlock:
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

mutex mtx;
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        lock_guard<mutex> lock(mtx); // Locks automatically
        sharedCounter++;             // Access the shared resource
    } // Unlocks automatically
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    t1.join();
    t2.join();
    cout << "Final counter value: " << sharedCounter << endl;
    return 0;
}
3.1.3 Using unique_lock
std::unique_lock is a more flexible lock management class. It supports deferred locking, manual unlocking, ownership transfer, and other features:
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

mutex mtx;
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        unique_lock<mutex> lock(mtx); // Locks automatically
        sharedCounter++;
        // The lock can be released manually
        lock.unlock();
        // Do work that does not need the lock
        // ...
        lock.lock(); // Lock again
    } // Unlocks automatically
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    t1.join();
    t2.join();
    cout << "Final counter value: " << sharedCounter << endl;
    return 0;
}
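The example above shows manual unlocking; deferred locking and ownership transfer can be sketched separately. The mutex name gate and the two function names below are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <mutex>
#include <utility>

std::mutex gate;  // hypothetical mutex used only by this sketch

// Deferred locking: construct the lock without acquiring the mutex,
// then acquire it later with lock().
bool deferredThenLocked() {
    std::unique_lock<std::mutex> lk(gate, std::defer_lock);
    bool ownedBefore = lk.owns_lock();  // false: nothing acquired yet
    lk.lock();
    bool ownedAfter = lk.owns_lock();   // true
    return !ownedBefore && ownedAfter;
}  // lk releases the mutex here

// Ownership transfer: a unique_lock can be moved out of a function.
std::unique_lock<std::mutex> acquire() {
    std::unique_lock<std::mutex> lk(gate);
    assert(lk.owns_lock());
    return lk;  // the mutex stays locked; ownership moves to the caller
}
```

A caller that writes auto lk = acquire(); holds the mutex until lk is destroyed or lk.unlock() is called. std::lock_guard supports neither of these patterns, which is the main reason to reach for unique_lock.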
3.2 Condition variables (condition_variable)
A condition variable is a notification mechanism between threads: one thread waits for a condition to become true, and another thread notifies the waiting thread when it does:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>
using namespace std;

queue<int> dataQueue;
mutex mtx;
condition_variable cv;
bool done = false;

void producer() {
    for (int i = 0; i < 10; i++) {
        {
            lock_guard<mutex> lock(mtx);
            dataQueue.push(i);
            cout << "Produced: " << i << endl;
        }
        // Notify the consumer
        cv.notify_one();
        this_thread::sleep_for(chrono::milliseconds(100));
    }
    // Mark production as complete
    {
        lock_guard<mutex> lock(mtx);
        done = true;
    }
    cv.notify_one();
}

void consumer() {
    while (true) {
        unique_lock<mutex> lock(mtx);
        // Wait until the queue is non-empty or production is complete
        cv.wait(lock, [] { return !dataQueue.empty() || done; });
        // Stop once production is complete and the queue is empty
        if (done && dataQueue.empty()) {
            break;
        }
        // Consume one item
        int data = dataQueue.front();
        dataQueue.pop();
        cout << "Consumed: " << data << endl;
    }
    cout << "Consumer finished" << endl;
}

int main() {
    thread producerThread(producer);
    thread consumerThread(consumer);
    producerThread.join();
    consumerThread.join();
    return 0;
}
3.3 Atomic operations (atomic)
An atomic operation is indivisible: it cannot be interrupted partway through. Atomics are suited to reads and writes of simple shared variables and do not require a mutex:
#include <iostream>
#include <thread>
#include <atomic>
using namespace std;

atomic<int> sharedCounter(0); // Atomic variable

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        sharedCounter++;
    }
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    t1.join();
    t2.join();
    cout << "Final counter value: " << sharedCounter << endl;
    return 0;
}
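Beyond ++, std::atomic also offers compare-and-swap, which covers read-modify-write updates that have no dedicated member function. As a sketch, the hypothetical helpers updateMax and parallelMax below record the largest value seen by several threads without a mutex; the default of four threads is an arbitrary assumption.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: atomically raises `target` to `value` if `value` is larger.
void updateMax(std::atomic<int>& target, int value) {
    int current = target.load();
    // On failure, compare_exchange_weak reloads `current` with the latest value,
    // so the loop re-checks whether an update is still needed.
    while (value > current && !target.compare_exchange_weak(current, value)) {
    }
}

// Each thread scans a strided slice of `values`.
// Returns the overall maximum, or `initial` if no element exceeds it.
int parallelMax(const std::vector<int>& values, int initial, unsigned int threads = 4) {
    assert(threads > 0);
    std::atomic<int> best(initial);
    std::vector<std::thread> workers;
    for (unsigned int t = 0; t < threads; ++t) {
        workers.emplace_back([&values, &best, t, threads] {
            for (std::size_t i = t; i < values.size(); i += threads) {
                updateMax(best, values[i]);
            }
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
    return best.load();
}
```

For example, parallelMax({3, 9, 1, 7, 5}, 0) returns 9 regardless of how the threads interleave.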
3.4 Deadlocks with mutexes
A deadlock occurs when two or more threads each wait for the other to release a resource, so the program hangs. The following is an example of a deadlock:
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
using namespace std;

mutex mtx1, mtx2;

void thread1() {
    cout << "Thread 1 is trying to lock mtx1" << endl;
    lock_guard<mutex> lock1(mtx1);
    this_thread::sleep_for(chrono::milliseconds(100));
    cout << "Thread 1 is trying to lock mtx2" << endl;
    lock_guard<mutex> lock2(mtx2); // Blocks forever: thread 2 holds mtx2
    cout << "Thread 1 acquired both locks" << endl;
}

void thread2() {
    cout << "Thread 2 is trying to lock mtx2" << endl;
    lock_guard<mutex> lock2(mtx2);
    this_thread::sleep_for(chrono::milliseconds(100));
    cout << "Thread 2 is trying to lock mtx1" << endl;
    lock_guard<mutex> lock1(mtx1); // Blocks forever: thread 1 holds mtx1
    cout << "Thread 2 acquired both locks" << endl;
}

int main() {
    thread t1(thread1);
    thread t2(thread2);
    t1.join();
    t2.join();
    return 0;
}
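3.4.1 Ways to avoid deadlock
- Consistent lock order: all threads acquire locks in the same order.
- Use std::lock: acquire multiple locks at once, avoiding the intermediate state in which only some of them are held.
- Use std::scoped_lock: an RAII-style class for managing multiple locks, introduced in C++17.
- Use lock timeouts: set a timeout when acquiring a lock to avoid waiting indefinitely.
- Avoid nested locks: minimize the situations in which multiple locks must be held.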
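The lock-timeout item in the list above can be sketched with std::timed_mutex, whose try_lock_for gives up after a given duration instead of blocking forever. The function names tryIncrement and timesOutWhileHeld and the 20 ms timeout are assumptions made for this illustration.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical helper: increments `counter` only if `m` can be locked within `timeout`.
// Returns true on success and false if the lock was not obtained in time.
bool tryIncrement(std::timed_mutex& m, int& counter, std::chrono::milliseconds timeout) {
    assert(timeout.count() >= 0);
    std::unique_lock<std::timed_mutex> lk(m, std::defer_lock);
    if (!lk.try_lock_for(timeout)) {
        return false;  // no lock: the caller can retry, back off, or report an error
    }
    ++counter;
    return true;
}

// Demonstration: while this thread holds the mutex, a second thread's attempt times out.
bool timesOutWhileHeld() {
    std::timed_mutex m;
    int counter = 0;
    bool acquired = true;
    {
        std::lock_guard<std::timed_mutex> hold(m);
        std::thread other([&] {
            acquired = tryIncrement(m, counter, std::chrono::milliseconds(20));
        });
        other.join();  // the mutex is still held here, so the attempt fails
    }
    return !acquired && counter == 0;
}
```

A thread that fails to get the second of two locks can release the first and retry, which breaks the circular wait that causes a deadlock.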
3.4.2 Avoiding deadlock with std::lock
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

mutex mtx1, mtx2;

void thread1() {
    cout << "Thread 1 is trying to lock both mutexes" << endl;
    lock(mtx1, mtx2);                         // Acquire both locks at once
    lock_guard<mutex> lock1(mtx1, adopt_lock); // Adopt the already-acquired lock
    lock_guard<mutex> lock2(mtx2, adopt_lock);
    cout << "Thread 1 acquired both locks" << endl;
}

void thread2() {
    cout << "Thread 2 is trying to lock both mutexes" << endl;
    lock(mtx1, mtx2);                         // Acquire both locks at once
    lock_guard<mutex> lock1(mtx1, adopt_lock);
    lock_guard<mutex> lock2(mtx2, adopt_lock);
    cout << "Thread 2 acquired both locks" << endl;
}

int main() {
    thread t1(thread1);
    thread t2(thread2);
    t1.join();
    t2.join();
    return 0;
}
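With C++17, the std::lock plus adopt_lock pattern can be written in one line with std::scoped_lock, which applies a deadlock-avoidance algorithm when given several mutexes. A sketch, assuming a hypothetical Account type and transfer function (a C++17 compiler is required):

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical type for illustration: a balance protected by its own mutex.
struct Account {
    std::mutex m;
    int balance = 0;
};

// Moves `amount` between accounts. scoped_lock acquires both mutexes without
// deadlocking, regardless of the order in which concurrent callers name the accounts.
void transfer(Account& from, Account& to, int amount) {
    assert(&from != &to);  // locking the same mutex twice would be undefined behavior
    std::scoped_lock lock(from.m, to.m);
    from.balance -= amount;
    to.balance += amount;
}

// Two threads transfer in opposite directions; returns the combined balance afterwards.
int opposingTransfers() {
    Account a, b;
    a.balance = 1000;
    b.balance = 1000;
    std::thread t1([&] { for (int i = 0; i < 1000; ++i) transfer(a, b, 1); });
    std::thread t2([&] { for (int i = 0; i < 1000; ++i) transfer(b, a, 1); });
    t1.join();
    t2.join();
    return a.balance + b.balance;
}
```

The two threads name the accounts in opposite order, which is exactly the pattern that deadlocks with two plain lock_guard objects; here both threads finish and the combined balance stays at 2000.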
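4. Thread pools
4.1 The thread pool concept
A thread pool is a mechanism for managing threads. It creates a fixed number of threads in advance and, when a task needs to run, hands the task to an idle thread instead of creating a new thread each time.
Advantages of a thread pool:
- Reduces the overhead of creating and destroying threads
- Limits the number of concurrent threads, so that too many threads do not exhaust system resources
- Improves the response time of task processing
- Makes threads easier to manage and monitor
4.2 A simple thread pool implementation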
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <chrono>
#include <utility>
using namespace std;

class ThreadPool {
private:
    vector<thread> workers;
    queue<function<void()>> tasks;
    mutex mtx;
    condition_variable cv;
    bool stop;

public:
    ThreadPool(size_t threads) : stop(false) {
        // Create the worker threads
        for (size_t i = 0; i < threads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    {
                        unique_lock<mutex> lock(this->mtx);
                        // Sleep until there is a task or the pool is stopping
                        this->cv.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // Run the task outside the lock
                    task();
                }
            });
        }
    }

    // Destructor: lets queued tasks finish, then joins all workers
    ~ThreadPool() {
        {
            unique_lock<mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (thread& worker : workers) {
            worker.join();
        }
    }

    // Add a task
    template <class F>
    void enqueue(F&& f) {
        {
            unique_lock<mutex> lock(mtx);
            tasks.emplace(forward<F>(f));
        }
        cv.notify_one();
    }
};

int main() {
    // Create a thread pool with 4 threads
    ThreadPool pool(4);
    // Add tasks
    for (int i = 0; i < 10; i++) {
        pool.enqueue([i] {
            cout << "Task " << i << " is running, thread ID: " << this_thread::get_id() << endl;
            this_thread::sleep_for(chrono::milliseconds(100));
            cout << "Task " << i << " finished" << endl;
        });
    }
    // No sleep is needed here: when main returns, the pool's destructor
    // waits for every queued task to complete before joining the workers.
    return 0;
}
5. Parallel algorithms
5.1 The C++17 parallel algorithms library
C++17 introduced the parallel algorithms library, an extension of the standard algorithms library that supports parallel execution of algorithms:
5.1.1 Using parallel algorithms
#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <chrono>
#include <cstdlib>
#include <random>
using namespace std;
using namespace std::chrono;

// Note: some toolchains need extra setup for the parallel policies
// (for example, GCC's libstdc++ typically relies on Intel TBB).
int main() {
    // Create a large vector
    vector<int> v(10000000);
    // Fill it with random numbers
    for (int& i : v) {
        i = rand() % 1000;
    }
    // Sequential sort
    auto start = high_resolution_clock::now();
    sort(std::execution::seq, v.begin(), v.end());
    auto end = high_resolution_clock::now();
    cout << "Sequential sort time: " << duration_cast<milliseconds>(end - start).count() << " ms" << endl;
    // Shuffle the vector
    shuffle(v.begin(), v.end(), std::mt19937{std::random_device{}()});
    // Parallel sort
    start = high_resolution_clock::now();
    sort(std::execution::par, v.begin(), v.end());
    end = high_resolution_clock::now();
    cout << "Parallel sort time: " << duration_cast<milliseconds>(end - start).count() << " ms" << endl;
    return 0;
}
5.1.2 Parallel execution policies
- std::execution::seq: sequential execution
- std::execution::par: parallel execution
- std::execution::par_unseq: parallel and vectorized execution
- std::execution::unseq: vectorized execution (added in C++20)
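Practical case: counting primes in parallel
Write a C++ program that uses multiple threads to count, in parallel, the primes in a given range.
Requirements analysis
- Implement a function that determines whether a number is prime.
- Split the given range of numbers into multiple intervals, each processed by one thread.
- Use a thread pool to manage the threads.
- Sum the results from all threads to get the final prime count.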
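The interval-splitting requirement can be sketched as a standalone helper before it is combined with the thread pool. The function name splitRange and its signature are hypothetical; the arithmetic follows the usual "equal chunks, remainder goes to the last one" scheme.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical helper: splits the inclusive range [first, last] into `parts`
// contiguous intervals. The last interval absorbs any remainder.
std::vector<std::pair<int, int>> splitRange(int first, int last, int parts) {
    assert(parts > 0 && first <= last);
    std::vector<std::pair<int, int>> chunks;
    int chunkSize = (last - first + 1) / parts;
    for (int i = 0; i < parts; ++i) {
        int begin = first + i * chunkSize;
        int end = (i == parts - 1) ? last : begin + chunkSize - 1;
        chunks.emplace_back(begin, end);
    }
    return chunks;
}
```

For example, splitRange(1, 10, 3) yields the intervals [1, 3], [4, 6], and [7, 10]. Each interval can then be handed to one task, and no number is counted twice because the intervals do not overlap.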
Reference code
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <chrono>
#include <utility>
using namespace std;
using namespace std::chrono;

// Thread pool class
class ThreadPool {
private:
    vector<thread> workers;
    queue<function<void()>> tasks;
    mutex mtx;
    condition_variable cv;
    bool stop;

public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function<void()> task;
                    {
                        unique_lock<mutex> lock(this->mtx);
                        this->cv.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }

    // Lets queued tasks finish, then joins all workers
    ~ThreadPool() {
        {
            unique_lock<mutex> lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (thread& worker : workers) {
            worker.join();
        }
    }

    template <class F>
    void enqueue(F&& f) {
        {
            unique_lock<mutex> lock(mtx);
            tasks.emplace(forward<F>(f));
        }
        cv.notify_one();
    }
};

// Determine whether n is prime
bool isPrime(int n) {
    if (n <= 1) return false;
    if (n <= 3) return true;
    if (n % 2 == 0 || n % 3 == 0) return false;
    for (int i = 5; i * i <= n; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0) return false;
    }
    return true;
}

int main() {
    const int maxNumber = 10000000;
    // hardware_concurrency() may return 0, so fall back to one thread
    const unsigned int hw = thread::hardware_concurrency();
    const int threadCount = hw != 0 ? static_cast<int>(hw) : 1;
    cout << "Counting primes between 1 and " << maxNumber << endl;
    cout << "Using " << threadCount << " threads" << endl;
    // Each task writes only to its own slot, so no mutex is needed for results
    vector<int> results(threadCount, 0);
    auto start = high_resolution_clock::now();
    {
        ThreadPool pool(threadCount);
        // Split the work into one chunk per thread
        int chunkSize = maxNumber / threadCount;
        for (int i = 0; i < threadCount; i++) {
            int startNum = i * chunkSize + 1;
            int endNum = (i == threadCount - 1) ? maxNumber : (i + 1) * chunkSize;
            pool.enqueue([&results, i, startNum, endNum] {
                int count = 0;
                for (int num = startNum; num <= endNum; num++) {
                    if (isPrime(num)) {
                        count++;
                    }
                }
                results[i] = count;
            });
        }
        // Leaving this scope runs the pool's destructor,
        // which waits for all tasks to complete
    }
    // Sum the results
    int totalPrimes = 0;
    for (int count : results) {
        totalPrimes += count;
    }
    auto end = high_resolution_clock::now();
    auto duration = duration_cast<milliseconds>(end - start).count();
    cout << "Prime count: " << totalPrimes << endl;
    cout << "Elapsed time: " << duration << " ms" << endl;
    return 0;
}
Sample output
Counting primes between 1 and 10000000
Using 8 threads
Prime count: 664579
Elapsed time: 1256 ms
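Interactive exercises
Exercise 1: Write a C++ program that uses multiple threads to compute the n-th term of the Fibonacci sequence.
Exercise 2: Modify the thread pool implementation to support returning task results, so that the pool can hand back the result of each task.
Exercise 3: Write a C++ program that uses multiple threads to process a large array in parallel and compute the average of all its elements.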