C++线程池

文章目录[x]
  1. 0.1:线程池原理:

线程池原理:

线程:

线程是一条执行路径,是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行。同样多线程也可以实现并发操作,每个请求分配一个线程来处理。

一个正在运行的软件就是一个进程,一个进程可以同时运行多个任务,可以简单的认为进程是线程的集合。

线程是一条可以执行的路径。

对于单核CPU而言:多线程就是一个CPU在来回的切换,在交替执行。 对于多核CPU而言:多线程就是同时有多条执行路径在同时(并行)执行,每个核执行一个线程,多个核就有可能是一块同时执行的

线程带来了如此多的便利同时,当一个服务器面对高并发的请求时候,那么就会不断的创建线程,销毁线程,造成了不必要的性能浪费,那么有没有一种方法可以不用这样重复创建销毁呢

那就是引入这篇利用线程池来进行管理线程的创建和销毁来重复利用已经创建好的线程进行任务以避免频繁创建销毁的开销

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果任务队列为空则进入阻塞状态等待唤醒;如果所有线程池线程都始终保持繁忙,超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

线程池主要分为三个部分:

  • 工作线程

    • 里面维护这一定数量的线程,不停的对任务队列进行读取,判断是否有任务

    • 如果任务队列为空则进入阻塞,等待唤醒

    • 如果任务队列有任务,那么则唤醒一个等待中的线程进行依次处理任务

  • 任务队列

    • 创建一个队列,对任务依次压入,并唤醒一个等待中的线程

    • 取出任务时候将该任务弹出

  • 结束线程

    • 在线程的析构函数当中进行结束线程

    • 唤醒所有等待当中的线程,并进行join结束

任务队列代码:

#include <queue>
#include <thread>

template<typename T>
class ThreadQueue{
private:
    std::queue<T> safeQueue;    //线程队列
    std::mutex queueLock;   //访问队列锁
public:
    //判断队列是否为空
    bool empty(){
    std::unique_lock<std::mutex> lock(queueLock);   //加锁,防止队列被改变
    return safeQueue.empty();
}
    //添加队列元素
    void add(T &t){
        std::unique_lock<std::mutex> lock(queueLock);
        safeQueue.template emplace(t);
    }
    //取出元素
    bool getQueue(T &t){
        std::unique_lock<std::mutex> lock(queueLock);
        if(safeQueue.empty())
            return false;
        t=std::move(safeQueue.front());
        safeQueue.pop();
        return true;
    }

};

线程工作代码:

#include "ThreadQueue.h"
#include <future>

class ThreadPool{
private:
    class ThreadWork{
    private:
        ThreadPool *in_pool;
    public:
        ThreadWork(ThreadPool *pool):in_pool(pool){
        }
        ThreadWork operator()(){
            std::function<void()> func;
            bool que;   //是否正在取出元素
            while (!in_pool->shut_thread){
                {
                    std::unique_lock<std::mutex> lock(in_pool->mutually_thread);

                    //如果任务为空就阻塞
                    if(in_pool->task_queue.empty()){
                        //等待唤醒
                        in_pool->notice_thread.wait(lock);
                    }
                    que=in_pool->task_queue.getQueue(func);
                    if(que){
                        func();
                    }
                }
            }
        }
    };
    bool shut_thread;   //判断线程池是否关闭
    std::vector<std::thread> thread_list;       //线程工作列表
    std::mutex mutually_thread;    //互斥锁
    std::condition_variable notice_thread;  //通知唤醒线程
    ThreadQueue<std::function<void()>> task_queue;      //任务队列
public:
    ThreadPool(const int n_thread=4){
        thread_list=std::vector<std::thread>(n_thread);
        shut_thread= false;
    }
    //删除默认拷贝构造函数
    ThreadPool(const ThreadPool&)=delete;
    ThreadPool(const ThreadPool&&)=delete;
    ThreadPool &operator=(const ThreadPool&)=delete;
    ThreadPool &operator=(const ThreadPool&&)=delete;

    //初始化线程池
    void init(){
        for(int i=0;i<thread_list.size();i++){
            //分配线程
            thread_list.at(i)=std::thread(ThreadWork(this));
        }
    }
    //结束关闭线程池
    ~ThreadPool(){
        shut_thread= true;

        //唤醒所有线程池
        notice_thread.notify_all();
        for(int i=0;i<thread_list.size();i++){
            //判断线程是否在等待
            if(thread_list.at(i).joinable()){
                thread_list.at(i).join();
            }
        }
    }
    template <typename F, typename... Args>
    auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
    {
        std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误

        auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

        std::function<void()> warpper_func = [task_ptr]()
        {
            (*task_ptr)();
        };

        // 队列通用安全封包函数,并压入安全队列
        task_queue.add(warpper_func);

        // 唤醒一个等待中的线程
        notice_thread.notify_one();

        // 返回先前注册的任务指针
        return task_ptr->get_future();
    }
};

测试代码:

void test(){
    std::thread::id tid=std::this_thread::get_id();
    std::cout<<"test id:"<<tid<<std::endl;
}
void test1(){
    std::thread::id tid=std::this_thread::get_id();
    std::cout<<"test1 id:"<<tid<<std::endl;
}
void test2(){
    std::thread::id tid=std::this_thread::get_id();
    std::cout<<"test2 id:"<<tid<<std::endl;
}

int main()
{    // 创建3个线程的线程池
    ThreadPool pool(8);

    // 初始化线程池
    pool.init();
    std::thread::id tid=std::this_thread::get_id();
    std::cout<<"主线id"<<tid<<std::endl;
    pool.submit(test);
    pool.submit(test1);
    pool.submit(test2);

    return 0;
}
点赞
  1. 古乙丁三说道:

    牛啊🐮🐸

发表评论

昵称和uid可以选填一个,填邮箱必填(留言回复后将会发邮件给你)
tips:输入uid可以快速获得你的昵称和头像