文章目录
同步问题
同步问题是保证数据安全的情况下,使多线程在访问同一资源时具有一定顺序性;
#define NUM 5 int g_val = 700; class threadData { public: threadData(const int number, pthread_mutex_t *mutex) : lock_(mutex) { threadname_ = "Thread_" + to_string(number); } public: string threadname_; pthread_t tid_; pthread_mutex_t *lock_; // 在类中定义一个互斥锁对象类型指针用于接收在主线程中实例化的锁 }; void *threadRoutine(void *args) { threadData *td = static_cast<threadData *>(args); td->tid_ = pthread_self(); while (true) { pthread_mutex_lock(td->lock_); // 锁定互斥锁对象 if (g_val > 0) { usleep(100); printf("I am %s , the g_val = %d\n", td->threadname_.c_str(), g_val); g_val--; pthread_mutex_unlock(td->lock_); // 解锁互斥锁对象 } else { pthread_mutex_unlock(td->lock_); // 解锁互斥锁对象 /* 当一个线程锁定了一个锁时必须经过 if 或者 else 两个选项之一 为了避免带锁的线程未在 else 处解锁而退出所导致死锁问题 应在 if else 两处都进行解锁 */ break; } } delete td; // 线程退出时释放描述自身基本属性的结构体对象 return nullptr; } int main() { vector<pthread_t> tids; pthread_mutex_t lock; // 定义一个互斥锁对象 pthread_mutex_init(&lock, nullptr); // 初始化该互斥锁对象 for (size_t i = 0; i < NUM; ++i) { pthread_t tid; threadData *td = new threadData(i, &lock); pthread_create(&tid, nullptr, threadRoutine, td); // (传入互斥锁对象的指针)利用 new 实例化一个用来维护线程的结构体对象 // 并将该实例化的对象传给线程作为参数 tids.push_back(tid); } for (size_t i = 0; i < tids.size(); ++i) { pthread_join(tids[i], nullptr); } pthread_mutex_destroy(&lock); // 销毁互斥锁对象 return 0; }
这段代码创建了5
个线程并访问共享资源g_val
,使用了互斥锁保证了临界资源永远是多个线程串型访问从而保证该共享资源的安全性;
当一个线程解锁互斥锁对象时其他线程才会被唤醒并且申请锁向下执行;
这里出现了一个小问题,当持有互斥锁对象的线程解锁该互斥锁时所有的线程都会试图去锁定该互斥锁对象,而互斥锁只有一个,且每个线程获取锁的能力都不同;
最终导致其余所有未锁定互斥锁对象的线程无效唤醒,表明该程序中的线程在访问资源时不具有顺序性,也可以说在当前程序中线程是不同步的;
条件变量
条件变量是一种线程同步机制;
用于多线程中协调线程之间的执行顺序,允许线程在某个条件满足之前进行等待,并在条件满足时被唤醒从而实现线程间的协调;
线程的前提是 “保证数据安全的情况下” 所以条件变量的使用必须以使用锁为前提,这表示条件变量通常与互斥锁配合使用以确保对共享资源的互斥访问和条件变量的同步操作;
)
POSIX线程库提供了一系列的条件变量的接口,用于线程之间的同步共享数据时协调线程的执行顺序;
其允许线程在某个条件满足前进入等待状态并在条件满足时被唤醒从而避免线程等待;
PROLOG This manual page is part of the POSIX Programmer's Manual. The Linux implementation of this interface may differ (consult the corresponding Linux manual page for details of Linux behavior), or the interface may not be implemented on Linux. NAME pthread_cond_destroy, pthread_cond_init - destroy and initialize condition variables SYNOPSIS #include <pthread.h> int pthread_cond_destroy(pthread_cond_t *cond); int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); pthread_cond_t cond = PTHREAD_COND_INITIALIZER; RETURN VALUE If successful, the pthread_cond_destroy() and pthread_cond_init() functions shall return zero; otherwise, an error number shall be returned to indicate the error. The [EBUSY] and [EINVAL] error checks, if implemented, shall act as if they were performed immediately at the beginning of processing for the function and caused an error return prior to modifying the state of the condition variable specified by cond.
这一系列接口用于条件变量的初始化,销毁,全局定义等;
pthread_cond_t
类型与互斥锁相同,在使用条件变量前也需要使用该类型定义一个该类型的对象;
该类型是一个自定义类型,用于线程库维护管理
pthread
线程库的条件变量的;pthread_cond_init()
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
该函数用于初始化一个条件变量对象,函数调用成功时返回
0
,调用失败时返回一个错误码;pthread_cond_t *restrict cond
该参数用于指向要初始化的条件变量的指针;
const pthread_condattr_t *restrict attr
该参数为初始化的条件变量的属性,若是传递
nullptr
表示使用默认属性;
pthread_cond_destroy()
int pthread_cond_destroy(pthread_cond_t *cond);
该函数用于销毁一个条件变量对象并释放相关资源,函数调用成功时返回
0
,调用失败返回一个错误码;pthread_cond_t *cond
传入一个指针指向要销毁的条件变量的指针;
PTHREAD_COND_INITIALIZER
宏该宏用于全局定义一个条件变量,使用该宏定义的条件变量可不使用
pthread_cond_init()
与pthread_cond_destroy()
进行初始化与销毁,当结束其将会被操作系统回收;可能出现的错误码
当函数调用失败时将会返回一个错误码,可能出现的错误码为:
EBUSY
表示条件变量正在被使用(有线程正在等待该条件变量);
EINVAL
表示传入的条件变量指针无效;
ENOMEM
系统内存不足,无法分配资源;
当条件变量被创建与初始化后需要使用对应的接口使线程能通过条件变量进行等待与被唤醒;
等待条件满足
通常等待条件满足使用
pthread_cond_wait()
函数使线程在未满足条件时进行等待;int pthread_cond_wait(pthread_cond_t *restrict cond , pthread_mutex_t *restrict mutex);
pthread_cond_t *restrict cond
该参数为指向条件变量的指针;
pthread_mutex_t *resstrict mutex
该参数为指向互斥锁的指针,通常情况下载使用条件变量时将线程加入阻塞队列前会通过该参数将该线程当前持有的该锁进行解除;
避免线程在持有锁的情况下被阻塞从而导致死锁;
函数调用成功返回
0
,调用失败返回错误码;可能的错误码为:
EINVAL
传入的条件或互斥锁指针无效;
EPERM
当前线程没有持有互斥锁;
唤醒线程
唤醒条件变量中等待队列的线程通常使用以下两个函数:
int pthread_cond_broadcast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);
pthread_cond_broadcast()
唤醒所有等待指定条件变量的线程;
所有等待在该条件变量上的线程将被移除等待队列并竞争重新获得互斥锁;
pthread_cond_signal()
唤醒一个等待指定条件变量的线程;
若是多个线程在等待这个条件变量时具体唤醒哪个线程是不确定的,由系统决定,但一般是第一个;
两个函数的参数
pthread_cond_t *cond
为指向条件变量的指针;该函数调用成功时返回
0
,调用失败时返回一个EINVAL
的错误码用于表明传入的条件变量指针无效;
条件变量的使用
#include <pthread.h> #include <stdio.h> #include <unistd.h> #define NUM 5 // 定义线程数量 // 初始化互斥锁和条件变量 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int cnt = 0; // 共享变量,用于计数 // 线程函数 void *Count(void *args) { uint64_t number = (uint64_t)args; pthread_detach(pthread_self()); // 将线程分离,自动回收资源 printf("The thread %lu running...\n", number); while (true) { pthread_mutex_lock(&mutex); // 获取互斥锁,进入临界区 // 在获取锁后,调用pthread_cond_wait,进入条件变量的等待队列 pthread_cond_wait(&cond, &mutex); // 被唤醒后,重新获取互斥锁,执行以下代码 printf("I am thread-%lu , the cnt is %d\n", number, cnt++); pthread_mutex_unlock(&mutex); // 释放互斥锁,离开临界区 } return nullptr; } int main() { // 创建多个线程 for (uint64_t i = 0; i < NUM; ++i) { pthread_t tid; // 创建线程并传递其编号 pthread_create(&tid, nullptr, Count, (void *)i); usleep(100); // 确保每个线程都有时间启动 } while (true) { pthread_cond_signal(&cond); // 主线程每秒唤醒一个等待的线程 sleep(1); // 暂停1秒,避免唤醒速度过快 } return 0; }
该程序为计算全局变量cnt
被线程访问的次数;
中在全局中定义了一把锁与一个条件变量;
创建了5
个线程,线程在运行时将自己设置为分离状态使线程结束后资源自动回收;
线程进入一个无限循环,每次循环进行获取互斥锁,调用pthread_cond_wait(&cond, &mutex)
使其进入等待队列同时释放互斥锁;
主函数则是进入一个无限循环,每隔一秒调用一次pthread_cond_signal(&cond)
向条件变量cond
发出信号唤醒一个等待的线程;
pthread_cond_wait()
的使用位置在使用
pthread_cond_wait()
前必须为条件变量上锁,以防止在使用条件变量时多个线程产生竞态条件;本质上是为了确保条件检查和条件等待过程的原子性;
The pthread_cond_timedwait() and pthread_cond_wait() functions shall block on a condition variable. They shall be called with mutex locked by the calling thread or undefined behavior results. # 翻译 # pthread_cond_timedwait() 和 pthread_cond_wait() 函数会在条件变量上阻塞等待。它们必须在调用线程已经锁定了互斥量的情况下调用,否则会导致未定义的行为。
当一个具有锁的线程调用
pthread_cond_wait()
时该线程将会被加入进该条件变量的等待队列中,由于该线程持有锁,其他线程在调用pthread_mutex_lock()
时因为互斥锁对象已经被第一个线程锁定,所以进入阻塞无法与调用pthread_cond_wait()
函数的线程产生竞态条件;当持有锁的线程被载入至等待队列后将释放互斥锁,其他线程依次按照该方式顺序进入并等待被唤醒;
防止竞态条件
互斥锁确保只有一个线程可以进入临界区检查或修改条件变量关联的条件状态;
避免多个线程同时检查和修改条件而产生竞态条件;
条件检查的原子性
在进入等待状态前,线程可以安全地检查条件变量关联的条件状态;
如果条件不满足,线程会在保持互斥锁的情况下调用
pthread_cond_wait()
而后将自己放入等待队列并释放互斥锁;等待和重新加锁的原子操作
pthread_cond_wait()
的调用会自动释放互斥锁,并在等待队列中等待条件变量的信号;当条件变量发出信号唤醒线程时
pthread_cond_wait()
会使被唤醒的线程重新获取互斥锁,确保在继续执行时临界区的独占访问;
该程序的运行结果为:
$ ./mycond The thread 0 running... The thread 1 running... The thread 2 running... The thread 3 running... The thread 4 running... I am thread-0 , the cnt is 0 I am thread-1 , the cnt is 1 I am thread-2 , the cnt is 2 I am thread-3 , the cnt is 3 I am thread-4 , the cnt is 4 ... ...
条件变量的条件检查与线程唤醒
条件变量的条件检查一般是检查临界资源的状态,一般临界资源的状态为:
就绪
资源已经准备好,可以被使用;
未就绪
资源为准备好,需要等待;
针对不同的线程可能出现不同的临界资源状态;
可根据临界资源的不同状态决定线程的行为,以上文代码为基础进行修改;
#include <pthread.h> #include <unistd.h> #include <stdio.h> #include <stdint.h> #define NUM 5 // 定义线程数量 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 初始化互斥锁 pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 初始化条件变量 int cnt = 0; // 共享变量,用于计数 void *Count(void *args) { uint64_t number = (uint64_t)args; pthread_detach(pthread_self()); // 将线程设置为分离状态,不需要主线程显式回收 printf("The thread %lu running...\n", number); while (true) { pthread_mutex_lock(&mutex); // 获取互斥锁 // 当 cnt 为 0 或者 cnt 为奇数时,线程进入等待状态 while (cnt == 0 || cnt % 2 != 0) { pthread_cond_wait(&cond, &mutex); // 进入等待状态,等待条件变量的信号 } // 被唤醒后打印 cnt 值 printf("I am thread-%lu , the cnt is %d\n", number, cnt); pthread_mutex_unlock(&mutex); // 释放互斥锁 sleep(1); // 模拟处理工作,暂停1秒 } return nullptr; // 线程函数的返回值 } int main() { // 创建 NUM 个线程,并传递其编号作为参数 for (uint64_t i = 0; i < NUM; ++i) { pthread_t tid; pthread_create(&tid, nullptr, Count, (void *)i); // 创建线程 usleep(100); // 暂停100微秒,确保线程按序启动 } // 主线程进入无限循环,不断增加 cnt 并根据 cnt 的值发送信号唤醒线程 while (true) { pthread_mutex_lock(&mutex); // 获取互斥锁 ++cnt; // 增加 cnt 值 // 当 cnt 为偶数时,发送条件变量的信号,唤醒一个等待的线程 if (cnt % 2 == 0) { pthread_cond_signal(&cond); // 发送条件变量的信号 } pthread_mutex_unlock(&mutex); // 释放互斥锁 sleep(1); // 暂停1秒,模拟其他操作 } return 0; // 主函数的返回值 }
线程创建
主线程创建了
NUM
个线程,每个线程都执行Count
函数;通过
pthread_detach()
将线程设置为分离状态;工作线程逻辑
每个线程在进入循环后尝试获取互斥锁;
使用
while
进行条件判断,如果临界资源cnt
不满足条件则调用pthread_cond_wait()
进入等待队列;使用了
while
循环等待条件变量确保了线程在被唤醒后重新检查条件以避免了虚假唤醒的问题;一但被唤醒,打印线程号和临界资源值
cnt
并释放互斥锁;主线程逻辑
无限循环增加
cnt
,每次检查cnt
是否为偶数,如果为偶数则发送信号唤醒一个等待线程;释放互斥锁后等待
1s
模拟其他操作;
对应代码的运行结果为:
$ ./mycond The thread 0 running... The thread 1 running... The thread 2 running... The thread 3 running... The thread 4 running... I am thread-0 , the cnt is 2 I am thread-1 , the cnt is 4 I am thread-2 , the cnt is 6 I am thread-3 , the cnt is 8 I am thread-4 , the cnt is 10
条件变量的使用规范
等待条件代码
pthread_mutex_lock(&mutex); while (条件为假) pthread_cond_wait(cond,mutex); 修改条件 pthread_mutex_unlock(&mutex);
获取互斥锁:
pthread_mutex_lock(&mutex)
确保线程对共享资源的独占访问;循环检查条件:
while(条件为假)
使用while
而不是if
检查条件以方式虚假唤醒,即使被唤醒也会重新检查条件;等待条件变量信号:
pthread_cond_wait(&cpnd,&mutex)
在线程等待时释放互斥锁,允许其他线程修改条件,一但收到信号并被唤醒则重新获取互斥锁;修改条件: 一但条件满足,线程可以继续执行并修改条件;
释放互斥锁:
pthread_mutex_unlock(&mutex)
释放互斥锁,允许其他线程访问临界资源;给条件发送信号代码
pthread_mutex_lock(&mutex); 设置条件为真 pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex);
获取互斥锁:
pthread_mutex_lock(&mutex)
确保对共享资源的独占访问;设置条件为真: 修改共享资源,满足等待线程所需的条件;
发出信号变量:
pthread_cond_signal(&cond)
同志一个等待线程条件已经满足,如果没有等待线程信号则会丢失(信号丢失);释放互斥锁:
pthread_mutex_unlock(&mutex)
释放互斥锁,允许其他线程访问临界资源;
生产者-消费者模型
生产者消费者模型是一种多线程设计模式,常见于解决多个生产者线程和多个消费者线程之间如何安全有效地共享数据;
)
该模型中存在三种关系,两个角色和一个交易场所;
两种角色分别为 消费者 与 生产者 ;
生产者
生产者用于生产数据或任务,并将其放入共享区域中;
消费者
消费者负责从共享区域中读取数据或任务并进行处理;
一个交易场所指的是一块特定结构的内存空间,该区域用于充当生产者和消费者之间的中介,用于暂存数据,其中该空间可以是有限的也可以是无限的;
三种关系分别为 生产者与生产者 , 消费者与消费者 , 生产者与消费者 ;
生产者与生产者
生产者之间必须是互斥关系;
多个生产者同时向共享空间写入数据时,需要互斥访问以避免共享空间状态的竞争和数据损坏;
可通过互斥锁确保同一时刻只能有一个生产者向共享空间中写入数据;
消费者与消费者
消费者之间必须是互斥关系;
多个消费者同时从缓冲区中读取数据时需要互斥访问以避免共享空间状态的竞争和数据损坏;
可通过互斥锁确保同一时刻只有一个消费者可以从共享空间读取数据;
生产者与消费者
生产者与消费者需要既存在互斥关系也存在同步关系;
互斥关系
生产者和消费者都需要互斥的访问共享空间以避免数据竞争和数据不一致;
通过互斥锁确保当一个线程(生产者或消费者)正在访问共享空间时其他线程不能同时访问;
同步关系
生产者和消费者需要在某些条件下等待对方的操作完成;
例如当共享空间中数据高与一定数量时生产者需要等待消费者消费数据,当共享空间内数据低于一个数量时消费者需要等待生产者生产数据;
需通过条件变量实现线程之间的同步,使生产者和消费者在需要等待时等待并在条件满足时被唤醒;
这种模型的设计的优点为:
支持忙闲不均
生产者和消费者可以以不同的速率进行工作,例如生产者写入数据或任务的速率大于消费者或者相反;
其中共享空间使得生产者和消费者的速率不必严格匹配从而增强了系统应对负载波动的能力;
对生产者和消费者进行解耦
解耦意味着生产者和消费者不需要直接相互依赖或协调,他们通过共享缓冲区间接相互交互;
不需要直接依赖对方的实现,是系统更加模块化和灵活,同时易于拓展和维护;