信号同步机制封装
Lock类
信号量——sem类
信号量是一种特殊的变量,它只能取自然数并且只支持两种操作,P(wait)和V(signal)
P 操作会使得信号量的值减 1,如果此时信号量的值小于 0,则调用进程或线程会被阻塞,等待其他进程或线程对信号量进行 V 操作,使得信号量的值大于 0,此时阻塞的进程或线程才能继续执行
V 操作会使得信号量的值加 1,如果此时信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个进程或线程
信号量的取值可以是任何自然数,根据初始值的不同可以分为两类:
二进制信号量:指初始值为 1 的信号量,此类信号量只有 1 和 0 两个值,通常用来替代互斥锁实现线程同步
计数信号量:指初始值大于 1 的信号量,当进程中存在多个线程,但某公共资源允许同时访问的线程数量是有限的,这时就可以用计数信号量来限制同时访问资源的线程数量
根据使用场景的不同,信号量也可以分为两类:
- 无名信号量:也被称作基于内存的信号量,只可以在共享内存的情况下,比如实现进程中各个线程之间的互斥和同步
- 命名信号量:通常用于不共享内存的情况下,比如进程间通信
在本项目中主要实现的是线程同步,只需要使用无名信号量,主要使用以下几个函数
初始化信号量sem_init()
include <semaphore.h> int sem_init(sem_t *sem,int pshared,unsignedint value) //初始化一个信号量 sem:指向要初始化的信号量的指针 pshared:指定信号量的共享方式。如果值为 0,则信号量将在进程内部共享。如果值为非 0,则信号量可以在不同进程之间共享,需要使用共享内存 value:指定信号量的初值 return:成功则返回0,否则返回-1
销毁信号量sem_destory()
include <semaphore.h> int sem_destroy(sem_t *sem) //销毁一个信号量 sem:指向要销毁的信号量的指针 return:成功返回0,否则返回-1
对信号量进行P操作sem_wait()
include <semaphore.h> int sem_wait(sem_t *sem) //对信号量进行 P 操作,如果信号量的值小于等于 0,则会阻塞当前线程 sem:指向要操作的信号量的指针 return:成功返回0,否则返回-1
对信号进行V操作sem_post()
include <semaphore.h> int sem_post(sem_t *sem) //对信号量进行 V 操作,如果信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个线程 sem:指向要操作的信号量的指针 return:成功返回0,否则返回-1
互斥锁——locker类
互斥锁,也称互斥量,可以保护关键代码段,以确保独占式访问.当进入关键代码段,获得互斥锁将其加锁;离开关键代码段,唤醒等待该互斥锁的线程
互斥量是一种用于保护临界区的同步机制,可以确保同一时刻只有一个线程访问共享资源。当一个线程访问共享资源时,需要先获取互斥量的锁,其他线程需要等待该锁释放才能继续执行
互斥量不是为了消除竞争,实际上,资源还是共享的,线程间也还是竞争的,只不过通过这种“锁”机制就将共享资源的访问变成互斥操作,也就是说一个线程操作这个资源时,其它线程无法操作它,从而消除与时间有关的错误。但是,这种锁机制不是强制的,互斥锁实质上是操作系统提供的一把“建议锁”(又称“协同锁”),建议程序中有多线程访问共享资源的时候使用该机制
因此,即使有了mutex,其它线程如果不按照这种锁机制来访问共享数据的话,依然会造成数据混乱。所以为了避免这种情况,所有访问该共享资源的线程必须采用相同的锁机制
初始化互斥量pthread_mutex_init()
include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) //初始化互斥量 mutex:指向要初始化的互斥量的指针 attr:指向互斥量属性对象的指针,通常设置为 NULL return:成功则返回0,否则返回一个正整数的错误码
销毁互斥量pthread_mutex_destroy()
include <pthread.h> int pthread_mutex_destroy(pthread_mutex_t *mutex) //销毁互斥量 mutex:指向要销毁的互斥量的指针 return:成功则返回0,否则返回一个正整数的错误码
给互斥锁加锁pthread_mutex_lock()
include <pthread.h> int pthread_mutex_lock(pthread_mutex_t *mutex) //给互斥量加锁,如果互斥量已经被锁住,则阻塞当前线程,直到互斥量被解锁 mutex:指向要加锁的互斥量的指针 return:成功则返回0,否则返回一个正整数的错误码
解锁互斥量pthread_mutex_unlock()
include <pthread.h> int pthread_mutex_unlock(pthread_mutex_t *mutex) //解锁互斥量,如果有等待该互斥量的线程,则唤醒其中的一个线程 mutex:指向要加锁的互斥量的指针 return:成功则返回0,否则返回一个正整数的错误码
条件变量——cond类
条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程
条件变量利用线程间共享的全局变量进行同步,主要包括两个动作:一个线程等待条件变量的条件成立而挂起;另一个线程使条件成立(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起
初始化条件变量pthread_cond_init()
include <pthread.h> int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) //初始化条件变量对象,设置相关属性 cond:指向条件变量对象的指针 attr:指向线程条件属性对象的指针。一般为 NULL return:成功则返回0,失败返回错误号
销毁条件变量对象pthread_cond_destory()
include <pthread.h> int pthread_cond_destroy(pthread_cond_t *cond) //销毁条件变量对象,释放资源 cond:指向条件变量对象的指针 return:成功则返回0,失败返回错误号
唤醒线程pthread_cond_broadcast()
函数以广播的方式唤醒所有等待目标条件变量的线程
include <pthread.h> int pthread_cond_broadcast(pthread_cond_t *cond) //唤醒所有在条件变量上等待的线程 cond:指向条件变量对象的指针 return:成功则返回0,失败返回错误号
阻塞线程pthread_cond_wait()
函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,互斥锁会再次被锁上. 也就是说函数内部会有一次解锁和加锁操作
include <pthread.h> int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) //让当前线程阻塞在条件变量上等待唤醒 cond:指向条件变量对象的指针 mutex:指向互斥锁对象的指针,用于保护条件变量 return:成功则返回0,失败返回错误号
发送信号通知线程pthread_cond_signal()
pthread_cond_signal 只会通知一个等待该条件变量的线程,如果有多个线程在等待,则只有一个线程会收到通知,其余线程还会继续等待,直到下一次收到信号
必须在已经获得与条件变量相关的互斥锁之后才能调用该函数
如果没有等待该条件变量的线程,调用该函数也不会产生任何作用
include <pthread.h> int pthread_cond_signal(pthread_cond_t *cond) cond:指向条件变量的指针 return:成功则返回0,失败返回错误号
锁机制的功能
锁机制用来实现多线程同步,通过锁机制,确保任一时刻只能有一个线程能进入关键代码段
为便于实现同步类的RAII机制(“Resource Acquisition is Initialization”直译过来是“资源获取即初始化”),该项目在pthread库的基础上进行了封装,实现了类似于C++11的mutex标准库功能
————————————————
源码
/****************locker.h*******************/ #ifndef LOCKER_H #define LOCKER_H #include <exception> #include <semaphore.h> #include <pthread.h> class sem { public: sem(int value = 0) { if (sem_init(&m_sem, 0, value) != 0) { throw std::exception(); } } ~sem() { sem_destroy(&m_sem); } bool wait() { return sem_wait(&m_sem) == 0; } bool post() { return sem_post(&m_sem) == 0; } private: sem_t m_sem; }; class locker { public: locker() { if (pthread_mutex_init(&m_mutex, NULL) != 0) { throw std::exception(); } } ~locker() { pthread_mutex_destroy(&m_mutex); } bool lock() { return pthread_mutex_lock(&m_mutex); } bool unlock() { return pthread_mutex_unlock(&m_mutex); } pthread_mutex_t* get() { return &m_mutex; } private: pthread_mutex_t m_mutex; }; class cond { public: cond() { if (pthread_cond_init(&m_cond, NULL) != 0) { throw std::exception(); } } ~cond() { pthread_cond_destroy(&m_cond); } bool wait(pthread_mutex_t* m_mutex) { int ret = pthread_cond_wait(&m_cond, m_mutex); return ret == 0; } bool timewait(pthread_mutex_t* m_mutex, struct timespec t) { return pthread_cond_timedwait(&m_cond, m_mutex, &t); } bool signal() { return pthread_cond_signal(&m_cond) == 0; } bool broadcast() { return pthread_cond_broadcast(&m_cond) == 0; } private: pthread_cond_t m_cond; }; #endif /*测试locker类代码 // 生产者线程函数 void* producer(void* arg) { block_queue<int>* queue = static_cast<block_queue<int>*>(arg); for (int item = 0; item < 20; item++) { queue->push(item); LOG_INFO("生产者产生数据i = %d", item); usleep((rand() % 5 + 1) * 50000); } return nullptr; } // 消费者线程函数 void* consumer(void* arg) { block_queue<int>* queue = static_cast<block_queue<int>*>(arg); for (int item = 0; item < 20; item++) { queue->pop(item); LOG_INFO("消费者处理数据i = %d", item); } return nullptr; } int main() { block_queue<int> queue; pthread_t tid_prod, tid_cons; pthread_create(&tid_prod, nullptr, producer, &queue); pthread_create(&tid_cons, nullptr, consumer, &queue); pthread_join(tid_prod, nullptr); pthread_join(tid_cons, nullptr); return 0; } */