阅读量:2
文章目录
九、多线程
7. 生产者消费者模型
生产者消费者模型的简单代码
Makefile:
cp:Main.cc g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: rm -f cp
Thread.hpp:
#ifndef __THREAD_HPP__ #define __THREAD_HPP__ #include <iostream> #include <string> #include <unistd.h> #include <functional> #include <pthread.h> namespace ThreadModule { template<typename T> using func_t = std::function<void(T&)>; template<typename T> class Thread { public: void Excute() { _func(_data); } public: Thread(func_t<T> func, T& data, const std::string &name="none-name") : _func(func) , _data(data) , _threadname(name) , _stop(true) {} static void *threadroutine(void *args) { Thread<T> *self = static_cast<Thread<T> *>(args); self->Excute(); return nullptr; } bool Start() { int n = pthread_create(&_tid, nullptr, threadroutine, this); if(!n) { _stop = false; return true; } else { return false; } } void Detach() { if(!_stop) { pthread_detach(_tid); } } void Join() { if(!_stop) { pthread_join(_tid, nullptr); } } std::string name() { return _threadname; } void Stop() { _stop = true; } ~Thread() {} private: pthread_t _tid; std::string _threadname; T& _data; func_t<T> _func; bool _stop; }; } #endif
BlockQueue.hpp:
#ifndef __BLOCKQUEUE_HPP__ #define __BLOCKQUEUE_HPP__ #include <iostream> #include <string> #include <queue> #include <pthread.h> template<typename T> class BlockQueue { private: bool IsFull() const { return _block_queue.size() == _cap; } bool IsEmpty() const { return _block_queue.empty(); } public: BlockQueue(int cap) :_cap(cap) { _productor_wait_num = 0; _consumer_wait_num = 0; // 初始化互斥锁 pthread_mutex_init(&_mutex, nullptr); // 初始化条件变量 pthread_cond_init(&_productor_cond, nullptr); // 初始化条件变量 pthread_cond_init(&_consumer_cond, nullptr); } // 生产者使用的入队列接口 void Enqueue(const T& in) { // 加锁 pthread_mutex_lock(&_mutex); while (IsFull()) { // 生产者等待数量加1 _productor_wait_num++; // 等待条件变量通知唤醒并竞争到互斥锁 pthread_cond_wait(&_productor_cond, &_mutex); // 生产者等待数量减1 _productor_wait_num--; } // 生产的数据入资源队列 _block_queue.push(in); // 解锁 pthread_mutex_unlock(&_mutex); // 通知消费者可以从等待队列中出队列 if (_consumer_wait_num > 0) pthread_cond_signal(&_consumer_cond); } // 消费者使用的出队列接口 void Pop(T* out) { // 加锁 pthread_mutex_lock(&_mutex); while (IsEmpty()) { // 消费者等待数量加1 _consumer_wait_num++; // 等待条件变量通知唤醒并竞争到互斥锁 pthread_cond_wait(&_consumer_cond, &_mutex); // 消费者等待数量减1 _consumer_wait_num--; } // 获取数据 *out = _block_queue.front(); // 数据出队列 _block_queue.pop(); // 解锁 pthread_mutex_unlock(&_mutex); // 通知生产者可以从等待队列中出队列 if (_productor_wait_num > 0) pthread_cond_signal(&_productor_cond); } ~BlockQueue() { // 销毁互斥锁 pthread_mutex_destroy(&_mutex); // 销毁条件变量 pthread_cond_destroy(&_productor_cond); // 销毁条件变量 pthread_cond_destroy(&_consumer_cond); } private: std::queue<T> _block_queue; // 容量上限 int _cap; // 互斥锁 pthread_mutex_t _mutex; // 条件变量,用于通知生产者可以入队列 pthread_cond_t _productor_cond; // 条件变量,用于通知消费者可以出队列 pthread_cond_t _consumer_cond; // 生产者等待数量 int _productor_wait_num; // 消费者等待数量 int _consumer_wait_num; }; #endif
Task.hpp:
#pragma once #include <iostream> #include <string> class Task { public: Task() {} Task(int a, int b) :_a(a) ,_b(b) ,_result(0) {} // 执行任务 void Execute() { _result = _a + _b; } std::string ResultToString() { return std::to_string(_a) + " + " + std::to_string(_b) + " = " + std::to_string(_result); } std::string DebugToString() { return std::to_string(_a) + " + " + std::to_string(_b) + " = ?"; } private: int _a; int _b; int _result; };
Main.cc:
#include "BlockQueue.hpp" #include "Thread.hpp" #include "Task.hpp" #include <string> #include <vector> #include <unistd.h> using namespace ThreadModule; // 创建类型别名 using blockqueue_t = BlockQueue<Task>; // 消费者线程 void Consumer(blockqueue_t& bq) { while (true) { Task t; // 从阻塞队列中获取任务资源 bq.Pop(&t); // 执行任务 t.Execute(); // 输出结果 std::cout << "Consumer: " << t.ResultToString() << std::endl; } } // 生产者线程 void Productor(blockqueue_t& bq) { srand(time(nullptr)^pthread_self()); while (true) { // 分配任务 int a = rand() % 10 + 1; usleep(1234); int b = rand() % 20 + 1; Task t(a, b); // 任务放入阻塞队列 bq.Enqueue(t); // 输出任务信息 std::cout << "Productor: " << t.DebugToString() << std::endl; sleep(1); } } // 启动线程 void StartComm(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq, func_t<blockqueue_t> func) { for (int i = 0; i < num; i++) { // 创建一批线程 std::string name = "thread-" + std::to_string(i + 1); threads->emplace_back(func, bq, name); threads->back().Start(); } } // 创建消费者线程 void StartConsumer(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq) { StartComm(threads, num, bq, Consumer); } // 创建生产者线程 void StartProductor(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq) { StartComm(threads, num, bq, Productor); } // 等待所有线程结束 void WaitAllThread(std::vector<Thread<blockqueue_t>>& threads) { for (auto& thread : threads) { thread.Join(); } } int main() { // 创建阻塞队列,容量为5 blockqueue_t* bq = new blockqueue_t(5); // 创建线程 std::vector<Thread<blockqueue_t>> threads; // 创建 1个消费者线程 StartConsumer(&threads, 1, *bq); // 创建 1个生产者线程 StartProductor(&threads, 1, *bq); // 等待所有线程结束 WaitAllThread(threads); return 0; }
结果演示
这里使用的是单生产者和单消费者,当然也可以在主函数处创建多生产者和多消费者的模型。