阅读量:1
文章目录
九、多线程
10. 线程池
这里我们实现一个 懒汉单例模式 的线程池,并且包含 日志打印 功能:
Makefile:
# Build the thread-pool demo binary.
# The headers are listed as prerequisites so that editing any of them triggers
# a rebuild; only the first prerequisite (Main.cc) is handed to the compiler.
threadpool: Main.cc ThreadPool.hpp Thread.hpp Task.hpp Log.hpp LockGuard.hpp
	g++ -o $@ $< -std=c++11 -lpthread

# Remove the built binary.
.PHONY: clean
clean:
	rm -f threadpool
Thread.hpp:
#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    // Signature of the callable a Thread runs; it receives the thread's name.
    using func_t = std::function<void(std::string)>;

    // Thin wrapper around a pthread that runs a func_t with the thread's name.
    //
    // `_stop` tracks whether there is currently a thread that may be joined or
    // detached: it is true until Start() succeeds and becomes true again after
    // Join(), Detach() or Stop().
    class Thread
    {
    public:
        // Invoke the stored callable with this thread's name.
        void Excute()
        {
            _func(_threadname);
        }

    public:
        // Store the callable and the name; no OS thread is created until Start().
        // Members are initialised in declaration order.
        Thread(func_t func, std::string name = "none-name")
            : _tid(), _threadname(name), _func(func), _stop(true)
        {
        }

        // pthread entry point: `args` is the Thread object passed by Start().
        static void *threadroutine(void *args)
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }

        // Create the pthread. Returns true on success, false if pthread_create failed.
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if (n != 0)
            {
                return false;
            }
            _stop = false;
            return true;
        }

        // Detach the thread if one was started and not yet joined/detached.
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
                // A detached thread must never be joined, so clear the joinable state.
                _stop = true;
            }
        }

        // Block until the thread finishes. A no-op when no thread was started or
        // it was already joined/detached, so calling Join() twice is safe.
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
                // Joining the same pthread again is undefined; remember it is gone.
                _stop = true;
            }
        }

        // Name given at construction.
        std::string name() const
        {
            return _threadname;
        }

        // Only flips the bookkeeping flag; the OS thread itself is not touched,
        // and later Join()/Detach() calls become no-ops.
        void Stop()
        {
            _stop = true;
        }

        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        func_t _func;
        bool _stop;
    };
}

#endif
LockGuard.hpp:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__

#include <iostream>
#include <pthread.h>

// Scope guard for a pthread mutex: locks in the constructor and unlocks in the
// destructor. The guard does not own the mutex; the caller keeps it alive.
class LockGuard
{
public:
    // Lock `mutex` for the lifetime of this guard.
    LockGuard(pthread_mutex_t *mutex)
        : _mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }

    // Unlock the mutex taken in the constructor.
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }

    // A copy would unlock the same mutex a second time, so copying is disabled.
    LockGuard(const LockGuard &) = delete;
    LockGuard &operator=(const LockGuard &) = delete;

private:
    pthread_mutex_t *_mutex;
};

#endif
Log.hpp:
#pragma once #include <iostream> #include <fstream> #include <cstdio> #include <string> #include <ctime> #include <cstdarg> #include <sys/types.h> #include <unistd.h> #include <pthread.h> #include "LockGuard.hpp" // 宏定义,用于定义日志格式 #define LOG(level, format, ...) do{LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__);}while (0) // 将日志输入到文件 #define EnableFile() do{gIsSave = true;}while (0) // 将日志输出到显示器 #define EnableScreen() do{gIsSave = false;}while (0) bool gIsSave = false; // 日志文件名 const std::string logname = "log.txt"; // 枚举日志级别 enum Level { DEBUG = 0, INFO, WARNING, ERROR, FATAL }; // 保存日志到文件 void SaveFile(const std::string &filename, const std::string &message) { std::ofstream out(filename, std::ios::app); if (!out.is_open()) { return; } out << message; out.close(); } // 日志级别转字符串 std::string LevelToString(int level) { switch (level) { case DEBUG: return "Debug"; case INFO: return "Info"; case WARNING: return "Warning"; case ERROR: return "Error"; case FATAL: return "Fatal"; default: return "Unknown"; } } // 获取当前时间字符串 std::string GetTimeString() { time_t curr_time = time(nullptr); struct tm *format_time = localtime(&curr_time); if (format_time == nullptr) return "None"; char time_buffer[1024]; snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", format_time->tm_year + 1900, format_time->tm_mon + 1, format_time->tm_mday, format_time->tm_hour, format_time->tm_min, format_time->tm_sec); return time_buffer; } // 日志锁,同一时刻只能写一个日志 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // 日志信息 void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...) 
{ // 日志级别 std::string levelstr = LevelToString(level); // 时间 std::string timestr = GetTimeString(); // 进程id pid_t selfid = getpid(); // 日志内容 char buffer[1024]; va_list arg; va_start(arg, format); vsnprintf(buffer, sizeof(buffer), format, arg); va_end(arg); // 日志格式化 std::string message = "[" + timestr + "]" + "[" + levelstr + "]" + "[" + std::to_string(selfid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n"; LockGuard lockguard(&lock); // 输出日志 if (!issave) { std::cout << message; } else { SaveFile(logname, message); } }
Task.hpp:
#pragma once

#include <iostream>
#include <string>
#include <functional>

// A trivial unit of work: add two integers and remember the sum.
class Task
{
public:
    // Default task is 0 + 0; every member is initialised so the string
    // accessors never read indeterminate values.
    Task()
        : _a(0), _b(0), _result(0)
    {
    }

    // Task that will compute a + b when executed.
    Task(int a, int b)
        : _a(a), _b(b), _result(0)
    {
    }

    // Perform the addition and store the result.
    void Excute()
    {
        _result = _a + _b;
    }

    // "a+b=result"; the result is 0 until the task has been executed.
    std::string ResultToString() const
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
    }

    // "a+b= ?" - the question without the answer.
    std::string DebugToString() const
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "= ?";
    }

    // Calling the task executes it.
    void operator()()
    {
        Excute();
    }

private:
    int _a;
    int _b;
    int _result;
};
ThreadPool.hpp:
#pragma once #include <iostream> #include <vector> #include <queue> #include <pthread.h> #include "Log.hpp" #include "Thread.hpp" #include "LockGuard.hpp" using namespace ThreadModule; // 线程池默认线程数 const static int gdefaultthreadnum = 10; template <typename T> class ThreadPool { private: // 线程互斥锁 void LockQueue() { pthread_mutex_lock(&_mutex); } // 线程互斥解锁 void UnlockQueue() { pthread_mutex_unlock(&_mutex); } // 线程等待 void ThreadSleep() { pthread_cond_wait(&_cond, &_mutex); } // 线程唤醒 void ThreadWakeup() { pthread_cond_signal(&_cond); } // 唤醒全部线程 void ThreadWakeupAll() { pthread_cond_broadcast(&_cond); } // 私有构造函数 ThreadPool(int threadnum = gdefaultthreadnum) :_threadnum(threadnum) ,_waitnum(0) ,_isrunning(false) { // 初始化锁 pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); // 日志 LOG(INFO, "ThreadPool Construct()"); } // 初始化线程池 void InitThreadPool() { // 创建一批线程 for (int num = 0; num < _threadnum; num++) { std::string name = "thread-" + std::to_string(num + 1); _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name); // 日志 LOG(INFO, "init thread %s done", name.c_str()); } _isrunning = true; } // 启动线程池 void Start() { for (auto &thread : _threads) { thread.Start(); } } // 任务处理函数 void HandlerTask(std::string name) // 类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用! 
{ // 日志 LOG(INFO, "%s is running...", name.c_str()); while (true) { // 加锁 LockQueue(); while (_task_queue.empty() && _isrunning) { _waitnum++; ThreadSleep(); _waitnum--; } // 退出情况 if (_task_queue.empty() && !_isrunning) { UnlockQueue(); break; } // 取出任务 T t = _task_queue.front(); _task_queue.pop(); UnlockQueue(); // 日志 LOG(DEBUG, "%s get a task", name.c_str()); // 执行任务 t(); // 日志 LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str()); } } // 禁用拷贝构造和赋值操作 ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; ThreadPool(const ThreadPool<T> &) = delete; public: static ThreadPool<T> *GetInstance() { // 首次使用时,创建线程池单例 if (nullptr == _instance) { // 对于多线程创建单例时加锁,保证线程安全 LockGuard lockguard(&_lock); if (nullptr == _instance) { // 创建线程池实例 _instance = new ThreadPool<T>(); _instance->InitThreadPool(); _instance->Start(); LOG(DEBUG, "创建线程池单例"); return _instance; } } // 已经创建过线程池单例,直接返回 LOG(DEBUG, "获取线程池单例"); return _instance; } // 停止线程池 void Stop() { LockQueue(); _isrunning = false; ThreadWakeupAll(); UnlockQueue(); } // 等待线程池退出 void Wait() { for (auto &thread : _threads) { thread.Join(); LOG(INFO, "%s is quit...", thread.name().c_str()); } } // 向线程池中添加任务 bool Enqueue(const T &t) { bool ret = false; LockQueue(); if (_isrunning) { _task_queue.push(t); if (_waitnum > 0) { ThreadWakeup(); } LOG(DEBUG, "enqueue task success"); ret = true; } UnlockQueue(); return ret; } // 析构自动释放锁资源 ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } private: // 线程池中线程个数 int _threadnum; // 线程 std::vector<Thread> _threads; // 任务队列 std::queue<T> _task_queue; // 互斥锁 pthread_mutex_t _mutex; // 条件变量 pthread_cond_t _cond; // 等待线程数 int _waitnum; // 线程池是否运行 bool _isrunning; // 线程池单例 static ThreadPool<T> *_instance; // 全局锁 static pthread_mutex_t _lock; }; // 初始化静态变量 template <typename T> ThreadPool<T> *ThreadPool<T>::_instance = nullptr; // 全局锁 template <typename T> pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
Main.cc:
#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <iostream>
#include <string>
#include <memory>
#include <ctime>

// Demo: show that repeated GetInstance() calls create the pool once and then
// return the same singleton, and finally shut the pool down.
int main()
{
    LOG(DEBUG, "程序已经加载");
    sleep(2);

    // First call creates the singleton and starts its worker threads.
    ThreadPool<Task>::GetInstance();
    sleep(2);

    // Later calls only fetch the existing instance.
    ThreadPool<Task>::GetInstance();
    sleep(2);

    ThreadPool<Task>::GetInstance();
    sleep(2);

    ThreadPool<Task>::GetInstance();
    sleep(2);

    // Workers leave their loop only after Stop(); without it Wait() would
    // block in pthread_join indefinitely.
    ThreadPool<Task> *pool = ThreadPool<Task>::GetInstance();
    pool->Stop();
    pool->Wait();
    sleep(2);

    return 0;
}
结果演示: