【Linux】多线程_6

avatar
作者
筋斗云
阅读量:2

文章目录


九、多线程

7. 生产者消费者模型

生产者消费者模型的简单代码

Makefile

cp:Main.cc 	g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: 	rm -f cp 

Thread.hpp

#ifndef __THREAD_HPP__ #define __THREAD_HPP__  #include <iostream> #include <string> #include <unistd.h> #include <functional> #include <pthread.h>  namespace ThreadModule {     template<typename T>     using func_t = std::function<void(T&)>;      template<typename T>     class Thread     {     public:         void Excute()         {             _func(_data);         }     public:         Thread(func_t<T> func, T& data, const std::string &name="none-name")             : _func(func)             , _data(data)             , _threadname(name)             , _stop(true)         {}          static void *threadroutine(void *args)         {             Thread<T> *self = static_cast<Thread<T> *>(args);             self->Excute();             return nullptr;         }          bool Start()         {             int n = pthread_create(&_tid, nullptr, threadroutine, this);             if(!n)             {                 _stop = false;                 return true;             }             else             {                 return false;             }         }          void Detach()         {             if(!_stop)             {                 pthread_detach(_tid);             }         }          void Join()         {             if(!_stop)             {                 pthread_join(_tid, nullptr);             }         }          std::string name()         {             return _threadname;         }          void Stop()         {             _stop = true;         }          ~Thread() {}     private:         pthread_t _tid;         std::string _threadname;         T& _data;         func_t<T> _func;         bool _stop;     }; }  #endif 

BlockQueue.hpp

#ifndef __BLOCKQUEUE_HPP__ #define __BLOCKQUEUE_HPP__  #include <iostream> #include <string> #include <queue> #include <pthread.h>  template<typename T> class BlockQueue { private:     bool IsFull() const     {         return _block_queue.size() == _cap;     }      bool IsEmpty() const     {         return _block_queue.empty();     } public:     BlockQueue(int cap)         :_cap(cap)     {         _productor_wait_num = 0;         _consumer_wait_num = 0;         // 初始化互斥锁         pthread_mutex_init(&_mutex, nullptr);         // 初始化条件变量         pthread_cond_init(&_productor_cond, nullptr);          // 初始化条件变量         pthread_cond_init(&_consumer_cond, nullptr);     }      // 生产者使用的入队列接口     void Enqueue(const T& in)     {         // 加锁         pthread_mutex_lock(&_mutex);         while (IsFull())         {             // 生产者等待数量加1             _productor_wait_num++;             // 等待条件变量通知唤醒并竞争到互斥锁             pthread_cond_wait(&_productor_cond, &_mutex);             // 生产者等待数量减1             _productor_wait_num--;         }         // 生产的数据入资源队列         _block_queue.push(in);         // 解锁         pthread_mutex_unlock(&_mutex);         // 通知消费者可以从等待队列中出队列         if (_consumer_wait_num > 0) pthread_cond_signal(&_consumer_cond);     }      // 消费者使用的出队列接口     void Pop(T* out)     {         // 加锁         pthread_mutex_lock(&_mutex);         while (IsEmpty())         {             // 消费者等待数量加1             _consumer_wait_num++;             // 等待条件变量通知唤醒并竞争到互斥锁             pthread_cond_wait(&_consumer_cond, &_mutex);             // 消费者等待数量减1             _consumer_wait_num--;         }         // 获取数据         *out = _block_queue.front();         // 数据出队列         _block_queue.pop();         // 解锁         pthread_mutex_unlock(&_mutex);         // 通知生产者可以从等待队列中出队列         if (_productor_wait_num > 0) pthread_cond_signal(&_productor_cond);     }      ~BlockQueue()     {         // 销毁互斥锁         pthread_mutex_destroy(&_mutex);         // 销毁条件变量         pthread_cond_destroy(&_productor_cond);         // 销毁条件变量         pthread_cond_destroy(&_consumer_cond);     } private:     std::queue<T> _block_queue;     // 容量上限     int _cap;     // 互斥锁     pthread_mutex_t _mutex;     // 条件变量,用于通知生产者可以入队列     pthread_cond_t _productor_cond;     // 条件变量,用于通知消费者可以出队列     pthread_cond_t _consumer_cond;     // 生产者等待数量     int _productor_wait_num;     // 消费者等待数量     int _consumer_wait_num; };  #endif 

Task.hpp

#pragma once  #include <iostream> #include <string>   class Task { public:     Task()     {}      Task(int a, int b)         :_a(a)         ,_b(b)         ,_result(0)     {}      // 执行任务     void Execute()     {         _result = _a + _b;     }      std::string ResultToString()     {         return std::to_string(_a) + " + " + std::to_string(_b) + " = " + std::to_string(_result);     }      std::string DebugToString()     {         return std::to_string(_a) + " + " + std::to_string(_b) + " = ?";     } private:     int _a;     int _b;     int _result; }; 

Main.cc

#include "BlockQueue.hpp" #include "Thread.hpp" #include "Task.hpp" #include <string> #include <vector> #include <unistd.h>  using namespace ThreadModule; // 创建类型别名 using blockqueue_t = BlockQueue<Task>;  // 消费者线程 void Consumer(blockqueue_t& bq) {     while (true)     {         Task t;         // 从阻塞队列中获取任务资源         bq.Pop(&t);         // 执行任务         t.Execute();         // 输出结果         std::cout << "Consumer: " << t.ResultToString() << std::endl;     } }  // 生产者线程 void Productor(blockqueue_t& bq) {     srand(time(nullptr)^pthread_self());     while (true)     {         // 分配任务         int a = rand() % 10 + 1;         usleep(1234);         int b = rand() % 20 + 1;         Task t(a, b);         // 任务放入阻塞队列         bq.Enqueue(t);         // 输出任务信息         std::cout << "Productor: " << t.DebugToString() << std::endl;         sleep(1);     } }  // 启动线程 void StartComm(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq, func_t<blockqueue_t> func) {     for (int i = 0; i < num; i++)     {         // 创建一批线程         std::string name = "thread-" + std::to_string(i + 1);         threads->emplace_back(func, bq, name);         threads->back().Start();     } }  // 创建消费者线程 void StartConsumer(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq) {     StartComm(threads, num, bq, Consumer); }  // 创建生产者线程 void StartProductor(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq) {     StartComm(threads, num, bq, Productor); }  // 等待所有线程结束 void WaitAllThread(std::vector<Thread<blockqueue_t>>& threads) {     for (auto& thread : threads)     {         thread.Join();     } }  int main() {     // 创建阻塞队列,容量为5     blockqueue_t* bq = new blockqueue_t(5);     // 创建线程     std::vector<Thread<blockqueue_t>> threads;     // 创建 1个消费者线程     StartConsumer(&threads, 1, *bq);     // 创建 1个生产者线程     StartProductor(&threads, 1, *bq);      // 等待所有线程结束     WaitAllThread(threads);      return 0; } 

结果演示

在这里插入图片描述
这里使用的是单生产者和单消费者,当然也可以在主函数处创建多生产者和多消费者的模型。


未完待续

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!