0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

线程池的应用

科技绿洲 来源:Linux开发架构之路 作者:Linux开发架构之路 2023-11-10 11:07 次阅读

线程池的应用

在我认知中,任何网络服务器都是一个死循环。这个死循环长下面这个样子。

图片

基本上服务器框架都是基于这个架构而不断开发拓展的。

这个死循环总共分为四个步骤,可以涵盖所有客户端的需求,然而目前绝大多数企业不会用这样的架构。

问题在于容易产生阻塞。

作为客户端,我们当然希望访问服务器的时候,能够在短时间内收到回复,意味着自己连接上了该服务器。但是上述架构却很容易产生响应延迟。

当某一个连接的2.3.4时间过长,也许是因为客户上传了很大的数据,也许是因为业务处理起来比较麻烦,需要计算很多东西,也许是客户需要下载很大的东西,总之只要2,3,4的时间延迟,意味着下一个循环处理其它连接的动作也会被无限延迟。

也就是说,系统需要处理一个很慢的客户端的连接,后面的所有连接,哪怕只是耗时很短的任务,都需要等这个很慢的任务完成才能进行。

所以除了像redis的服务器,数据库都是基于hash的key-value结构,业务处理起来十分快速,才会将1,2,3,4都在一个线程中完成,其它的服务器若要提供千万乃至亿级别的客户接入量,必须更快地处理客户的连接,解除1和234之间的耦合,这才引入了多线程,我的主线程只负责1,然后将2,3,4分发到其它线程中执行。

然而,如果服务器选择这种多线程架构,当我们面临着巨大的客户端流量,则势必需要频繁地创建和销毁线程,这个过程十分浪费系统资源,还容易造成系统崩溃,然后老板震惊,被迫毕业,流落街头,思之令人发笑。

解决办法就是线程池。

我们预先创建好一系列线程,就好比后宫佳丽三千,然后皇上(线程池中枢)来了兴致(收到任务),就去翻一个妃子(线程池中某个线程)的牌子。妃子(线程)解决完需求后,回到后宫(线程池),等待下一次召唤。

不用创建和销毁,而是回收利用,所有池式结构都可以看做是一种对资源调度的缓冲,这就是线程池的精髓。

线程池设计

我们手撕线程池,目的还是搞懂基本原理,不弄太多花里胡哨的架构,比如工厂模式之类的。

当前这个版本的线程池是基于互斥锁和条件变量实现的。

预告(画饼):无锁线程池后续也会手撕。

线程池总体上可以分为三大组件。

  • 任务队列(存还没有执行的任务)
  • 执行队列(可以看成就是线程池,存放着可以用来执行任务的线程)
  • 线程池管理中枢(负责封装前两个类,任务的分发,线程池的创建,销毁,等等。对外提供统一的接口

其工作流程大概如图所示

图片

任务队列节点数据结构

//- 任务队列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};

任务队列负责存还没有执行的业务,我们可以将每个业务都抽象成一个函数,每个函数自然有可能需要参数

所以任务队列的节点需要两个成员:

  • taskCallback:函数回调,执行客户端想要的业务。
  • user_data:函数参数,包含客户端的信息,比如socketfd等。

顺便提供了一个接口,可以修改回调函数。

执行队列节点数据结构

//-执行队列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
  • tid:每个节点都对应一个线程,所以需要一个id成员来存线程id,
  • usable:这个成员非常妙,它代表当前线程是否可用,默认为true,一旦设置为false,则该线程会结束。
  • 使用usable可以在最后销毁线程池的时候,以一种优雅的方式结束每个线程,而代替pthread_cancel这种强制销毁线程的方式,因为你不知道线程中的任务是否处理完,强制销毁就会使某些业务中断。
  • pool:这个成员是指向中枢管理(后面会讲)的指针,主要是为了在每个线程中通过pool获取到一个全局(对于所有线程池线程共享)的互斥锁和条件变量。
  • start:是线程池对象执行的一个实现线程再回收利用的任务循环。具体实现代码也是在后面会讲。

线程池管理中枢设计

总体结构:

class ThreadPool{
public:
//-任务队列和执行队列
deque task_queue;
deque exec_queue;
//-条件变量
pthread_cond_t cont;
//-互斥锁
pthread_mutex_t mutex;
//-线程池大小
int thread_count;
//-构造函数
ThreadPool(int thread_count):thread_count(thread_count);
//-创建线程池
void createPool();
//-加入任务
void push_task(void(*tcb)(void* arg),int i);
//-利用析构销毁线程池
~ThreadPool();
};*>*>

关于数据成员:

  • task_queue、exec_queue: 任务队列和执行队列,我使用deque作为容器实现队列。
  • cont:所有线程共享的条件变量。
  • mutex:所有线程共享的互斥锁。
  • thread_count: 线程池创建的时候,初始大小

关于成员方法:

  • ThreadPool:构造函数
  • createPool:创建线程池
  • push_task: 给服务器主循环用的,给线程池添加任务。
  • ~ ThreadPool: 销毁线程池,事实上应该单独定义一个destroyPool的api,我这里为了简便合并到析构中了。

ExecEle的start函数实现

现在对于ThreadPool对象有概念以后,可以先将刚刚执行队列节点ExecEle的start函数实现,其代表了每个线程池的线程始终在跑的循环,在无任务分配的时候阻塞在某个位置。

void* ExecEle::start(void*arg){
//-获得执行对象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加锁
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解锁
pthread_mutex_unlock(&(ee->pool -> mutex));
//-执行任务回调
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-删除线程执行对象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}

arg参数指向的是该线程函数对应的执行元素ExecEle本身的指针,我们定义其为ee。然后进入死循环,通过ee,我们可以获得线程池中枢对象pool。

通过pool。我们可以获得任务队列的情况,当任务队列为空,则线程进入阻塞状态,等待任务队列有任务进来后,通过条件变量通知,再恢复执行。

恢复执行后,从任务队列中取出队首的任务,这个过程需要在mutex的范围内,保证独占性。

之后解除互斥锁,开始执行任务的回调。执行完进行入下个循环,尝试再次获得互斥锁。

最后说一说usable,当我们销毁线程池的时候,设置每一个线程的usable为false,那么不会立刻中断每个线程正在执行的回调,而是等回调结束后,在下一次循环中如果检测到usable为false后,就会退出整个大循环,并释放自己的锁,唤醒线程池其它休眠的线程。退出大循环后,线程自然而优雅地结束。

之后是ThreadPool自己的api实现

构造函数ThreadPool实现:

ThreadPool(int thread_count):thread_count(thread_count){
//-初始化条件变量和互斥锁
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}

主要为了初始化cont,mutex,thread_count。

创建线程池createPool实现:

void createPool(){
int ret;
//-初始执行队列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}*>;++i){

通过pthread_create创建thread_count个线程,每个线程执行自己的start函数进入等待任务循环,并阻塞在锁和条件变量的位置。将exec对象push进执行队列。

添加任务 push_task实现:

void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加锁
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知执行队列中的一个进行任务
pthread_cond_signal(&cont);
//-解锁
pthread_mutex_unlock(&mutex);
}

主要功能是构造TaskEle对象并加入到执行队列中。

每个TaskEle可能需要执行不同的业务,所以push_task需要传入对应业务的回调tcb(task callback)

i是我加的额外参数,代表主线程中连接的客户端编号,其意义可以是socketfd。

注意在修改执行队列(push)的时候,需要加锁保证独占。

销毁线程池~ ThreadPool 实现:

~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任务队列
task_queue.clear();
//-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
//-等待所有线程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空执行队列
exec_queue.clear();
//-销毁锁和条件变量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}();>();++i){

先将所有线程的usable设置为false,之后加锁,清空任务队列,并通过条件变量通知所有线程,等所有线程退出后,销毁执行队列,销毁锁和条件变量。

业务代码和服务器主循环

//-线程执行的业务函数
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};

int main(){
//-创建线程池
ThreadPool pool(100);
pool.createPool();
//-创建任务
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}

随便写的线程执行的业务,打印一下客户信息。

主线程创建100大小的线程池,并添加1000个任务(连接)。

完整代码

// *C++和posix接口实现一个线程池
//-三个组件:任务队列,执行队列,线程池(中枢管理)

#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace std;

//-打印线程错误专用,根据err来识别错误信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%sn",strerror(err),msg);
exit(EXIT_FAILURE);
}

class ThreadPool;//-声明

//- 任务队列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};

//-执行队列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};

//-线程池
class ThreadPool{
public:
//-任务队列和执行队列
deque task_queue;
deque exec_queue;
//-条件变量
pthread_cond_t cont;
//-互斥锁
pthread_mutex_t mutex;
//-线程池大小
int thread_count;
//-构造函数
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化条件变量和互斥锁
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始执行队列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}
//-加入任务
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加锁
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知执行队列中的一个进行任务
pthread_cond_signal(&cont);
//-解锁
pthread_mutex_unlock(&mutex);

}
//-销毁线程池
~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任务队列
task_queue.clear();
//-广播给每个执行线程令其退出(执行线程破开循环会free掉堆内存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-让其他线程拿到锁
//-等待所有线程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空执行队列
exec_queue.clear();
//-销毁锁和条件变量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};

void* ExecEle::start(void*arg){
//-获得执行对象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加锁
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任务队列为空,等待新任务
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解锁
pthread_mutex_unlock(&(ee->pool -> mutex));
//-执行任务回调
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-删除线程执行对象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}


//-线程执行的业务函数
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};

int main(){
//-创建线程池
ThreadPool pool(100);
pool.createPool();
//-创建任务
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}();>();++i){
*>;++i){
*>*>
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 服务器
    +关注

    关注

    12

    文章

    8352

    浏览量

    83542
  • 数据库
    +关注

    关注

    7

    文章

    3650

    浏览量

    63758
  • 线程池
    +关注

    关注

    0

    文章

    54

    浏览量

    6783
收藏 人收藏

    评论

    相关推荐

    Java中的线程包括哪些

    线程是用来统一管理线程的,在 Java 中创建和销毁线程都是一件消耗资源的事情,线程可以重复
    的头像 发表于 10-11 15:33 644次阅读
    Java中的<b class='flag-5'>线程</b><b class='flag-5'>池</b>包括哪些

    线程是如何实现的

    线程的概念是什么?线程是如何实现的?
    发表于 02-28 06:20

    基于线程技术集群接入点的应用研究

    本文在深入研究高级线程技术的基础上,分析、研究了固定线程数目的线程线程数目动态变化的
    发表于 01-22 14:21 5次下载

    如何正确关闭线程

    前言本章分为两个议题 如何正确关闭线程 shutdown 和 shutdownNow 的区别 项目环境jdk 1.8 github 地址:https://github.com
    的头像 发表于 09-29 14:41 9632次阅读

    基于Nacos的简单动态化线程实现

    本文以Nacos作为服务配置中心,以修改线程核心线程数、最大线程数为例,实现一个简单的动态化线程
    发表于 01-06 14:14 706次阅读

    线程线程

    线程通常用于服务器应用程序。 每个传入请求都将分配给线程池中的一个线程,因此可以异步处理请求,而不会占用主线程,也不会延迟后续请求的处理
    的头像 发表于 02-28 09:53 551次阅读
    多<b class='flag-5'>线程</b>之<b class='flag-5'>线程</b><b class='flag-5'>池</b>

    Java线程核心原理

    看过Java线程源码的小伙伴都知道,在Java线程池中最核心的类就是ThreadPoolExecutor,
    的头像 发表于 04-21 10:24 675次阅读

    细数线程的10个坑

    JDK开发者提供了线程的实现类,我们基于Executors组件,就可以快速创建一个线程
    的头像 发表于 06-16 10:11 544次阅读
    细数<b class='flag-5'>线程</b><b class='flag-5'>池</b>的10个坑

    线程的两个思考

    今天还是说一下线程的两个思考。 池子 我们常用的线程, JDK的ThreadPoolExecutor. CompletableFutures 默认使用了
    的头像 发表于 09-30 11:21 2755次阅读
    <b class='flag-5'>线程</b><b class='flag-5'>池</b>的两个思考

    Spring 的线程应用

    我们在日常开发中,经常跟多线程打交道,Spring 为我们提供了一个线程方便我们开发,它就是 ThreadPoolTaskExecutor ,接下来我们就来聊聊 Spring 的线程
    的头像 发表于 10-13 10:47 390次阅读
    Spring 的<b class='flag-5'>线程</b><b class='flag-5'>池</b>应用

    线程基本概念与原理

    一、线程基本概念与原理 1.1 线程概念及优势 C++线程简介
    的头像 发表于 11-10 10:24 337次阅读

    线程的基本概念

    线程的基本概念 不管线程是什么东西!但是我们必须知道线程被搞出来的目的就是:提高程序执行效
    的头像 发表于 11-10 16:37 321次阅读
    <b class='flag-5'>线程</b><b class='flag-5'>池</b>的基本概念

    线程三大核心参数的含义 线程核心线程数制定策略

    以上考点作为线程面试几乎必问的内容,大部分人应该都是如数家珍,张口就来,但是懂了面试八股文真的就不一定在实际运用中真的就会把线程用好 。
    的头像 发表于 12-01 10:20 361次阅读
    <b class='flag-5'>线程</b><b class='flag-5'>池</b>三大核心参数的含义 <b class='flag-5'>线程</b><b class='flag-5'>池</b>核心<b class='flag-5'>线程</b>数制定策略

    线程的创建方式有几种

    线程是一种用于管理和调度线程的技术,能够有效地提高系统的性能和资源利用率。它通过预先创建一组线程并维护一个工作队列,将任务提交给线程
    的头像 发表于 12-04 16:52 526次阅读

    什么是动态线程?动态线程的简单实现思路

    因此,动态可监控线程一种针对以上痛点开发的线程管理工具。主要可实现功能有:提供对 Spring 应用内线程
    的头像 发表于 02-28 10:42 303次阅读