【Linux】多线程_9

avatar
作者
猴君
阅读量:1

文章目录


九、多线程

10. 线程池

这里我没实现一些 懒汉单例模式 的线程池,并且包含 日志打印 的线程池:
Makefile

threadpool:Main.cc 	g++ -o $@ $^ -std=c++11 -lpthread .PHONY:clean clean: 	rm -f threadpool 

Thread.hpp

#ifndef __THREAD_HPP__ #define __THREAD_HPP__  #include <iostream> #include <string> #include <unistd.h> #include <functional> #include <pthread.h>  namespace ThreadModule {     // 类型别名     using func_t = std::function<void(std::string)>;      // 线程类     class Thread     {     public:         void Excute()         {             _func(_threadname);         }     public:         Thread(func_t func, std::string name = "none-name")             :_func(func)             ,_threadname(name)             ,_stop(true)         {}          // 执行任务         static void *threadroutine(void *args)         {             Thread *self = static_cast<Thread*>(args);             self->Excute();             return nullptr;         }          // 启动线程         bool Start()         {             int n = pthread_create(&_tid, nullptr, threadroutine, this);             if(!n)             {                 _stop = false;                 return true;             }             else             {                 return false;             }         }          // 停止线程         void Detach()         {             if(!_stop)             {                 pthread_detach(_tid);             }         }          // 等待线程结束         void Join()         {             if(!_stop)             {                 pthread_join(_tid, nullptr);             }         }          std::string name()         {             return _threadname;         }          void Stop()         {             _stop = true;         }          ~Thread()         {}     private:         pthread_t _tid;         std::string _threadname;         func_t _func;         bool _stop;     }; }  #endif 

LockGuard.hpp

#ifndef __LOCK_GUARD_HPP__ #define __LOCK_GUARD_HPP__  #include <iostream> #include <pthread.h>  class LockGuard { public:     // 构造函数加锁     LockGuard(pthread_mutex_t *mutex)         :_mutex(mutex)     {         pthread_mutex_lock(_mutex);     }      // 析构函数解锁     ~LockGuard()     {         pthread_mutex_unlock(_mutex);     } private:     pthread_mutex_t *_mutex; };  #endif 

Log.hpp

#pragma once  #include <iostream> #include <fstream> #include <cstdio> #include <string> #include <ctime> #include <cstdarg> #include <sys/types.h> #include <unistd.h> #include <pthread.h> #include "LockGuard.hpp"  // 宏定义,用于定义日志格式 #define LOG(level, format, ...) do{LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__);}while (0) // 将日志输入到文件 #define EnableFile() do{gIsSave = true;}while (0) // 将日志输出到显示器 #define EnableScreen() do{gIsSave = false;}while (0)  bool gIsSave = false; // 日志文件名 const std::string logname = "log.txt";  // 枚举日志级别 enum Level {     DEBUG = 0,     INFO,     WARNING,     ERROR,     FATAL };  // 保存日志到文件 void SaveFile(const std::string &filename, const std::string &message) {     std::ofstream out(filename, std::ios::app);     if (!out.is_open())     {         return;     }     out << message;     out.close(); }  // 日志级别转字符串 std::string LevelToString(int level) {     switch (level)     {     case DEBUG:         return "Debug";     case INFO:         return "Info";     case WARNING:         return "Warning";     case ERROR:         return "Error";     case FATAL:         return "Fatal";     default:         return "Unknown";     } }  // 获取当前时间字符串 std::string GetTimeString() {     time_t curr_time = time(nullptr);     struct tm *format_time = localtime(&curr_time);     if (format_time == nullptr)         return "None";     char time_buffer[1024];     snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",              format_time->tm_year + 1900,              format_time->tm_mon + 1,              format_time->tm_mday,              format_time->tm_hour,              format_time->tm_min,              format_time->tm_sec);     return time_buffer; }  // 日志锁,同一时刻只能写一个日志 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;  // 日志信息 void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...) {     // 日志级别     std::string levelstr = LevelToString(level);     // 时间     std::string timestr = GetTimeString();     // 进程id     pid_t selfid = getpid();      // 日志内容     char buffer[1024];     va_list arg;     va_start(arg, format);     vsnprintf(buffer, sizeof(buffer), format, arg);     va_end(arg);      // 日志格式化     std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +                           "[" + std::to_string(selfid) + "]" +                           "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";     LockGuard lockguard(&lock);      // 输出日志     if (!issave)     {         std::cout << message;     }     else     {         SaveFile(logname, message);     } } 

Task.hpp

#pragma once  #include <iostream> #include <string> #include <functional>  class Task { public:     Task()     {}      Task(int a, int b)         :_a(a)         ,_b(b)         ,_result(0)     {}      // 执行加法功能     void Excute()     {         _result = _a + _b;     }      // 结果     std::string ResultToString()     {         return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);     }      // 问题     std::string DebugToString()     {         return std::to_string(_a) + "+" + std::to_string(_b) + "= ?";     }      // 重载()运算符     void operator()()     {         Excute();     } private:     int _a;     int _b;     int _result; }; 

ThreadPool.hpp

#pragma once  #include <iostream> #include <vector> #include <queue> #include <pthread.h> #include "Log.hpp" #include "Thread.hpp" #include "LockGuard.hpp"  using namespace ThreadModule;  // 线程池默认线程数 const static int gdefaultthreadnum = 10;  template <typename T> class ThreadPool { private:     // 线程互斥锁     void LockQueue()     {         pthread_mutex_lock(&_mutex);     }      // 线程互斥解锁     void UnlockQueue()     {         pthread_mutex_unlock(&_mutex);     }      // 线程等待     void ThreadSleep()     {         pthread_cond_wait(&_cond, &_mutex);     }      // 线程唤醒     void ThreadWakeup()     {         pthread_cond_signal(&_cond);     }      // 唤醒全部线程     void ThreadWakeupAll()     {         pthread_cond_broadcast(&_cond);     }      // 私有构造函数     ThreadPool(int threadnum = gdefaultthreadnum)         :_threadnum(threadnum)         ,_waitnum(0)         ,_isrunning(false)     {         // 初始化锁         pthread_mutex_init(&_mutex, nullptr);         pthread_cond_init(&_cond, nullptr);         // 日志         LOG(INFO, "ThreadPool Construct()");     }      // 初始化线程池     void InitThreadPool()     {         // 创建一批线程         for (int num = 0; num < _threadnum; num++)         {             std::string name = "thread-" + std::to_string(num + 1);             _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);             // 日志             LOG(INFO, "init thread %s done", name.c_str());         }         _isrunning = true;     }      // 启动线程池     void Start()     {         for (auto &thread : _threads)         {             thread.Start();         }     }      // 任务处理函数     void HandlerTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用!     {         // 日志         LOG(INFO, "%s is running...", name.c_str());         while (true)         {             // 加锁             LockQueue();             while (_task_queue.empty() && _isrunning)             {                 _waitnum++;                 ThreadSleep();                 _waitnum--;             }             // 退出情况             if (_task_queue.empty() && !_isrunning)             {                 UnlockQueue();                 break;             }             // 取出任务             T t = _task_queue.front();             _task_queue.pop();             UnlockQueue();             // 日志             LOG(DEBUG, "%s get a task", name.c_str());             // 执行任务             t();             // 日志             LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());         }     }      // 禁用拷贝构造和赋值操作     ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;     ThreadPool(const ThreadPool<T> &) = delete; public:     static ThreadPool<T> *GetInstance()     {         // 首次使用时,创建线程池单例         if (nullptr == _instance)         {             // 对于多线程创建单例时加锁,保证线程安全             LockGuard lockguard(&_lock);             if (nullptr == _instance)             {                 // 创建线程池实例                 _instance = new ThreadPool<T>();                 _instance->InitThreadPool();                 _instance->Start();                 LOG(DEBUG, "创建线程池单例");                 return _instance;             }         }         // 已经创建过线程池单例,直接返回         LOG(DEBUG, "获取线程池单例");         return _instance;     }      // 停止线程池     void Stop()     {         LockQueue();         _isrunning = false;         ThreadWakeupAll();         UnlockQueue();     }          // 等待线程池退出     void Wait()     {         for (auto &thread : _threads)         {             thread.Join();             LOG(INFO, "%s is quit...", thread.name().c_str());         }     }      // 向线程池中添加任务     bool Enqueue(const T &t)     {         bool ret = false;         LockQueue();         if (_isrunning)         {             _task_queue.push(t);             if (_waitnum > 0)             {                 ThreadWakeup();             }             LOG(DEBUG, "enqueue task success");             ret = true;         }         UnlockQueue();         return ret;     }          // 析构自动释放锁资源     ~ThreadPool()     {         pthread_mutex_destroy(&_mutex);         pthread_cond_destroy(&_cond);     } private:     // 线程池中线程个数     int _threadnum;     // 线程     std::vector<Thread> _threads;     // 任务队列     std::queue<T> _task_queue;     // 互斥锁     pthread_mutex_t _mutex;     // 条件变量     pthread_cond_t _cond;      // 等待线程数     int _waitnum;     // 线程池是否运行     bool _isrunning;      // 线程池单例     static ThreadPool<T> *_instance;     // 全局锁     static pthread_mutex_t _lock; };  // 初始化静态变量 template <typename T> ThreadPool<T> *ThreadPool<T>::_instance = nullptr;  // 全局锁 template <typename T> pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER; 

Main.cc

#include "ThreadPool.hpp" #include "Task.hpp" #include "Log.hpp" #include <iostream> #include <string> #include <memory> #include <ctime>  int main() {     // 日志     LOG(DEBUG, "程序已经加载");     sleep(2);     // 创建线程池单例     ThreadPool<Task>::GetInstance();     sleep(2);     // 获取单例     ThreadPool<Task>::GetInstance();     sleep(2);      ThreadPool<Task>::GetInstance();     sleep(2);      ThreadPool<Task>::GetInstance();     sleep(2);      // 等待线程结束     ThreadPool<Task>::GetInstance()->Wait();     sleep(2);      return 0; } 

结果演示:
在这里插入图片描述


未完待续

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!