一、生产者消费者问题概述
生产者、消费者问题也被称作有限缓冲问题。可以描述为:两个或者更多的线程共享同一个缓冲区,其中一个或多个线程作为“生产者”会不断地向缓冲区中添加数据,另一个或者多个线程作为“消费者”从缓冲区中取走数据。生产者、消费者模型关注的是以下几点:
1、生产者和消费者必须互斥的使用缓冲区。
2、缓冲区空时,消费者不能读取数据。
3、缓冲区满时,生产者不能添加数据。
二、生产者消费者模型的优点
1、解耦:因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这样生产者和消费者的代码发生变化,都不会对对方产生影响。这样其实就是把生产者和消费者之间的强耦合解开,变成了生产者和缓冲区,消费者和缓冲区。
2、支持并发:如果消费者直接从生产者拿数据,则消费者需要等待生产者生产数据,同样生产者需要等待消费者消费数据。而有了生产者、消费者模型,生产者和消费者可以是两个独立的并发主体。生产者把制造出来的数据添加到添加到缓冲区,就可以再去生产下一个数据了。而消费者也是一样的,从缓冲区中读取数据,不需要等待生产者。这样,生产者和消费者就可以并发的执行。
3、支持忙闲不均:如果消费者直接从生产者这里拿数据,而生产者生产数据很慢,消费者消费数据很 快,或者生产者生产数据很多,消费者消费数据很慢。都会造成占用CPU的时间片白白浪费。生产 者/消费者模型中,生产者只需要将生产的数据添加到缓冲区,缓冲区满了就不生产了。消费者从 缓冲区中读取数据,缓冲区空了就不消费了,使得生产者/消费者的处理能力达到一个动态的平 衡。
三、生产者、消费者模型的实现
假定缓冲池中有N个缓冲区,一个缓冲区只能存储一个int类型的数据。定义互斥锁mutex实现对缓 冲区的互斥访问;计数信号量dempty用来表示空闲缓冲区的数量,其初值为N;计数信号量dfull用来表 示有数据的缓冲区的数量,其初值为0
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <assert.h> #include <pthread.h> #include <semaphore.h> #include <time.h> #define BUFF_MAX 30 #define SC_NUM 2 #define XF_NUM 3 int in = 0; int out = 0; sem_t sem_empty; sem_t sem_full;pthread_mutex_t mutex; int buff[BUFF_MAX] = {0}; void * sc_thread(void* arg) { int index = (int)arg; while( 1 ) { sem_wait(&sem_empty); pthread_mutex_lock(&mutex); buff[in] = rand()%100; printf("生产者%d 产生数据%d,in=%d\n",index,buff[in],in); in = (in + 1) % BUFF_MAX; pthread_mutex_unlock(&mutex); sem_post(&sem_full); int n = rand() % 10; sleep(n); } } void * xf_thread(void* arg) { int index = (int)arg; while(1) { sem_wait(&sem_full); pthread_mutex_lock(&mutex); printf("消费者%d 消费数据%d, out=%d\n",index,buff[out],out); out = (out+1) % BUFF_MAX; pthread_mutex_unlock(&mutex); sem_post(&sem_empty); int n = rand() % 10; sleep(n); } } int main() { pthread_mutex_init(&mutex,NULL); sem_init(&sem_empty,0,BUFF_MAX); sem_init(&sem_full,0,0); srand((int)time(NULL)); pthread_t sc_id[SC_NUM]; pthread_t xf_id[XF_NUM]; int i = 0; for( ; i < SC_NUM; i++ ) { pthread_create(&sc_id[i],NULL,sc_thread,(void*)i); } for( i = 0; i < XF_NUM; i++ ) { pthread_create(&xf_id[i],NULL,xf_thread,(void*)i); } for( i = 0; i < SC_NUM; i++ ) { pthread_join(sc_id[i],NULL); } for( i = 0; i < XF_NUM; i++ ) { pthread_join(xf_id[i],NULL); } sem_destroy(&sem_empty); sem_destroy(&sem_full); pthread_mutex_destroy(&mutex); exit(0); }
简易版实现:(条件变量和互斥锁)
#include<iostream> #include<mutex> #include<condition_variable> #include<functional> #include<vector> #include<list> using namespace std; std::mutex mtx; std::condition_variable pcv; std::condition_variable scv; std::list<int> ilist; const int maxelem = 10; const int n = 100; //生产者生产任务 void Add() { std::unique_lock<std::mutex> lock(mtx); for(int i = 0;i<n;++i) { while (ilist.size() >= maxelem) { pcv.wait(lock); } ilist.push_back(i); scv.notify_all(); } } //消费者消费任务 void Take(int task) { std::unique_lock<std::mutex> lock(mtx); while (ilist.size() == 0) { scv.wait(lock); } task = ilist.front(); ilist.pop_front(); pcv.notify_all(); }