Skip to content

Latest commit

 

History

History
815 lines (663 loc) · 50.6 KB

C++并发编程基础:线程同步方式全解析:从基础到高级应用.md

File metadata and controls

815 lines (663 loc) · 50.6 KB

C++线程同步全解析:从基础到高级应用

1. 引言

1.1 什么是线程同步

线程同步是指在多线程编程中,协调多个线程的执行顺序、访问共享资源等操作,以确保程序的正确性和稳定性。当多个线程同时访问共享资源(如全局变量、文件等)时,如果不进行同步,可能会导致数据不一致、竞态条件等问题。例如,多个线程同时对一个共享变量进行读写操作,可能会使该变量的值出现错误的结果,因为线程的执行顺序是不确定的。

1.2 为什么需要线程同步

在多线程程序中,每个线程都有自己独立的执行路径和栈空间,但它们可能会共享一些数据和资源。如果不对这些共享资源的访问进行控制,就会出现数据竞争和并发问题,从而导致程序的行为不可预测,甚至出现崩溃等严重错误。通过线程同步,可以保证共享资源在同一时刻只能被一个线程访问或修改,避免数据冲突和错误的发生,同时也可以协调线程之间的执行顺序,使它们按照预期的方式协作完成任务。

1.3 线程同步的基本概念

线程同步主要涉及到互斥(Mutual Exclusion)和协作(Cooperation)两个方面。互斥是指保证在同一时刻只有一个线程能够访问共享资源,避免数据竞争。协作则是指线程之间需要相互等待、通知等操作,以协调它们的执行顺序,确保程序的逻辑正确性。常见的线程同步机制包括锁、条件变量、信号量、事件、屏障等,它们通过不同的方式来实现互斥和协作,以满足不同场景下的线程同步需求。

2. 锁(Mutex)

2.1 什么是锁

锁(Mutex,全称 Mutex Lock,互斥锁)是一种最基本的线程同步机制,它用于保证在同一时刻只有一个线程能够进入临界区(Critical Section),访问共享资源。当一个线程获取到锁时,其他线程如果也试图获取该锁,就会被阻塞,直到持有锁的线程释放锁为止。

2.2 为什么使用锁

在多线程环境下,如果多个线程同时访问和修改共享资源,就会导致数据不一致的问题。例如,一个线程正在对一个共享变量进行写入操作,而另一个线程同时读取该变量,可能会读到一个错误的中间值。使用锁可以确保在一个线程访问共享资源时,其他线程被阻止,从而保证数据的完整性和一致性。

2.3 如何使用锁

2.3.1 代码示例

#include <iostream>
#include <thread>
#include <mutex>

// 定义一个互斥锁
std::mutex mutex;
// 共享变量
int sharedVariable = 0;

// 线程函数,对共享变量进行自增操作
void increment() {
    // 获取锁
    std::lock_guard<std::mutex> guard(mutex);
    // 对共享变量进行自增操作
    sharedVariable++;
    // guard 析构时自动释放锁,无需手动释放
}

int main() {
    // 创建两个线程
    std::thread t1(increment);
    std::thread t2(increment);

    // 等待线程执行完毕
    t1.join();
    t2.join();

    // 输出共享变量的值
    std::cout << "共享变量的值:" << sharedVariable << std::endl;

    return 0;
}

2.3.2 代码逐行解析

  • std::mutex mutex;:定义了一个std::mutex类型的互斥锁对象mutex,用于保护共享资源的访问。
  • std::lock_guard<std::mutex> guard(mutex);:在increment函数中,使用std::lock_guard来获取锁mutexstd::lock_guard是一个 RAII(Resource Acquisition Is Initialization)类型的对象,它在构造函数中获取锁,并在析构函数中自动释放锁,确保即使在函数发生异常的情况下,锁也能被正确释放,避免了死锁和资源泄漏的问题。
  • sharedVariable++;:在获取锁之后,线程对共享变量sharedVariable进行自增操作,由于此时其他线程无法获取锁,所以保证了该操作的原子性,不会出现数据竞争。
  • t1.join();t2.join();:在main函数中,使用join函数等待线程t1t2执行完毕,确保在main函数结束之前,所有线程都已经完成了对共享变量的操作,避免了线程提前结束导致数据不一致的问题。

2.4 应用场景

  • 对共享数据的读写操作:当多个线程需要读写共享数据时,如数据库连接池、缓存数据等,使用锁可以保证数据的一致性和完整性。
  • 保护临界区代码:在一些关键代码段,如对文件的写入操作、对共享设备的访问等,需要确保同一时刻只有一个线程能够执行这些代码,以防止冲突和错误。

2.5 优缺点

  • 优点
    • 简单易用:互斥锁的概念和使用方法相对简单,容易理解和掌握,能够有效地解决多线程对共享资源的访问冲突问题。
    • 互斥性强:能够严格保证在同一时刻只有一个线程进入临界区,避免数据竞争和不一致的情况发生。
  • 缺点
    • 可能导致死锁:如果线程获取锁的顺序不当或者在获取锁后发生异常而没有正确释放锁,可能会导致死锁的发生,使程序陷入无限等待的状态。
    • 性能开销:获取和释放锁都需要一定的系统开销,尤其是在高并发的情况下,频繁地获取和释放锁可能会影响程序的性能。

3. 条件变量(Condition Variable)

3.1 什么是条件变量

条件变量是一种线程同步机制,它允许线程在满足特定条件之前等待,直到其他线程通知它们条件已经满足。条件变量通常与互斥锁一起使用,线程在进入等待状态之前先获取互斥锁,然后在条件不满足时释放互斥锁并进入等待状态,当条件满足时,其他线程通过通知操作唤醒等待的线程,等待线程被唤醒后重新获取互斥锁并继续执行。

3.2 为什么使用条件变量

在某些情况下,线程需要等待某个条件的发生才能继续执行,而单纯使用锁无法满足这种需求。例如,一个线程在等待另一个线程完成某个任务后才能继续工作,或者一个线程在等待共享资源满足特定条件(如队列非空、缓冲区有可用空间等)时才能进行操作。条件变量提供了一种高效的等待和通知机制,使线程能够在条件满足时及时被唤醒并继续执行,避免了线程的忙等待(不断地检查条件是否满足),从而提高了系统的资源利用率和性能。

3.3 如何使用条件变量

3.3.1 代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

// 定义互斥锁和条件变量
std::mutex mutex;
std::condition_variable cv;
// 共享变量
int sharedVariable = 0;

// 生产者线程函数
void producer() {
    // 模拟生产数据
    for (int i = 0; i < 10; ++i) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // 获取锁
        std::unique_lock<std::mutex> lock(mutex);
        // 更新共享变量
        sharedVariable = i;
        // 通知消费者线程数据已准备好
        cv.notify_one();
    }
}

// 消费者线程函数
void consumer() {
    while (true) {
        // 获取锁
        std::unique_lock<std::mutex> lock(mutex);
        // 等待条件变量满足(共享变量不为 0)
        cv.wait(lock, [] { return sharedVariable!= 0; });
        // 消费数据
        std::cout << "消费数据:" << sharedVariable << std::endl;
        // 重置共享变量
        sharedVariable = 0;
    }
}

int main() {
    // 创建生产者和消费者线程
    std::thread t1(producer);
    std::thread t2(consumer);

    // 等待线程执行完毕
    t1.join();
    t2.join();

    return 0;
}

3.3.2 代码逐行解析

  • std::condition_variable cv;:定义了一个std::condition_variable类型的条件变量cv,用于线程之间的等待和通知操作。
  • std::unique_lock<std::mutex> lock(mutex);:在生产者和消费者线程中,都使用std::unique_lock来获取互斥锁mutexstd::unique_lockstd::lock_guard更加灵活,它允许在获取锁之后手动释放锁,并且支持条件变量的等待操作。
  • cv.notify_one();:在生产者线程中,当生产数据完成后,使用cv.notify_one函数通知一个正在等待的消费者线程,数据已经准备好。这里也可以使用cv.notify_all函数来通知所有等待的线程,但在某些情况下可能会造成不必要的线程唤醒开销。
  • cv.wait(lock, [] { return sharedVariable!= 0; });:在消费者线程中,使用cv.wait函数等待条件变量满足。cv.wait函数会首先释放互斥锁mutex,然后将线程置于等待状态,直到条件变量被通知并且满足给定的条件(这里是sharedVariable!= 0)。当条件满足时,线程会重新获取互斥锁并继续执行后面的代码。

3.4 应用场景

  • 生产者 - 消费者模型:在生产者线程生产数据并将其放入共享缓冲区,消费者线程从缓冲区中取出数据进行消费的场景中,条件变量可以用于协调生产者和消费者之间的工作节奏,确保消费者在缓冲区有数据时才进行消费,生产者在缓冲区有空闲空间时才进行生产。
  • 任务调度:当多个线程在等待特定任务的完成或某个系统资源的可用性时,条件变量可以用来实现高效的任务调度机制,使线程能够在条件满足时及时被唤醒并执行相应的任务。

3.5 优缺点

  • 优点
    • 避免忙等待:线程可以在条件不满足时进入等待状态,释放 CPU 资源,避免了线程的无效循环和 CPU 资源的浪费,提高了系统的性能和资源利用率。
    • 精确唤醒:可以根据具体的条件来唤醒等待的线程,而不是像信号量那样简单地增加计数器的值,从而避免了不必要的线程唤醒和上下文切换开销。
  • 缺点
    • 需要与互斥锁配合使用:条件变量的正确使用依赖于互斥锁来保护共享资源和条件变量本身的访问,这增加了代码的复杂性和出错的可能性。
    • 可能出现虚假唤醒:在某些情况下,线程可能会被虚假唤醒,即没有真正满足条件却被唤醒。因此,在使用条件变量时,需要在等待条件的循环中再次检查条件是否真正满足,以避免程序出现错误的行为。

4. 信号量(Semaphore)

4.1 什么是信号量

信号量是一种用于控制多个线程对共享资源访问的同步机制,它本质上是一个计数器,用于记录可用资源的数量。线程在访问共享资源之前,需要先获取信号量,如果信号量的值大于 0,则表示有可用资源,线程可以继续执行并将信号量的值减 1;如果信号量的值为 0,则表示没有可用资源,线程会被阻塞,直到其他线程释放信号量,使信号量的值增加。

4.2 为什么使用信号量

信号量可以用于解决多个线程对有限数量的共享资源的访问控制问题,例如多个线程同时访问一个数据库连接池,连接池中的连接数量是有限的,通过信号量可以控制同时使用连接的线程数量,避免资源的过度使用和竞争。同时,信号量也可以用于实现线程之间的同步和协作,例如在一个任务需要等待多个其他任务完成后才能继续执行的场景中,可以使用信号量来实现线程之间的等待和通知机制。

4.3 如何使用信号量

4.3.1 代码示例

#include <iostream>
#include <thread>
#include <semaphore.h>

// 定义信号量并初始化
sem_t semaphore;

// 共享资源(这里简单模拟为一个全局变量)
int sharedResource = 0;

// 线程函数,获取信号量后访问共享资源
void accessResource() {
    // 获取信号量
    sem_wait(&semaphore);
    // 访问共享资源
    std::cout << "线程 " << std::this_thread::get_id() << " 访问共享资源,当前值:" << sharedResource << std::endl;
    sharedResource++;
    // 释放信号量
    sem_post(&semaphore);
}

int main() {
    // 初始化信号量,初始值为 3,表示有 3 个可用资源
    sem_init(&semaphore, 0, 3);

    // 创建 5 个线程
    std::thread threads[5];
    for (int i = 0; i < 5; ++i) {
        threads[i] = std::thread(accessResource);
    }

    // 等待所有线程执行完毕
    for (int i = 0; i < 5; ++i) {
        threads[i].join();
    }

    // 销毁信号量
    sem_destroy(&semaphore);

    return 0;
}

4.3.2 代码逐行解析

  • sem_t semaphore;:定义了一个sem_t类型的信号量变量semaphore
  • sem_init(&semaphore, 0, 3);:在main函数中,使用sem_init函数对信号量进行初始化,第二个参数0表示信号量在同一进程的多个线程之间共享,第三个参数3表示信号量的初始值,即初始时有 3 个可用资源。
  • sem_wait(&semaphore);:在accessResource线程函数中,使用sem_wait函数尝试获取信号量。如果信号量的值大于 0,则获取成功,信号量的值减 1,并继续执行后面的代码;如果信号量的值为 0,则线程会被阻塞,直到其他线程释放信号量。
  • sem_post(&semaphore);:在访问完共享资源后,使用sem_post函数释放信号量,将信号量的值加 1,以便其他等待的线程有机会获取信号量并访问共享资源。
  • sem_destroy(&semaphore);:在所有线程执行完毕后,使用sem_destroy函数销毁信号量,释放相关的系统资源。

4.4 应用场景

  • 资源池管理:如数据库连接池、线程池、内存池等,通过信号量来控制同时使用资源的线程数量,确保资源的合理分配和有效利用,避免资源耗尽和过度竞争。
  • 任务同步:在多个线程需要协同完成一个任务的场景中,例如多个线程分别负责不同的子任务,当所有子任务都完成后才能进行下一步操作,可以使用信号量来实现线程之间的同步,主线程可以等待所有子线程通过信号量发出任务完成的信号后再继续执行。

4.5 优缺点

  • 优点
    • 灵活控制资源访问:可以根据实际情况设置信号量的初始值,精确控制同时访问共享资源的线程数量,适用于各种资源有限的场景。
    • 简单高效:信号量的实现相对简单,其操作(获取和释放)的开销相对较小,在高并发场景下能够提供较好的性能表现。
  • 缺点
    • 可能导致死锁:如果线程获取信号量后没有正确释放,或者在获取和释放信号量的过程中出现异常,可能会导致死锁的发生,使程序陷入僵局。
    • 不适合复杂的同步逻辑:信号量主要用于控制资源的访问数量,对于一些复杂的线程同步和协作需求,如线程之间的条件等待和精确唤醒等,使用信号量可能会使代码变得复杂且难以维护,此时可能需要结合其他同步机制(如条件变量)来实现。

5. 事件(Event)

5.1 什么是事件

事件是一种线程同步机制,它允许一个线程等待另一个线程发出的特定事件信号。事件可以处于两种状态:已触发(Signaled)和未触发(Unsignaled)。线程可以通过等待事件来暂停执行,直到事件被其他线程触发,然后继续执行后续的操作。

5.2 为什么使用事件

在一些需要线程之间进行异步通信和同步的场景中,事件提供了一种简洁有效的方式。例如,一个线程在等待某个外部条件满足或者其他线程完成特定任务后才能继续执行,使用事件可以避免线程的轮询等待,提高系统的性能和响应性。事件还可以用于实现多线程程序中的一些复杂的同步逻辑,如多个线程等待同一个事件的发生,或者一个线程在多个事件中选择等待其中一个事件发生等。

5.3 如何使用事件

5.3.1 代码示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

// 定义互斥锁、条件变量和事件标志
std::mutex mutex;
std::condition_variable cv;
bool eventFlag = false;

// 线程函数,等待事件发生
void waitForEvent() {
    std::unique_lock<std::mutex> lock(mutex);
    // 等待事件标志变为 true
    cv.wait(lock, [] { return eventFlag; });
// 执行后续操作
std::cout << "线程执行后续任务" << std::endl;
}

// 线程函数,触发事件
void triggerEvent() {
    std::this_thread::sleep_for(std::chrono::seconds(3));
    {
        std::lock_guard<std::mutex> guard(mutex);
        // 设置事件标志为 true
        eventFlag = true;
    }
    // 通知等待事件的线程
    cv.notify_one();
}

int main() {
    std::thread t1(waitForEvent);
    std::thread t2(triggerEvent);

    t1.join();
    t2.join();

    return 0;
}

5.3.2 代码逐行解析

  • bool eventFlag = false;:定义一个布尔变量eventFlag作为事件标志,初始化为false,表示事件未发生。
  • std::unique_lock<std::mutex> lock(mutex);:在waitForEvent函数中,使用std::unique_lock获取互斥锁mutex,用于保护对事件标志eventFlag的访问和条件变量cv的操作。
  • cv.wait(lock, [] { return eventFlag; });:线程调用cv.wait函数等待事件发生,即等待eventFlag变为truecv.wait会释放互斥锁并将线程置于阻塞状态,直到条件满足(eventFlagtrue)且被其他线程唤醒,唤醒后会重新获取互斥锁继续执行后续代码。
  • eventFlag = true;:在triggerEvent函数中,经过一段时间(这里是 3 秒)后,通过eventFlag = true;设置事件标志为true,表示事件已发生。
  • cv.notify_one();:接着使用cv.notify_one通知正在等待事件的线程(这里只有一个线程在等待),使其从cv.wait的阻塞状态中唤醒,继续执行后续操作。

5.4 应用场景

  • 线程间异步通信:例如在一个网络编程场景中,一个线程负责接收网络数据,当数据接收完成后,通过触发事件通知其他线程进行数据处理,避免数据处理线程的无效等待和频繁轮询。
  • 任务依赖关系:当一个任务的执行依赖于其他多个任务的完成情况时,可以使用事件来协调各个任务之间的执行顺序。每个任务完成后触发相应的事件,依赖这些任务的线程等待所有相关事件发生后再继续执行。

5.5 优缺点

  • 优点
    • 异步通知:能够实现线程之间的高效异步通信,减少线程的无效等待时间,提高系统的整体性能和响应速度。
    • 灵活控制:可以根据需要灵活地设置事件的触发条件和等待方式,满足不同场景下的线程同步需求,适用于复杂的多线程协作场景。
  • 缺点
    • 需要与其他同步机制配合:事件通常需要与互斥锁和条件变量等其他同步机制结合使用,以确保事件标志的正确访问和线程的安全等待与唤醒,这增加了代码的复杂性和理解难度。
    • 可能存在虚假唤醒:与条件变量类似,在某些情况下线程可能会被虚假唤醒,因此在使用事件等待的代码中,需要再次检查事件条件是否真正满足,以确保程序的正确性。

6. 屏障(Barrier)

6.1 什么是屏障

屏障是一种线程同步机制,它允许一组线程在某个特定的点上相互等待,直到所有线程都到达该点后,才一起继续执行后续的操作。屏障就像是一个关卡,所有线程必须都到达这个关卡后才能通过并继续前进。

6.2 为什么使用屏障

在一些并行计算的场景中,例如多个线程共同处理一个大型数据集,每个线程负责一部分数据的计算,在所有线程都完成各自的计算任务之前,后续的合并或汇总操作无法进行。此时使用屏障可以确保所有线程都同步到同一个阶段,避免部分线程过早地执行后续操作而导致数据不一致或错误的结果。屏障有助于协调多个线程的执行进度,保证整个计算过程的正确性和完整性。

6.3 如何使用屏障

6.3.1 代码示例

#include <iostream>
#include <thread>
#include <barrier>

// 创建一个屏障对象,设置需要等待的线程数量为 3
std::barrier barrier(3);

// 线程函数,执行一些计算任务后等待屏障
void doWork() {
    std::cout << "线程 " << std::this_thread::get_id() << " 开始工作" << std::endl;
    // 模拟一些计算任务
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "线程 " << std::this_thread::get_id() << " 完成工作,等待其他线程" << std::endl;
    // 等待所有线程到达屏障
    barrier.arrive_and_wait();
    std::cout << "所有线程已到达屏障,线程 " << std::this_thread::get_id() << " 继续执行" << std::endl;
}

int main() {
    // 创建 3 个线程
    std::thread t1(doWork);
    std::thread t2(doWork);
    std::thread t3(doWork);

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

6.3.2 代码逐行解析

  • std::barrier barrier(3);:定义了一个std::barrier对象barrier,并设置需要等待的线程数量为 3。这意味着当 3 个线程都调用barrier.arrive_and_wait函数时,它们才会一起继续执行后续代码。
  • barrier.arrive_and_wait();:在doWork线程函数中,每个线程在完成自己的计算任务(这里通过std::this_thread::sleep_for模拟)后,调用barrier.arrive_and_wait函数,表示该线程已经到达屏障并等待其他线程。当所有线程都到达屏障后,barrier会唤醒所有等待的线程,使它们继续执行后续的操作。

6.4 应用场景

  • 并行计算:在大规模数据并行处理的场景中,如矩阵乘法、图像处理等,使用屏障来同步多个线程的计算阶段,确保每个线程在进行下一步操作之前,所有线程都已经完成了当前阶段的计算任务,从而保证数据的一致性和计算结果的正确性。
  • 游戏开发:在多人游戏中,多个线程可能分别负责不同的游戏逻辑,如物理模拟、图形渲染、网络通信等。在某些关键节点,如游戏场景切换、玩家同步等时刻,需要使用屏障来确保所有相关线程都准备好后再进行下一步操作,避免出现游戏画面撕裂、玩家状态不同步等问题。

6.5 优缺点

  • 优点
    • 精确同步:能够确保一组线程在特定的点上实现精确的同步,保证所有线程都处于相同的执行阶段后再继续前进,对于一些对同步要求严格的并行计算任务非常重要。
    • 简单易用:std::barrier在 C++ 标准库中的使用相对简单,程序员只需要设置好需要等待的线程数量,并在每个线程中调用相应的等待函数即可实现线程的同步,无需复杂的同步逻辑代码。
  • 缺点
    • 性能开销:在实现线程同步的过程中,屏障机制可能会引入一定的性能开销,尤其是在需要频繁同步大量线程的场景中,线程的等待和唤醒操作可能会消耗较多的 CPU 时间和系统资源,影响程序的整体性能。
    • 灵活性受限:屏障的功能相对较为单一,主要用于实现线程之间的固定点同步。对于一些复杂的线程同步场景,如线程之间需要根据不同的条件进行动态的等待和唤醒,或者需要实现更精细的同步控制逻辑,屏障可能无法满足需求,需要结合其他同步机制来实现。

7. 原子操作(Atomic Operations)

7.1 什么是原子操作

原子操作是指在执行过程中不会被其他线程中断的操作,即它是一个不可分割的操作单元,要么全部执行完成,要么根本不执行,不存在中间状态被其他线程干扰的情况。原子操作通常用于对共享变量的读写操作,以保证在多线程环境下数据的一致性和正确性。

7.2 为什么使用原子操作

在多线程程序中,当多个线程同时访问和修改同一个共享变量时,如果没有适当的同步机制,就会出现数据竞争和不一致的问题。传统的互斥锁机制虽然可以解决这个问题,但在一些情况下,使用互斥锁可能会带来较大的性能开销,尤其是对于一些简单的共享变量操作,如计数器的自增自减等。原子操作提供了一种轻量级的同步方式,它能够在不使用锁的情况下,保证对共享变量的操作的原子性,从而提高程序的性能和并发能力。

7.3 如何使用原子操作

7.3.1 代码示例

#include <iostream>
#include <thread>
#include <atomic>

// 定义一个原子整数类型的变量
std::atomic<int> atomicVariable(0);

// 线程函数,对原子变量进行自增操作
void incrementAtomic() {
    for (int i = 0; i < 1000; ++i) {
        // 原子自增操作
        atomicVariable++;
    }
}

int main() {
    // 创建两个线程
    std::thread t1(incrementAtomic);
    std::thread t2(incrementAtomic);

    t1.join();
    t2.join();

    // 输出原子变量的值
    std::cout << "原子变量的值:" << atomicVariable.load() << std::endl;

    return 0;
}

7.3.2 代码逐行解析

  • std::atomic<int> atomicVariable(0);:定义了一个std::atomic<int>类型的原子变量atomicVariable,并初始化为 0。std::atomic模板类提供了一系列原子操作函数,用于保证对其封装的数据类型的操作的原子性。
  • atomicVariable++;:在incrementAtomic线程函数中,使用atomicVariable++对原子变量进行自增操作。这是一个原子操作,它能够确保在多个线程同时执行这个自增操作时,不会出现数据竞争和不一致的情况。每个线程的自增操作都是独立且完整的,不会被其他线程中断或干扰。
  • atomicVariable.load();:在main函数中,使用atomicVariable.load()函数获取原子变量的当前值,并将其输出。load函数也是一个原子操作,它能够安全地读取原子变量的值,即使在其他线程正在对该变量进行写操作的情况下,也能保证读取到的是一个完整且正确的值。

7.4 应用场景

  • 计数器:在多线程程序中,经常需要对一些计数器进行自增或自减操作,如统计某个事件发生的次数、记录线程的执行次数等。使用原子操作来实现计数器的功能,可以避免使用互斥锁带来的性能开销,同时保证计数器的值在多线程环境下的正确性。
  • 共享标志位:对于一些简单的共享标志位,如表示某个资源是否可用、某个任务是否完成等,原子操作可以提供高效的读写操作,确保多个线程对标志位的访问和修改的正确性,而无需使用复杂的锁机制。

7.5 优缺点

  • 优点
    • 高性能:对于简单的共享变量操作,原子操作通常比使用锁机制具有更高的性能,因为它避免了线程上下文切换和锁的获取与释放开销,能够更高效地利用 CPU 资源,提高程序的并发性能。
    • 简单直接:原子操作的使用相对简单,程序员只需要将共享变量定义为std::atomic类型,并使用相应的原子操作函数(如++--loadstore等)即可实现对共享变量的原子访问和修改,不需要像使用锁那样编写复杂的同步代码逻辑。
  • 缺点
    • 功能有限:原子操作主要适用于对单个共享变量的简单操作,对于一些复杂的同步场景,如多个共享变量之间的关联操作、线程之间的复杂协作逻辑等,原子操作可能无法满足需求,此时仍然需要使用锁、条件变量等其他同步机制来实现更复杂的同步逻辑。
    • 硬件依赖:原子操作的实现依赖于硬件的支持,不同的硬件平台对原子操作的支持程度和性能表现可能会有所差异。在一些老旧的硬件平台上,原子操作的性能可能不如预期,甚至可能不支持某些原子操作,这就需要程序员在编写代码时考虑到硬件的兼容性问题。

8. 无锁编程(Lock-Free Programming)

8.1 什么是无锁编程

无锁编程是一种并发编程技术,它旨在避免使用传统的锁机制来实现线程之间的同步和共享资源的访问控制。相反,无锁编程依赖于原子操作、比较与交换(CAS,Compare-And-Swap)操作以及其他一些底层的硬件同步原语,来构建线程安全的数据结构和算法,使得多个线程能够在不互相阻塞的情况下并发地访问和修改共享数据,从而提高程序的并发性能和可伸缩性。

8.2 为什么使用无锁编程

随着多核处理器的普及,提高程序的并发性能变得越来越重要。传统的基于锁的同步机制在高并发场景下可能会出现性能瓶颈,因为线程在获取和释放锁的过程中会发生上下文切换和等待,导致 CPU 资源的浪费。而无锁编程通过避免使用锁,减少了线程之间的阻塞和等待时间,使得线程能够更充分地利用 CPU 资源,从而提高程序的整体性能和响应速度。此外,无锁编程还可以降低死锁和优先级反转等与锁相关的问题的发生概率,提高程序的稳定性和可靠性。

8.3 如何使用无锁编程

8.3.1 代码示例

以下是一个简单的无锁队列的实现示例:

无锁队列的并发性
  1. 多个线程同时入队
    • 多个线程可以同时调用 enqueue 方法,向队列尾部插入新节点。
    • 通过原子操作(如 compare_exchange_weak)确保只有一个线程能够成功更新尾节点。
  2. 多个线程同时出队
    • 多个线程可以同时调用 dequeue 方法,从队列头部移除节点。
    • 通过原子操作确保只有一个线程能够成功更新头节点。
  3. 入队和出队同时进行
    • 一个线程可以调用 enqueue 插入数据,同时另一个线程调用 dequeue 移除数据。
    • 无锁队列的设计确保入队和出队操作不会相互阻塞。
#include <iostream>
#include <atomic>
#include <memory>
#include <thread>
#include <vector>

template<typename T>
class LockFreeQueue {
private:
    struct Node {
        std::shared_ptr<T> data; // 存储数据
        std::atomic<Node*> next; // 指向下一个节点的原子指针

        Node(T const& value) : data(std::make_shared<T>(value)), next(nullptr) {}
    };

    std::atomic<Node*> head; // 队列头指针
    std::atomic<Node*> tail; // 队列尾指针

public:
    LockFreeQueue() {
        Node* dummy = new Node(T()); // 创建一个虚拟节点
        head.store(dummy);
        tail.store(dummy);
    }

    ~LockFreeQueue() {
        while (Node* old_head = head.load()) {
            head.store(old_head->next);
            delete old_head;
        }
    }

    void enqueue(T const& value) {
        Node* new_node = new Node(value); // 创建新节点
        Node* old_tail = tail.load();     // 获取当前尾节点
        Node* null_node = nullptr;

        while (true) {
            Node* old_next = old_tail->next.load(); // 获取尾节点的下一个节点

            if (old_next == nullptr) {
                // 尝试将新节点插入到尾部
                if (old_tail->next.compare_exchange_weak(null_node, new_node)) {
                    // 成功插入,尝试更新尾指针
                    tail.compare_exchange_weak(old_tail, new_node);
                    break;
                }
            } else {
                // 尾节点落后了,帮助更新尾指针
                tail.compare_exchange_weak(old_tail, old_next);
            }
        }
    }

    std::shared_ptr<T> dequeue() {
        Node* old_head = head.load(); // 获取当前头节点
        Node* old_tail = tail.load(); // 获取当前尾节点
        Node* next_node;

        while (true) {
            if (old_head == old_tail) {
                // 队列为空
                return std::shared_ptr<T>();
            }

            next_node = old_head->next.load(); // 获取头节点的下一个节点

            // 尝试将头指针移动到下一个节点
            if (head.compare_exchange_weak(old_head, next_node)) {
                std::shared_ptr<T> res = next_node->data; // 获取数据
                delete old_head; // 删除旧的头节点
                return res;
            }
        }
    }
};

// 测试函数
void testQueue(LockFreeQueue<int>& queue, int id) {
    for (int i = 0; i < 10; ++i) {
        queue.enqueue(id * 10 + i); // 每个线程插入 10 个元素
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟耗时操作
    }

    for (int i = 0; i < 10; ++i) {
        auto item = queue.dequeue(); // 每个线程尝试出队
        if (item) {
            std::cout << "Thread " << id << " dequeued: " << *item << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟耗时操作
    }
}

int main() {
    LockFreeQueue<int> queue;

    // 创建多个线程测试队列
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(testQueue, std::ref(queue), i + 1);
    }

    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }

    return 0;
}

8.3.2 代码解析

1. Node 结构体
struct Node {
    std::shared_ptr<T> data; // 存储数据
    std::atomic<Node*> next; // 指向下一个节点的原子指针

    Node(T const& value) : data(std::make_shared<T>(value)), next(nullptr) {}
};
  • data

    • 使用 std::shared_ptr<T> 存储队列中的元素。
    • std::shared_ptr 是智能指针,可以自动管理内存,避免内存泄漏。
    • 使用 std::make_shared 创建 shared_ptr,效率更高。
  • next

    • 使用 std::atomic<Node*> 表示指向下一个节点的指针。
    • std::atomic 确保对指针的操作是原子的,避免多线程竞争导致的数据不一致。
  • 构造函数

    • 初始化 datanextnext 默认为 nullptr

2. LockFreeQueue 类
成员变量
std::atomic<Node*> head; // 队列头指针
std::atomic<Node*> tail; // 队列尾指针
  • head

    • 指向队列的头部节点(虚拟节点或实际节点)。
    • 使用 std::atomic 确保多线程操作的安全性。
  • tail

    • 指向队列的尾部节点。
    • 使用 std::atomic 确保多线程操作的安全性。
构造函数
LockFreeQueue() {
    Node* dummy = new Node(T()); // 创建一个虚拟节点
    head.store(dummy);
    tail.store(dummy);
}
  • 虚拟节点
    • 在队列初始化时创建一个虚拟节点(dummy node)。
    • 虚拟节点的 data 是默认值,nextnullptr
    • 虚拟节点的作用是简化队列的边界条件处理,避免在队列为空时对 headtail 的特殊处理。
析构函数
~LockFreeQueue() {
    while (Node* old_head = head.load()) {
        head.store(old_head->next);
        delete old_head;
    }
}
  • 释放内存
    • 遍历队列,逐个删除节点,释放内存。
    • 使用 head.load() 获取当前头节点,head.store() 更新头节点。

3. 入队操作 (enqueue)
void enqueue(T const& value) {
    Node* new_node = new Node(value); // 创建新节点
    Node* old_tail = tail.load();     // 获取当前尾节点
    Node* null_node = nullptr;

    while (true) {
        Node* old_next = old_tail->next.load(); // 获取尾节点的下一个节点

        if (old_next == nullptr) {
            // 尝试将新节点插入到尾部
            if (old_tail->next.compare_exchange_weak(null_node, new_node)) {
                // 成功插入,尝试更新尾指针
                tail.compare_exchange_weak(old_tail, new_node);
                break;
            }
        } else {
            // 尾节点落后了,帮助更新尾指针
            tail.compare_exchange_weak(old_tail, old_next);
        }
    }
}
  • 步骤

    1. 创建一个新节点 new_node,用于存储要插入的数据。
    2. 获取当前尾节点 old_tail
    3. 进入循环,尝试将新节点插入到队列尾部:
      • 检查 old_tail->next 是否为 nullptr(即尾节点是否为最后一个节点)。
      • 如果是,使用 compare_exchange_weak 尝试将 old_tail->nextnullptr 更新为 new_node
      • 如果成功,尝试更新 tail 指针。
    4. 如果 old_tail->next 不是 nullptr,说明其他线程已经更新了尾节点,当前线程需要帮助更新 tail 指针。
  • compare_exchange_weak

    • 原子操作,比较并交换。
    • 如果当前值与期望值相等,则更新为新值;否则,不更新。
    • weak 版本允许虚假失败(spurious failure),但性能更高。

4. 出队操作 (dequeue)
std::shared_ptr<T> dequeue() {
    Node* old_head = head.load(); // 获取当前头节点
    Node* old_tail = tail.load(); // 获取当前尾节点
    Node* next_node;

    while (true) {
        if (old_head == old_tail) {
            // 队列为空
            return std::shared_ptr<T>();
        }

        next_node = old_head->next.load(); // 获取头节点的下一个节点

        // 尝试将头指针移动到下一个节点
        if (head.compare_exchange_weak(old_head, next_node)) {
            std::shared_ptr<T> res = next_node->data; // 获取数据
            delete old_head; // 删除旧的头节点
            return res;
        }
    }
}
  • 步骤
    1. 获取当前头节点 old_head 和尾节点 old_tail
    2. 检查队列是否为空:
      • 如果 old_head == old_tail,说明队列为空,返回空指针。
    3. 获取头节点的下一个节点 next_node
    4. 使用 compare_exchange_weak 尝试将 headold_head 更新为 next_node
    5. 如果成功,返回 next_node 的数据,并删除 old_head

5. 多线程测试
void testQueue(LockFreeQueue<int>& queue, int id) {
    for (int i = 0; i < 10; ++i) {
        queue.enqueue(id * 10 + i); // 每个线程插入 10 个元素
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟耗时操作
    }

    for (int i = 0; i < 10; ++i) {
        auto item = queue.dequeue(); // 每个线程尝试出队
        if (item) {
            std::cout << "Thread " << id << " dequeued: " << *item << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟耗时操作
    }
}
  • 功能
    • 每个线程执行 10 次入队和 10 次出队操作。
    • 使用 std::this_thread::sleep_for 模拟耗时操作,增加并发竞争的可能性。

6. 主函数 (main)
int main() {
    LockFreeQueue<int> queue;

    // 创建多个线程测试队列
    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i) {
        threads.emplace_back(testQueue, std::ref(queue), i + 1);
    }

    // 等待所有线程完成
    for (auto& t : threads) {
        t.join();
    }

    return 0;
}
  • 步骤
    1. 创建一个 LockFreeQueue 对象。
    2. 创建 5 个线程,每个线程调用 testQueue 函数测试队列。
    3. 使用 join 等待所有线程完成。

总结
  • 无锁队列的核心:通过原子操作(如 compare_exchange_weak)实现线程安全的入队和出队操作。
  • 虚拟节点:简化边界条件处理,避免队列为空时的特殊逻辑。
  • 多线程测试:验证队列在并发环境下的正确性和性能。

8.4 应用场景

  • 高性能并发数据结构:如无锁队列、无锁哈希表、无锁栈等,在多线程环境下需要频繁地进行数据的插入、删除和查找操作时,无锁数据结构能够提供更高的并发性能,减少线程之间的阻塞和等待时间,适用于高并发的服务器程序、数据库系统、操作系统内核等场景。
  • 实时系统:在对响应时间要求极高的实时系统中,如航空航天控制系统、工业自动化控制系统等,避免锁的使用可以降低线程的延迟和不确定性,确保系统能够及时响应外部事件和请求,提高系统的可靠性和稳定性。

8.5 优缺点

  • 优点
    • 高并发性能:无锁编程能够充分利用多核处理器的并行性,避免了线程在获取和释放锁过程中的阻塞和等待,从而大大提高了程序的并发性能和吞吐量,尤其适用于处理大量并发任务的场景。
    • 避免死锁和优先级反转:由于不使用锁,也就不存在死锁和优先级反转等与锁相关的问题,使得程序的行为更加可预测和稳定,减少了调试和维护的难度。
  • 缺点
    • 编程难度高:无锁编程需要深入理解底层的硬件同步原语(如原子操作、CAS 操作等)和内存模型,并且需要精心设计数据结构和算法,以确保在多线程环境下的正确性和高效性。编写无锁代码的难度较大,容易出现错误,且错误往往难以调试和发现。
    • 内存开销较大:无锁数据结构通常需要使用一些额外的内存来维护数据的一致性和完整性,例如在无锁队列中使用的原子指针和额外的节点标记等,这可能会导致内存开销的增加,在内存资源有限的环境下可能会受到限制。

9. 综合比较与选择建议

9.1 各种同步方式的比较

同步方式 基本原理 优点 缺点 应用场景 示例代码关键操作
锁(Mutex) 通过互斥锁保证同一时刻只有一个线程能进入临界区访问共享资源 简单易用,互斥性强,能有效保护共享资源的完整性和一致性 可能导致死锁;获取和释放锁存在性能开销,在高并发下可能影响性能 对共享数据的读写操作;保护临界区代码 std::mutex mutex;
std::lock_guard<std::mutex> guard(mutex);(或std::unique_lock
条件变量(Condition Variable) 与互斥锁配合,允许线程在条件不满足时等待,直到其他线程通知条件满足 避免线程忙等待,提高资源利用率;可根据条件精确唤醒线程 需与互斥锁配合使用,增加代码复杂性;存在虚假唤醒情况,需在等待循环中再次检查条件 生产者 - 消费者模型;任务调度中线程等待特定条件完成 std::condition_variable cv;
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, [] { return condition; });
cv.notify_one();(或cv.notify_all();
信号量(Semaphore) 本质是计数器,控制可同时访问共享资源的线程数量 灵活控制资源访问,可根据实际设置初始值;简单高效 可能导致死锁;对于复杂同步逻辑支持不足,可能使代码复杂难以维护 资源池管理(如数据库连接池、线程池);任务同步中控制并发任务数量 sem_t semaphore;
sem_init(&semaphore, 0, initial_value);
sem_wait(&semaphore);
sem_post(&semaphore);
sem_destroy(&semaphore);
事件(Event) 线程可等待其他线程触发的特定事件信号,事件有已触发和未触发两种状态 实现线程间异步通信,减少无效等待时间,提高性能和响应性;可灵活设置触发条件和等待方式 通常需与其他同步机制配合;存在虚假唤醒可能,需再次检查事件条件 线程间异步通信(如网络编程中数据接收完成通知其他线程);任务依赖关系中协调任务执行顺序 std::mutex mutex;
std::condition_variable cv;
bool eventFlag = false;
cv.wait(lock, [] { return eventFlag; });
eventFlag = true;
cv.notify_one();
屏障(Barrier) 一组线程在特定点相互等待,直到所有线程到达该点后一起继续执行后续操作 精确同步一组线程执行进度,保证所有线程处于相同阶段后再继续,简单易用 性能开销较大,尤其在频繁同步大量线程时;灵活性受限,功能相对单一 并行计算中协调多个线程计算阶段(如矩阵乘法、图像处理);游戏开发中关键节点同步多个游戏逻辑线程 std::barrier barrier(num_threads);
barrier.arrive_and_wait();
原子操作(Atomic Operations) 操作过程不可被其他线程中断,保证对共享变量操作的原子性 高性能,对于简单共享变量操作比锁机制开销小;简单直接,使用方便 功能有限,适用于简单共享变量操作,复杂同步场景难以满足;依赖硬件支持,不同硬件平台表现可能有差异 计数器(如统计事件发生次数);共享标志位(如资源是否可用) std::atomic<type> atomicVariable;
atomicVariable++;(或其他原子操作函数如loadstore等)
无锁编程(Lock-Free Programming) 依赖原子操作、CAS 等底层硬件同步原语,避免使用传统锁机制实现线程同步和共享资源访问控制 高并发性能,充分利用多核处理器并行性;避免死锁和优先级反转问题 编程难度高,需要深入理解底层硬件知识,代码易出错且难以调试;内存开销较大,通常需要额外内存维护数据一致性 高性能并发数据结构(如无锁队列、哈希表、栈);对响应时间要求极高的实时系统 std::atomic<std::shared_ptr<Node<T>>> head;
std::atomic<std::shared_ptr<Node<T>>> tail;
prevTail->next.compare_exchange_weak(next, newNode);(以无锁队列为例)
  • 锁(Mutex)
    • 互斥性强,能有效保护共享资源的访问,但可能导致死锁,且性能开销相对较大,尤其是在高并发场景下频繁获取和释放锁时。
    • 使用简单,适用于大多数简单的共享资源保护场景,但对于复杂的同步逻辑可能不够灵活。
  • 条件变量(Condition Variable)
    • 能够实现线程的高效等待和唤醒,避免忙等待,提高系统资源利用率,但需要与互斥锁配合使用,代码复杂度增加。
    • 适用于线程需要等待特定条件发生的场景,如生产者 - 消费者模型,但存在虚假唤醒的问题,需要在使用时注意条件的检查。
  • 信号量(Semaphore)
    • 可以灵活控制对共享资源的访问数量,简单高效,但也可能导致死锁,且对于复杂的同步逻辑支持不足。
    • 常用于资源池管理和任务同步场景,能有效地协调多个线程对有限资源的访问。
  • 事件(Event)
    • 实现线程间的异步通信和同步,能够提高系统的响应速度,但需要与其他同步机制配合,存在虚假唤醒的可能性。
    • 适用于线程需要等待外部事件发生的场景,如网络编程中的数据接收完成通知等。
  • 屏障(Barrier)
    • 能够精确同步一组线程的执行进度,确保所有线程在特定点上同步,但性能开销较大,灵活性受限。
    • 主要用于并行计算和多线程协作中需要所有线程同步推进的场景,如大规模数据并行处理。
  • 原子操作(Atomic Operations)
    • 提供了对共享变量的轻量级原子访问和修改,性能较高,但功能相对有限,依赖硬件支持。
    • 适用于简单的共享变量操作,如计数器、标志位等,在对性能要求较高且同步逻辑简单的场景中表现出色。
  • 无锁编程(Lock-Free Programming)
    • 具有极高的并发性能,避免了锁相关的问题,但编程难度大,内存开销相对较高,且代码的正确性难以保证和调试。
    • 适用于对并发性能要求极高且能够承担较高开发和维护成本的场景,如高性能服务器和实时系统中的关键数据结构和算法。

9.2 如何选择合适的同步方式

  • 考虑并发场景的复杂度:如果是简单的共享资源访问,如单个变量的读写,原子操作可能是一个较好的选择;如果涉及多个线程对多个共享资源的复杂协作,可能需要使用锁、条件变量或其他更复杂的同步机制的组合。
  • 性能要求:对于性能要求极高且对资源竞争不激烈的场景,可以考虑无锁编程;对于一般性能要求的场景,根据具体情况选择合适的同步方式,如在高并发但资源竞争相对较小的情况下,信号量或原子操作可能更合适;在需要精确控制线程执行顺序和避免忙等待的场景,条件变量或事件可能更优。
  • 资源限制:如果系统的内存资源有限,需要注意无锁编程可能带来的较大内存开销;如果 CPU 资源紧张,频繁的锁获取和释放操作可能会影响性能,需要考虑使用更轻量级的同步方式,如原子操作或优化后的锁机制。

9.3 实际项目中的应用案例

  • 在一个多线程的网络服务器程序中,对于多个线程同时访问的连接池,可以使用信号量来控制同时使用连接的线程数量,确保连接资源的合理利用和有效管理。当一个线程获取到连接后,信号量的值减 1;当线程使用完连接并释放时,信号量的值加 1,通知其他等待的线程可以获取连接。
  • 在一个图像处理程序中,多个线程分别负责图像的不同区域的处理,在所有区域处理完成后需要进行合并操作。可以使用屏障来同步各个线程的执行进度,确保所有线程都完成了各自的处理任务后,再进行合并操作,保证图像数据的一致性和完整性。
  • 在一个实时金融交易系统中,对于交易数据的计数和状态标志的更新,可以使用原子操作来保证数据的准确性和一致性,同时避免使用锁带来的性能开销,确保系统能够快速响应交易请求和实时更新交易数据。

总之,在实际的多线程编程中,需要根据具体的应用场景、性能要求和资源限制等因素,综合考虑并选择合适的线程同步方式,以确保程序的正确性、高效性和稳定性。同时,不断优化和改进同步策略,也是提高多线程程序性能和质量的关键。