1、多线程
C++11之前没有引入线程的概念,如果想要实现多线程,需要借助操作系统平台提供的API,比如Linux的<pthead.h>,或者windows下的<windows.h>。
C++11提供了语言层面上的多线程,包含在头文件<thread.h>中,解决了跨平台的问题,提供了管理线程、保护共享数据、线程间同步操作、原子操作等类。C++11新标准中引入了5个头文件来支持多线程编程。
1.1、多进程与多线程
- 多进程并发
使用多进程并发是将一个应用程序划分为多个独立的进程(其中每个进程只有一个线程),这些独立的进程间可以相互通信,共同完成任务。由于操作系统对进程提供了大量的保护机制,以避免一个进程修改了另一个进程的数据,使用多进程比多线程更容易写出安全的代码,但这也就造成了多进程并发的两个缺点:
- 在进程间的通信,无论是用信号、套接字、还是文件、管道等方式,其使用要么是比较复杂,要么就是速度较慢,或者干脆就是又复杂又慢;
- 运行多个进程的开销很大,操作系统要分配很多的资源来对这些进程进行管理。
由于多个进程并发完成同一任务时,不可避免的要操作同一数据和进程间的相互通信,上述的两个缺点也就决定了多进程不是一个好的选择。
- 多线程并发
多线程并发指的是在同一个进程中执行多个进程。
☆优点:
线程是轻量级的进程,每个线程可以独立的运行不同的指令序列,但是线程不独立的拥有资源,依赖于创建它的进程而存在。也就是说,同一进程的多个线程共享相同的地址空间,可以访问进程中的大部分数据,指针和引用可以在线程间进行传递。这样,统一进程内的多个线程能够很方便的进行数据共享以及通信,也就比进程更适用于并发操作。
☆缺点:
由于缺少操作系统提供的保护机制,在多线程共享数据以及通信时,程序员就需要做出措施来保证对共享数据段的操作是以预想的操作顺序进行的,并且要极力的避开死锁(deadlock)。
1.2、并发(concurrency)和并行(parallel)
- 并发
一个时间片运行一个线程的代码,宏观上是同时,但其实不是
- 并行
宏观与微观上都是同时运行
1.3、创建线程
创建线程:将函数添加进线程当中即可。
- 形式1:thread 线程名(函数名);
#include<iostream> #include<thread> using namespace std; void thread_fun1(){ cout<<"子线程Mythread1正在运行"<<endl; } int main(){ //创建线程Mythread1 thread Mythread1 (thread_fun1); //加入线程 Mythread1.join(); cout<<"主线程正在运行"<<endl; }
- 形式2:thread 线程名(函数名(参数));
void thread_fun2(int x){ cout<<x<<endl; } //... thread Mythread2 (thread_fun2(100)); Mythread2.join(); //...
- 形式3:thread (函数,参数).join();
void thrad_fun3(int x){ cout<<x<<endl; } //... thread (thread_fun3,1).join(); //...
- 形式4:利用类的仿函数作为线程处理函数
class A{ public: void operator()(){ cout<<"子线程"<<endl; } } int main(){ //类的实例化对象充当线程函数 A a; thread Mythread4(a); Mythread4.join(); //或者这样写 //thread Mythread((A())); //Mythrread.join(); cout<<"主线程"<<endl; }
- 形式5:通过Lambda表达式创建线程
简单来讲,就是把函数得定义和调用放在一处实现。
//... thread Mythread5([]{cout<<"子线程调用"<<endl;}); Mythread5.join();
- 形式6:通过智能指针的方式创建线程
即以智能指针为参数的函数作为线程的处理函数
void thread_fun3(unique_ptr<int>ptr){ cout<<"子线程:"<<ptr.get()<<endl; cout<<"子线程id:"<<this_thread::get_id()<<endl; } int main(){ //智能指针作为参数的线程处理函数 int *p = new int(12); cout<<*p<<endl; unique_ptr<int> ptr(new int(1000)); cout<<"主线程"<<ptr.get()<<endl;//ptr.get()用于获取智能指针的地址 thread Mythread6(thread_fun3,move(ptr)); Mythread6.join(); cout<<"主线程id"<<this_thread::get_id()<<endl; cout<<"主线程:"<<ptr.get()<<endl; return 0; }
- 形式7:类的成员函数做线程处理函数
class A { public: void func(int x) { cout << "子线程id:" <<this_thread::get_id()<< endl; } }; int main(){ A a; thread Mythread7(&A::func,a,1);//注意写法 Mythread7.join(); cout<<"主线程id:"<<this_thread::get_id()<<endl; return 0; }
1.4、join()与detach()方式
当线程启动后,一定要在和线程相关联的thread销毁前,确定以何种方式等待线程执行结束。
- join():等待启动的线程完成,再会继续往下执行;
- detach():启动的线程自主在后台运行,当前的代码继续往下执行,不等待新线程结束。
注意:thread对象只能被join或detach一次。
可以用joinable()来判断对象是否被join过,已经join过的线程用joinable会返回0。
1.5、this_thread
this_thread是一个类,它有4个功能函数,具体如下:
函数 | 使用 | 说明 |
---|---|---|
get_id | this_thread::get_id() | 获得线程id |
yield | this_thread::yield() | 放弃线程执行,回到就绪状态 |
sleep_for | this_thread::sleep_for(x) | 暂停x秒 |
sleep_until | 具体用法如下 | 直到…时间才开始运行 |
#include<iostream> #include<thread> //包含标准时间库 #include<chrono> //包含时间和日期函数 #include<ctime> #include<iomanip> //禁用编译器对localtime的警告4996 #pragma warning(disable:4996) using namespace std; int main(){ using chrono::system_clock; time_t tt = system_clock::to_time_t(system_clock::now());//输出当前时间并转换为time_t类型 struct tm *ptm = localtime(&tt);//将time_t类型的时间转换为struct tm类型 cout<<"Current time:"<<put_time(ptm,"%X")<<endl;//必须大写X,若小写,输出的为日期 cout<<"Waiting for the next minute to begin..."<<endl; ++ptm->tm_min;//增加当前分钟数 ptm->tm_sec = 0;//将秒数设为0 this_thread::sleep_until(system_clock::from_time_t(mktime(ptm)));//使当前线程休眠直到指定时间 cout<<put_time(ptm,"%X")<<"reached"<<endl; getchar(); return 0; }
2、mutex
2.1、mutex
mutex头文件主要声明了与互斥量(mutex)相关的类。
互斥量mutex:是线程间通信的一种方式,只有用户互斥对象的线程才能访问公共资源,因为互斥对象只有一个,从而避免了多个线程同时访问公共资源。
mutex提供了4种互斥类型,如下所示:
类型 | 说明 |
---|---|
mutex | 最基本的Mutex类 |
recursive_mutex | 递归Mutex类 |
time_mutex | 定时Mutex类 |
recursive_timed_mutex | 定时递归Mutex类 |
示例,不加锁的情况:
#include <iostream> #include <thread> #include <vector> #include <mutex> #include <chrono> #include <stdexcept> int counter = 0; void increase(int time) { for (int i = 0; i < time; i++) { // 当前线程休眠1毫秒 std::this_thread::sleep_for(std::chrono::milliseconds(1)); counter++; } } int main(int argc, char** argv) { std::thread t1(increase, 100); std::thread t2(increase, 100); t1.join(); t2.join(); std::cout << "counter:" << counter << std::endl; return 0; }
第一次运行的结果为:
第二次运行的结果为:
加上锁:
#include <iostream> #include <thread> #include <vector> #include <mutex> #include <chrono> #include <stdexcept> int counter = 0; std::mutex mtx; void increase(int time) { for (int i = 0; i < time; i++) { //上锁 mtx.lock(); // 当前线程休眠1毫秒 std::this_thread::sleep_for(std::chrono::milliseconds(1)); counter++; //解锁 mtx.unlock(); } } int main(int argc, char** argv) { std::thread t1(increase, 100); std::thread t2(increase, 100); t1.join(); t2.join(); std::cout << "counter:" << counter << std::endl; return 0; }
第一次运行结果为:
第二次运行结果为:
注意:
- 任意时刻只允许一个线程对其上锁;
- mtx.lock():调用该函数得线程尝试加锁,如果上锁不成功,即其他线程已经上锁且未释放,则当前线程block。如果上锁成功,则执行后面的操作,操作完成后要调用mtx.unlock()释放锁,否则会导致死锁的产生;
- mutex还有一个操作为:mtx.try_lock(),字面意思就是“尝试上锁”,与mtx.lock()不同的是:如果上锁不成功,当前线程不会阻塞。
2.2、lock_guard
创建lock_guard时,它将尝试获取提供给它的互斥锁的所有权。当控制流离开lock_guard对象的作用域时,lock_guard析构并释放互斥量。
特点:
- 创建即加锁,作用域结束后自动析构并解锁,不需要手动解锁;
- 不能中途解锁,必须等作用域结束才能解锁;
- 不能复制。
示例:
#include <iostream> #include <thread> #include <vector> #include <mutex> #include <chrono> #include <stdexcept> using namespace std; int counter = 0; std::mutex mtx; void increase(int time) { for (int i = 0; i < time; i++) { //上锁 const lock_guard<std::mutex>lock(mtx); ++counter; } } int main(int argc, char** argv) { std::thread t1(increase, 100); std::thread t2(increase, 100); t1.join(); t2.join(); std::cout << "counter:" << counter << std::endl; return 0; }
每次运行后结果都为200。
2.3、unique_lock
unique_lock是lock_guard的优化版,具有lock_guard的所有功能,还具有很多其他方法,使用起来更加灵活方便,能够应对更复杂的锁需要。
特点:
- 创建时可以不锁定(通过指定第二个参数defer_lock),而在需要时再锁;
- 可以随时加锁解锁;
- 作用域结束后自动析构并解锁;
- 不可复制,可移动;
- 条件变量需要改类型的锁作为参数(此时必须使用unique_lock)
ref():用于包装引用传递的值;
cref():用于包装按const引用传递的值。
3、condition_variable
condition_variable的头文件有两个variable类,一个是condition_variable,另一个是condition_variable_any。condition_variable必须结合unique_lock使用,而condition_variable_any可以使用任意的锁。
condition_variable条件变量可以阻塞(wait 、wait_for、wait_until)调用的线程直到使用(notify_one、notify_all)通知恢复为止。condition_variable是一个类,既有构造函数,也有析构函数,使用时需要构造对应的condition_variable对象,调用对象相应的函数来实现上面的功能。
类型 | 说明 |
---|---|
condition_variable | 构建对象 |
析构 | 删除,释放资源 |
wait | wait until notified |
wait_for | wait for timeout or until notified |
wait_until | wait until notified or time point |
notify_one | 解锁一个线程,若有多个,则未知哪个线程执行 |
botify_all | 解锁所有线程 |
cv_status | 这是一个类,表示variable的状态 |
enum class cv_status{no time_out, timeout};
3.1、wait
condition_variable提供了两种wait()函数分别是:
//只有一个参数为unique_lock对象,当前线程的执行会被阻塞,直到收到notify为止 void wait(unique_lock<mutex>&lck); //有两个参数分别为unique_lock对象和一个可调用对象(函数或者Lambda表达式等),当前线程仅在pred=false时阻塞 template <class Predicate> void wait(unique_lock<mutex>&lck, Predicate pred);
调用wait时,该函数会自动调用lck.unlock()释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行,然后阻塞当前线程,另外,一旦当前线程获得通知(notified,通常是另外某个线程调用notify_*唤醒了当前线程),wait()函数再次调用lck.lock()重新上锁然后wait返回退出,可以理解为lck的状态变换和wait函数被调用(退出)是同时进行的。
示例:
#include <iostream> // std::cout #include <thread> // std::thread, std::this_thread::yield #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable std::mutex mtx; std::condition_variable cv; int cargo = 0; //当cargo为0时,返回bool值0,否则返回1 bool shipment_available() {return cargo!=0;} void consume (int n) { for (int i=0; i<n; ++i) { std::unique_lock<std::mutex> lck(mtx);//自动上锁 //第二个参数为false才阻塞(wait),阻塞完即unlock,给其它线程资源 cv.wait(lck,shipment_available); // consume: std::cout << cargo << '\n'; cargo=0; } } int main () { std::thread consumer_thread (consume,10); for (int i=0; i<10; ++i) { //每次cargo每次为0时,shipment_avariable会返回false,就不会进入while下的语句(即不会放弃当前线程) while (shipment_available()) std::this_thread::yield(); std::unique_lock<std::mutex> lck(mtx); cargo = i+1; cv.notify_one(); } consumer_thread.join();, return 0; }
说明:
- 主线程中的while,要在cargo为0时才会执行;
- 每次cargo置0后,子线程consumer_thread会解锁,主线程得以执行;
- 且每次cargo被置0后,wait就会启动等待。
3.2、wait_for
与wait()类似,不过wait_for()可以指定等待一个时间段,在当前线程收到notify或者rel_time超时之前,该线程都会处于阻塞状态,而一旦超时或收到通知,wait_for返回,剩下的处理步骤与wait类似。
template <class Rep, class Period> cv_status wait_for(unique_lock<mutex>&lck, const chrono::duration<Rep,Period>&rel_time);
另外,wait_for()的重载版本的最后一个参数pred表示wait_for的预测条件,只有当pred为false时,调用wait_for()才会阻塞当前线程,并且在收到其他线程的通知后,只有当pred为true时才会解除阻塞。
template<class Rep, class Period> cv_status wait_for(unique_lock<mutex>&lck, const chrono::duration<Rep, Period>&rel_time, Predicate pred);
示例:
#include<iostream> #include<thread> #include<chrono> #include<mutex> #include<condition_variable> using namesapce std; condition_variable cv; int value; void readvalue(){ cin>>value; cv.notify_one(); } int main(){ cout<<"请输入一个整数:"<<endl; thread th(readvalue); mutex mtx; unique_lock<mutex>lck(mtx); while(cv.wait_for(lck,chrono::seconds(1)) == cv_status::timeout){ cout<<"."<<endl; } cout<<"输入的整数为:"<<value<<endl; th.join(); return 0; }
说明:
通知或超时都会解锁,所以主线程会一直输出。
4、线程池
4.1、线程池的概念
在一个程序中,如果我们需要多次使用线程,这就意味着,需要多次的创建和销毁,而创建线程的过程必定会消耗内存,线程过多会带来调用的开销,进而影响缓存局部性能和整体性能。
所存在的问题如下:
- 创建太多线程,会浪费一定的资源,有些线程没有得到充分利用;
- 销毁太多线程,会导致之后再浪费时间重新进行创建;
- 创建线程太慢,会导致长时间的等待,弱化性能;
- 销毁线程太慢,会导致其他线程饥饿。
而线程池的作用就体现出来了,它维护着多个线程,避免了在处理短时间任务时,创建与销毁线程的代价。
4.2、线程池的实现
在程序开始运行前就创建多个线程,这样,在程序运行时,只需要从线程池中拿来用就可以了,大大提高了程序运行效率。
一般线程池都由以下几个部分构成:
- 线程池管理(ThreadPoolManager):用于创建并管理线程池,也就是线程池类;
- 工作线程(WorkThread):线程池中线程;
- 任务队列task:用于存放没有处理的任务,提供一种缓冲机制;
- append:用于添加任务的接口。
线程池实现代码:
#ifndef _THREADPOOL_H #define _THREADPOOL_H #include <vector> #include <queue> #include <thread> #include <iostream> #include <stdexcept> #include <condition_variable> #include <memory> //unique_ptr #include<assert.h> const int MAX_THREADS = 1000; //最大线程数目 template <typename T> class threadPool { public: threadPool(int number = 1);//默认开一个线程 ~threadPool(); std::queue<T *> tasks_queue; //任务队列 bool append(T *request);//往请求队列<task_queue>中添加任务<T *> private: //工作线程需要运行的函数,不断的从任务队列中取出并执行 static void *worker(void *arg); void run(); private: std::vector<std::thread> work_threads; //工作线程 std::mutex queue_mutex; std::condition_variable condition; //必须与unique_lock配合使用 bool stop; };//end class //构造函数,创建线程 template <typename T> threadPool<T>::threadPool(int number) : stop(false) { if (number <= 0 || number > MAX_THREADS) throw std::exception(); for (int i = 0; i < number; i++) { std::cout << "created Thread num is : " << i <<std::endl; work_threads.emplace_back(worker, this);//添加线程 //直接在容器尾部创建这个元素,省去了拷贝或移动元素的过程。 } } template <typename T> inline threadPool<T>::~threadPool() { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; condition.notify_all(); for (auto &ww : work_threads) ww.join();//可以在析构函数中join } //添加任务 template <typename T> bool threadPool<T>::append(T *request) { /*操作工作队列时一定要加锁,因为他被所有线程共享*/ queue_mutex.lock();//同一个类的锁 tasks_queue.push(request); queue_mutex.unlock(); condition.notify_one(); //线程池添加进去了任务,自然要通知等待的线程 return true; } //单个线程 template <typename T> void *threadPool<T>::worker(void *arg) { threadPool *pool = (threadPool *)arg; pool->run();//线程运行 return pool; } template <typename T> void threadPool<T>::run() { while (!stop) { std::unique_lock<std::mutex> lk(this->queue_mutex); /* unique_lock() 出作用域会自动解锁 */ this->condition.wait(lk, [this] { return !this->tasks_queue.empty(); }); //如果任务为空,则wait,就停下来等待唤醒 //需要有任务,才启动该线程,不然就休眠 if (this->tasks_queue.empty())//任务为空,双重保障 { assert(0&&"断了");//实际上不会运行到这一步,因为任务为空,wait就休眠了。 continue; } else { T *request = tasks_queue.front(); tasks_queue.pop(); if (request)//来任务了,开始执行 request->process(); } } } #endif
说明:
- 构造函数创建所需要的线程数;
- 一个线程对应一个任务,任务可能随时完成,线程则可能休眠,所以用任务队列queue实现(线程数量有限),线程采用wait机制;
- 任务在不断地添加,有可能大于线程数,处于队首的任务先执行;
- 只有添加任务(append)后,才开启condition.notify_one();
- wait表示任务为空时,线程休眠,等待新任务的加入;
- 添加新任务时需要添加锁,因为共享资源。
测试代码:
#include "mythread.h" #include<string> #include<math.h> using namespace std; class Task { public: void process() { //cout << "run........." << endl; //测试任务数量 long i=1000000; while(i!=0) { int j = sqrt(i); i--; } } }; int main(void) { threadPool<Task> pool(6);//6个线程,vector std::string str; while (1) { Task *tt = new Task(); //使用智能指针 pool.append(tt);//不停的添加任务,任务是队列queue,因为只有固定的线程数 cout<<"添加的任务数量: "<<pool.tasks_queue.size()<<endl;; delete tt; } }