C++ many threadprogrammingtutorial

LearningC++in many threadprogramming, includingthread creation, synchronization, 互斥etc. in 容

返回tutoriallist

1. many threadprogrammingoverview

1.1 what is many thread?

thread is 程序执行 最 small 单位, 一个process可以package含 many 个thread. many threadprogramming is 指 in 一个程序in同时执行 many 个thread, 以improving程序 执行efficiency and response速度.

in C++11之 before , C++标准libraryin没 has in 置 threadsupport, Development者需要using平台specific API (such asWindows CreateThread or POSIX pthread) 来creation and managementthread. C++11引入了<thread>头file, providing了跨平台 threadsupport.

1.2 many thread 优势

  • improving程序performance: 充分利用 many 核CPU 优势, parallel执行 many 个task.
  • 改善user体验: in after 台执行耗时operation, 保持UIresponse.
  • 简化程序design: 将不同functions code分离 to 不同 threadin, 使程序structure更清晰.
  • resource共享: thread之间可以方便地共享memoryresource, reducingdatacopy 开销.

1.3 many thread challenges

  • threadsynchronization: many 个thread访问共享resource时需要synchronization, 避免data竞争.
  • 死lock: 两个 or many 个thread互相etc.待 for 方释放resource, 导致程序卡住.
  • race condition: 程序 执行结果依赖于thread 执行顺序, 导致结果不确定.
  • resource消耗: creation and managementthread需要消耗systemresource.
  • debug difficult : many thread程序 behavior难以预测, debug起来比较 difficult .

2. thread creation and management

2.1 thread creation

C++11inusingstd::threadclass来creationthread:

2.1.1 usingfunctioncreationthread

#include <iostream>
#include <thread>

using namespace std;

void printHello() {
    cout << "Hello from thread!" << endl;
}

int main() {
    // creationthread
    thread t(printHello);
    
    // etc.待thread结束
    t.join();
    
    cout << "Hello from main!" << endl;
    return 0;
}

2.1.2 usinglambda表达式creationthread

#include <iostream>
#include <thread>

using namespace std;

int main() {
    // usinglambda表达式creationthread
    thread t([]() {
        cout << "Hello from lambda thread!" << endl;
    });
    
    t.join();
    cout << "Hello from main!" << endl;
    return 0;
}

2.1.3 传递parameter给threadfunction

#include <iostream>
#include <thread>
#include <string>

using namespace std;

void printMessage(const string& message, int count) {
    for (int i = 0; i < count; i++) {
        cout << message << " " << i << endl;
    }
}

int main() {
    // 传递parameter给threadfunction
    thread t(printMessage, "Hello", 5);
    
    t.join();
    return 0;
}

2.2 thread management

2.2.1 etc.待thread结束 (join)

join()method会阻塞当 before thread, 直 to 被调用 thread执行完毕:

thread t(printHello);
t.join(); // etc.待tthread结束

2.2.2 分离thread (detach)

detach()method会将thread and threadobject分离, thread会 in after 台继续执行, 当thread结束时, 其resource会被自动回收:

thread t(printHello);
t.detach(); // 分离thread
// 主thread可以继续执行othertask

2.2.3 checkthread is 否可连接

joinable()method用于checkthread is 否可连接 (即 is 否可以调用join()) :

thread t(printHello);
if (t.joinable()) {
    t.join();
}

2.3 thread property

2.3.1 获取threadID

usingget_id()method获取thread 唯一标识符:

#include <iostream>
#include <thread>

using namespace std;

void printThreadId() {
    cout << "Thread ID: " << this_thread::get_id() << endl;
}

int main() {
    thread t1(printThreadId);
    thread t2(printThreadId);
    
    cout << "Main thread ID: " << this_thread::get_id() << endl;
    
    t1.join();
    t2.join();
    return 0;
}

2.3.2 获取硬件thread数

usingthread::hardware_concurrency()获取systemsupport concurrentthread数:

#include <iostream>
#include <thread>

using namespace std;

int main() {
    int concurrency = thread::hardware_concurrency();
    cout << "硬件thread数: " << concurrency << endl;
    return 0;
}

3. threadsynchronization

3.1 互斥lock (mutex)

互斥lock is 最basic threadsynchronizationmechanism, 用于保护共享resource, 确保同一时刻只 has 一个thread可以访问共享resource:

3.1.1 basicusing

#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

mutex mtx; // 互斥lock
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        mtx.lock(); // 加lock
        sharedCounter++;
        mtx.unlock(); // 解lock
    }
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    
    t1.join();
    t2.join();
    
    cout << "最终计数器值: " << sharedCounter << endl;
    return 0;
}

3.1.2 usinglock_guard

std::lock_guard is a RAII风格 lockmanagementclass, 它会 in construct时自动加lock, in 析构时自动解lock, 避免忘记解lock导致 死lock:

#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

mutex mtx;
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        lock_guard lock(mtx); // 自动加lock
        sharedCounter++; // 访问共享resource
    } // 自动解lock
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    
    t1.join();
    t2.join();
    
    cout << "最终计数器值: " << sharedCounter << endl;
    return 0;
}

3.1.3 usingunique_lock

std::unique_lock is a 更flexible lockmanagementclass, It supportslatency加lock, 手动解lock, 转移所 has 权etc.functions:

#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

mutex mtx;
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        unique_lock lock(mtx); // 自动加lock
        sharedCounter++;
        // 可以手动解lock
        lock.unlock();
        // 执行不需要lock operation
        // ...
        lock.lock(); // 重 new 加lock
    } // 自动解lock
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    
    t1.join();
    t2.join();
    
    cout << "最终计数器值: " << sharedCounter << endl;
    return 0;
}

3.2 条件variable (condition_variable)

条件variable用于thread间 notificationmechanism, 一个thread可以etc.待某个条件满足, 另一个thread可以 in 条件满足时notificationetc.待 thread:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

using namespace std;

queue dataQueue;
mutex mtx;
condition_variable cv;
bool done = false;

void producer() {
    for (int i = 0; i < 10; i++) {
        {
            lock_guard lock(mtx);
            dataQueue.push(i);
            cout << "producedata: " << i << endl;
        }
        // notificationconsume者
        cv.notify_one();
        this_thread::sleep_for(chrono::milliseconds(100));
    }
    
    // 标记producecompletion
    { 
        lock_guard lock(mtx);
        done = true;
    }
    cv.notify_one();
}

void consumer() {
    while (true) {
        unique_lock lock(mtx);
        // etc.待条件: queue不 for 空 or producecompletion
        cv.wait(lock, []{ return !dataQueue.empty() || done; });
        
        // check is 否producecompletion且queue for 空
        if (done && dataQueue.empty()) {
            break;
        }
        
        // consumedata
        int data = dataQueue.front();
        dataQueue.pop();
        cout << "consumedata: " << data << endl;
    }
    cout << "consumecompletion" << endl;
}

int main() {
    thread producerThread(producer);
    thread consumerThread(consumer);
    
    producerThread.join();
    consumerThread.join();
    
    return 0;
}

3.3 原子operation (atomic)

原子operation is 不可in断 operation, 用于 simple 共享variable 读写, 不需要using互斥lock:

#include <iostream>
#include <thread>
#include <atomic>

using namespace std;

atomic sharedCounter(0); // 原子variable

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        sharedCounter++;
    }
}

int main() {
    thread t1(incrementCounter, 100000);
    thread t2(incrementCounter, 100000);
    
    t1.join();
    t2.join();
    
    cout << "最终计数器值: " << sharedCounter << endl;
    return 0;
}

3.4 互斥lock 死lockissues

死lock is 指两个 or many 个thread互相etc.待 for 方释放resource, 导致程序卡住 circumstances. 以 under is a 死lock 例子:

#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

mutex mtx1, mtx2;

void thread1() {
    cout << "thread1尝试lock定mtx1" << endl;
    lock_guard lock1(mtx1);
    this_thread::sleep_for(chrono::milliseconds(100));
    cout << "thread1尝试lock定mtx2" << endl;
    lock_guard lock2(mtx2);
    cout << "thread1获取了两个lock" << endl;
}

void thread2() {
    cout << "thread2尝试lock定mtx2" << endl;
    lock_guard lock2(mtx2);
    this_thread::sleep_for(chrono::milliseconds(100));
    cout << "thread2尝试lock定mtx1" << endl;
    lock_guard lock1(mtx1);
    cout << "thread2获取了两个lock" << endl;
}

int main() {
    thread t1(thread1);
    thread t2(thread2);
    
    t1.join();
    t2.join();
    
    return 0;
}

3.4.1 避免死lock method

  • lock 顺序一致: 所 has thread按照相同 顺序获取lock.
  • usingstd::lock: 同时获取 many 个lock, 避免in间status.
  • usingstd::scoped_lock: C++17引入 RAII风格 many lockmanagementclass.
  • usinglock超时: in 获取lock时设置超时时间, 避免无限etc.待.
  • 避免嵌套lock: 尽量reducing获取 many 个lock circumstances.

3.4.2 usingstd::lock避免死lock

#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

mutex mtx1, mtx2;

void thread1() {
    cout << "thread1尝试lock定两个lock" << endl;
    lock(mtx1, mtx2); // 同时获取两个lock
    lock_guard lock1(mtx1, adopt_lock); // adopts已获取 lock
    lock_guard lock2(mtx2, adopt_lock);
    cout << "thread1获取了两个lock" << endl;
}

void thread2() {
    cout << "thread2尝试lock定两个lock" << endl;
    lock(mtx1, mtx2); // 同时获取两个lock
    lock_guard lock1(mtx1, adopt_lock);
    lock_guard lock2(mtx2, adopt_lock);
    cout << "thread2获取了两个lock" << endl;
}

int main() {
    thread t1(thread1);
    thread t2(thread2);
    
    t1.join();
    t2.join();
    
    return 0;
}

4. thread池

4.1 thread池 concepts

thread池 is amanagementthread mechanism, 它预先creation一定数量 thread, 当 has task需要执行时, 将task分配给空闲 thread, 而不 is 每次都creation new thread.

thread池 优势:

  • reducingthreadcreation and 销毁 开销
  • 控制concurrentthread 数量, 避免thread过 many 导致systemresource耗尽
  • improvingtaskprocessing response速度
  • 方便management and monitorthread

4.2 simple thread池 implementation

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>

using namespace std;

class ThreadPool {
private:
    vector workers;
    queue> tasks;
    mutex mtx;
    condition_variable cv;
    bool stop;
public:
    ThreadPool(size_t threads) : stop(false) {
        // creationthread
        for (size_t i = 0; i < threads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function task;
                    {
                        unique_lock lock(this->mtx);
                        this->cv.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    
    // 析构function
    ~ThreadPool() {
        {
            unique_lock lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (thread &worker : workers) {
            worker.join();
        }
    }
    
    // 添加task
    template
    void enqueue(F&& f) {
        {
            unique_lock lock(mtx);
            tasks.emplace(forward(f));
        }
        cv.notify_one();
    }
};

int main() {
    // creationthread池, using4个thread
    ThreadPool pool(4);
    
    // 添加task
    for (int i = 0; i < 10; i++) {
        pool.enqueue([i] {
            cout << "task " << i << " 正 in 执行, threadID: " << this_thread::get_id() << endl;
            this_thread::sleep_for(chrono::milliseconds(100));
            cout << "task " << i << " 执行completion" << endl;
        });
    }
    
    // etc.待所 has taskcompletion
    this_thread::sleep_for(chrono::seconds(2));
    
    return 0;
}

5. parallelalgorithms

5.1 C++17parallelalgorithmslibrary

C++17引入了parallelalgorithmslibrary, 它 is for 标准algorithmslibrary scale, supportparallel执行algorithms:

5.1.1 parallelalgorithms using

#include <iostream>
#include <vector>
#include <algorithms>
#include <execution>
#include <chrono>

using namespace std;
using namespace std::chrono;

int main() {
    // creation一个 big 向量
    vector v(10000000);
    
    // 填充随机数
    for (int& i : v) {
        i = rand() % 1000;
    }
    
    // 顺序sort
    auto start = high_resolution_clock::now();
    sort(std::execution::seq, v.begin(), v.end());
    auto end = high_resolution_clock::now();
    cout << "顺序sort时间: " << duration_cast(end - start).count() << "ms" << endl;
    
    // 打乱向量
    shuffle(v.begin(), v.end(), std::mt19937{std::random_device{}()});
    
    // parallelsort
    start = high_resolution_clock::now();
    sort(std::execution::par, v.begin(), v.end());
    end = high_resolution_clock::now();
    cout << "parallelsort时间: " << duration_cast(end - start).count() << "ms" << endl;
    
    return 0;
}

5.1.2 parallel执行策略

  • std::execution::seq: 顺序执行
  • std::execution::par: parallel执行
  • std::execution::par_unseq: parallel且向量化执行
  • std::execution::unseq: 向量化执行

实践case: parallel计算质数

writing一个C++程序, using many threadparallel计算指定范围 in 质数数量.

requirementsanalysis

  • implementation一个function, 判断一个数 is 否 for 质数.
  • 将指定范围 number分成 many 个区间, 每个区间由一个threadprocessing.
  • usingthread池来managementthread.
  • 汇总所 has thread 计算结果, 得 to 最终 质数数量.

referencecode

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <chrono>

using namespace std;
using namespace std::chrono;

// thread池class
class ThreadPool {
private:
    vector workers;
    queue> tasks;
    mutex mtx;
    condition_variable cv;
    bool stop;
public:
    ThreadPool(size_t threads) : stop(false) {
        for (size_t i = 0; i < threads; i++) {
            workers.emplace_back([this] {
                while (true) {
                    function task;
                    {
                        unique_lock lock(this->mtx);
                        this->cv.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        task = move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    
    ~ThreadPool() {
        {
            unique_lock lock(mtx);
            stop = true;
        }
        cv.notify_all();
        for (thread &worker : workers) {
            worker.join();
        }
    }
    
    template
    void enqueue(F&& f) {
        {
            unique_lock lock(mtx);
            tasks.emplace(forward(f));
        }
        cv.notify_one();
    }
};

// 判断 is 否 for 质数
bool isPrime(int n) {
    if (n <= 1) return false;
    if (n <= 3) return true;
    if (n % 2 == 0 || n % 3 == 0) return false;
    for (int i = 5; i * i <= n; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0) return false;
    }
    return true;
}

int main() {
    const int maxNumber = 10000000;
    const int threadCount = thread::hardware_concurrency();
    
    cout << "计算 1  to  " << maxNumber << " 之间 质数数量" << endl;
    cout << "using " << threadCount << " 个thread" << endl;
    
    ThreadPool pool(threadCount);
    vector results(threadCount, 0);
    mutex resultMutex;
    
    auto start = high_resolution_clock::now();
    
    // 分割task
    int chunkSize = maxNumber / threadCount;
    for (int i = 0; i < threadCount; i++) {
        int startNum = i * chunkSize + 1;
        int endNum = (i == threadCount - 1) ? maxNumber : (i + 1) * chunkSize;
        
        pool.enqueue([&results, i, startNum, endNum] {
            int count = 0;
            for (int num = startNum; num <= endNum; num++) {
                if (isPrime(num)) {
                    count++;
                }
            }
            results[i] = count;
        });
    }
    
    // etc.待所 has taskcompletion
    this_thread::sleep_for(chrono::seconds(2));
    
    // 汇summarized果
    int totalPrimes = 0;
    for (int count : results) {
        totalPrimes += count;
    }
    
    auto end = high_resolution_clock::now();
    auto duration = duration_cast(end - start).count();
    
    cout << "质数数量: " << totalPrimes << endl;
    cout << "计算时间: " << duration << " ms" << endl;
    
    return 0;
}

run结果

计算 1  to  10000000 之间 质数数量
using 8 个thread
质数数量: 664579
计算时间: 1256 ms

互动练习

练习1: writing一个C++程序, using many thread计算斐波那契数列 第n项.

练习2: modifythread池 implementation, 添加task结果 返回functions, 使thread池able to返回task执行 结果.

练习3: writing一个C++程序, using many threadparallelprocessing一个 big 型array, 计算arrayin所 has 元素 平均值.