redis7.0源码阅读:Redis中的IO多线程(线程池)
ccwgpt 2024-10-01 08:16 23 浏览 0 评论
一、Redis中的IO多线程原理
服务端收到一条信息,给它deconde成一条命令
然后根据命令获得一个结果(reply)
然后将结果encode后,发送回去
redis的单线程是指,命令执行(logic)都是在单线程中运行的
接受数据read和发送数据write都是可以在io多线程(线程池)中去运行
在Redis中,生产者也可以作为消费者,反之亦然,没有明确界限。
二、设置io多线程(调试设置)
在redis.conf中
设置io-threads-do-reads yes就可以开启io多线程
设置io-threads 2,设置为2(为了方便调试,真正使用的时候,可以根据需要设置),其中一个为主线程,另外一个是io线程
在networking.c中找到stopThreadedIOIfNeeded,如果在redis-cli中输入一条命令,是不会执行多线程的,因为它会判断,如果pending(需要做的命令)个数比io线程数少,就不会执行多线程
因此提前return 0,确保执行多线程,便于调试
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
return 0;//为了调试,提前退出(自己添加的一行)
if (pending < (server.io_threads_num*2)) {
if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
到此为止,只需要,运行redis-server,在networking.c的 readQueryFromClient中打个断点,然后在redis-cli中输入任意set key value就可以进入io多线程,进行调试
下图可以看到箭头指向的两个线程,一个是主线程,另一个是io线程
相关视频推荐
从nginx、redis、skynet开源框架看线程池在后端开发的应用
学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
三、Redis中的IO线程池
1、读取任务readQueryFromClient
postponeClientRead(c)就是判断io多线程模式,并将任务添加到 任务队列中
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
//后面省略......
}
2、主线程将 待读客户端 添加到Read任务队列(生产者)postponeClientRead
如果是io多线程模式,那么将任务添加到任务队列。 (这个函数名的意思,延迟读,就是将任务加入到任务队列,后续去执行)
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
io_threads_op == IO_THREADS_OP_IDLE)
{
listAddNodeHead(server.clients_pending_read,c);//往任务队列中插入任务
c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
}
}
3、多线程Read IO任务 handleClientsWithPendingReadsUsingThreads
基本原理和多线程Write IO是一样的,直接看多线程Write IO就行了。
其中processInputBuffer是解析协议
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
io_threads_op = IO_THREADS_OP_IDLE;
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
listDelNode(server.clients_pending_read,ln);
c->pending_read_list_node = NULL;
serverAssert(!(c->flags & CLIENT_BLOCKED));
if (beforeNextClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
/* Once io-threads are idle we can update the client in the mem usage buckets */
updateClientMemUsageBucket(c);
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
if (processInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not install a write handler (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}
4、多线程write IO任务(消费者)handleClientsWithPendingWritesUsingThreads
1.判断是否有必要开启IO多线程
2.如果没启动IO多线程,就启动IO多线程
3.负载均衡:write任务队列,均匀分给不同io线程
4.启动io子线程
5.主线程执行io任务
6.主线程等待io线程写结束
/* This function achieves thread safety using a fan-out -> fan-in paradigm:
* Fan out: The main thread fans out work to the io-threads which block until
* setIOPendingCount() is called with a value larger than 0 by the main thread.
* Fan in: The main thread waits until getIOPendingCount() returns 0. Then
* it can safely perform post-processing and return to normal synchronous
* work. */
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {//判断是否有必要开启IO多线程
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();//开启io多线程
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);//创建一个迭代器li,用于遍历任务队列clients_pending_write
int item_id = 0;//默认是0,先分配给主线程去做(生产者也可能是消费者),如果设置成1,则先让io线程1去做
//io_threads_list[0] 主线程
//io_threads_list[1] io线程
//io_threads_list[2] io线程
//io_threads_list[3] io线程
//io_threads_list[4] io线程
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);//取出一个任务
c->flags &= ~CLIENT_PENDING_WRITE;
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {//表示该客户端的输出缓冲区超过了服务器允许范围,将在下一次循环进行一个关闭,也不返回任何信息给客户端,删除待读客户端
listDelNode(server.clients_pending_write, ln);
continue;
}
/* Since all replicas and replication backlog use global replication
* buffer, to guarantee data accessing thread safe, we must put all
* replicas client into io_threads_list[0] i.e. main thread handles
* sending the output buffer of all replicas. */
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
listAddNodeTail(io_threads_list[0],c);
continue;
}
//负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了
//这样做的好处是,避免加锁。当前是在主线程中,进行分配任务
//通过取余操作,将任务均分给不同io线程
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);//设置io线程启动条件,启动io线程
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);//让主线程去处理一部分任务(io_threads_list[0])
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {//剩下的任务io_threads_list[1],io_threads_list[2].....给io线程去做,等待io线程完成任务
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);//等待io线程结束,并返回处理的数量
if (pending == 0) break;
}
io_threads_op = IO_THREADS_OP_IDLE;
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Update the client in the mem usage buckets after we're done processing it in the io-threads */
updateClientMemUsageBucket(c);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
负载均衡:将任务队列中的任务 添加 到不同的线程消费队列中去,每个线程就可以从当前线程的消费队列中取任务就行了。这样做的好处是,避免加锁。当前是在主线程中,进行分配任务通过取余操作,将任务均分给不同的io线程。
四、线程调度
1、开启io线程startThreadedIO
每个io线程都有一把锁,如果主线程把锁还回去了,那么io线程就会启动,不再阻塞
并设置io线程标识为活跃状态io_threads_active=1
void startThreadedIO(void) {
serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
}
2、关闭io线程stopThreadedIO
每个io线程都有一把锁,如果主线程拿了,那么io线程就会阻塞等待,也就是停止了IO线程 并设置io线程标识为非活跃状态io_threads_active=0
void stopThreadedIO(void) {
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
handleClientsWithPendingReadsUsingThreads();
serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);//
server.io_threads_active = 0;
}
相关推荐
- 如何为Hadoop选择最佳弹性MapReduce框架
-
ZDNet至顶网服务器频道07月22日新闻消息:亚马逊Web服务的弹性MapReduce是一项基于Hadoop的实施,它可允许你运行大型的预处理工作,如格式转换和数据聚合等。虽然我们可以选择很多的...
- 《平安小猪》:J.K.罗琳用“魔法”放大的真实
-
对很多孩子来说,某些玩具是抚慰心灵的“忠实伙伴”,几乎无可替代。J.K.罗琳在看到儿子大卫对玩偶小猪的依恋后创作了“平安小猪”的故事,这也是她自《哈利·波特》之后创作的首部儿童长篇小说。男孩杰克在平安...
- 一页纸精华 | HDFS
-
要入门大数据,最好的办法就是理清hadoop的生态系统。本期为你介绍分布式文件系统HDFS。ApacheHadoop2.0生态系统如下图所示:Hadoop2.0生态系统图Hadoop核心项目包括:H...
- 谷歌搁置与法国出版商的协议,将等候反垄断裁定
-
据路透社6月29日消息,两位知情消息人士称,谷歌搁置了与一些法国出版商达成的为新闻内容付费的初步协议,将等待反垄断审议结果。该决定可能为欧洲在线新闻的版权谈判定下基调。文件显示,按照谷歌与法国新闻总联...
- Java 微服务从源码实战开始 | Gitee 项目推荐
-
在软件开发的不同时期、阶段,对技术架构的理解、选择和应用都有着不一样的诉求。微服务架构是当前互联网业界的一个技术热点,它的思想也更符合我们的目标:根据业务模块划分服务种类。每个服务可以独立部署并且互相...
- 快讯|谷歌搁置向法国出版商付费协议:等待反垄断决定
-
财经网科技6月30日讯,据新浪科技消息,两位知情人士透露,谷歌已经搁置此前与一些法国出版商达成的为新闻内容付费的初步协议。因为谷歌正在等待一项反垄断决定,这项决定可能会为该公司的欧洲在线新闻版权谈判定...
- 外媒:谷歌搁置与法国出版商的协议 等候反垄断决定
-
路透中文网30日报道,据两位知情消息人士透露,谷歌GOOGL.O搁置了与一些法国出版商达成的为新闻内容付费的初步协议,等待一项反垄断决定。该决定可能为欧洲在线新闻的版权谈判定下基调。报道显示,根据路透...
- 大数据任务调度框架Oozie
-
Oozie(驯象人)是一个基于工作流引擎的开源框架,由Cloudera公司贡献给Apache,提供对HadoopMapReduce、PigJobs的任务调度与协调。Oozie需要部署到JavaS...
- 惊了!SpringBoot 3.4 触雷,升级后参数绑定竟悄悄破坏你的代码?
-
背景在微服务架构中,我们经常利用HTTP请求头来控制系统行为,比如实现灰度发布和流量控制。在PIG微服务框架中,我们通过重写SpringCloudLoadBalancer,根据请求he...
- 《终结者》:科幻电影巅峰的里程碑
-
在阅读此文之前,麻烦您点击一下“关注”,既方便您进行讨论和分享,又能给您带来不一样的参与感,感谢您的支持。文|庭芥摘要:本文以一位影评家的视角赏析詹姆斯·卡梅隆执导的经典科幻电影《终结者》。通过对该...
- AI已经越过红线?复旦大学:在知道自己将被关闭后,AI复制了自己
-
2024年12月9日,复旦大学的一项研究引发了全球科技界的强烈关注。研究团队对Meta与阿里巴巴旗下的两个大型AI系统展开测试,结果发现,在知晓自身可能被关闭的情况下,它们居然选择自我复制。这不是普通...
- 重磅开源!LocalAI让你在个人电脑上运行AI大模型,无需显卡,已获28K Star!
-
随着AI技术的快速发展,如何在本地设备上高效运行AI模型成为了开发者关注的焦点。LocalAI开源项目提供了一个革命性的解决方案-它让用户能够在个人电脑上轻松部署和运行各种AI模型,并且完全兼容...
- 了解《终结者》的恐怖末日世界观,能让你看懂《终结者6》
-
相信很多人的科幻动作启蒙片,应该就是《终结者》系列,起码对于我来说,童年的暑假里,不止一次反复看着《终结者2》的电影,深深被影片中施瓦辛格的硬核铁汉形象吸引,也为片中的液态机器人着迷。《终结者》系列成...
- Golang底层是用什么语言编写的?
-
Go底层语言Go语言在1.5版本之前主要由汇编和C语言写的,C语言占比85%以上,另外有少量的周边模块如文档等,带了些htmlshellperl代码,可以忽略不计。1.5版本及之后...
- skynet服务的缺陷 lua死循环
-
服务端高级架构—云风的skynet这边有一个关于云风skynet的视频推荐给大家观看点击就可以观看了!skynet是一套多人在线游戏的轻量级服务端框架,使用C+Lua开发。skynet的显著优点是,...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- MVC框架 (46)
- spring框架 (46)
- 框架图 (58)
- bootstrap框架 (43)
- flask框架 (53)
- quartz框架 (51)
- abp框架 (47)
- jpa框架 (47)
- laravel框架 (46)
- springmvc框架 (49)
- 分布式事务框架 (65)
- scrapy框架 (56)
- shiro框架 (61)
- 定时任务框架 (56)
- java日志框架 (61)
- grpc框架 (55)
- ppt框架 (48)
- 内联框架 (52)
- winform框架 (46)
- gui框架 (44)
- cad怎么画框架 (58)
- ps怎么画框架 (47)
- ssm框架实现登录注册 (49)
- oracle字符串长度 (48)
- oracle提交事务 (47)