今天让我们一起来探讨一下多路复用在redis中的具体实现原理。
R**edis客户端和服务端的连接访问介绍**
之前的文章中我们有稍微提到过,Redis的客户端和服务端之间是通过resp协议进行链接通信的。这种通信的本质其实是借助了socket套接字进行网络连接。如下图所示:
当客户端的jedis和redis服务器进行连接的时候,首先需要在对应操作系统的内核中建立连接,接着才会将对应的链接分配到指定的redis服务端程序当中。假设redis采用了默认的6379端口,那么对应的链接请求就会发送到对应的进程上。
如何理解多路复用
当请求处理完毕之后,这一整条的链接链路不会立马断开,而为维持长链接的状态,方便后续继续传输数据使用。
为了方便大家对于多路复用能有个更好的认识,我们接下来通过一段Java程序来认识多路复用的实际应用案例。
Java模拟实现服务端链接
这是一段非常简单的BIO程序,大概思路便是接纳外界链接请求,然后打印数据到控制台。
package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author linhao
* @Date created in 11:16 上午 2021/8/15
*/
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(6378);
while (true) {
System.out.println("wait conn --------");
Socket socket = serverSocket.accept();
System.out.println("conn ok ---------");
byte[] bytes = new byte[1024];
System.out.println("wait data ---------");
socket.getInputStream().read(bytes);
System.out.printf("data is {}" + new String(bytes));
}
}
}
测试的方式我采用了nc命令进行模拟:
【idea @ Mac】>>>>>>nc localhost 6378
a
当请求到达服务端的时候,也会在控制台有所打印输出:
上边代码的哪些点是堵塞的位置?
serverSocket.accept(); //堵塞部分
socket.getInputStream().read(bytes); //堵塞部分
我们通过实操可以发现,在accept的位置会出现第一次堵塞的情况,当没有外界连接接入的时候,程序就会堵塞在该位置。
第二次堵塞的位置是在read函数调用的位置,当连接建立之后,服务端会一直等待客户端发送的数据请求,因此一直处于等待状况。
如果第一个连接建立之后迟迟不发送数据,后续又有第二个连接接入会出现什么问题?
为了模拟这个场景,我们来看下以下测试案例:
首先是服务端程序的部分改造,对于客户端的请求,我这里新增了一个handleData函数,用于模拟处理数据时候的堵塞情况(耗时大约为2秒左右)
package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author linhao
* @Date created in 11:16 上午 2021/8/15
*/
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(6378);
while (true) {
System.out.println("wait conn --------");
Socket socket = serverSocket.accept();
System.out.println("conn ok --------- " + System.currentTimeMillis());
byte[] bytes = new byte[1024];
System.out.println("wait data ---------");
socket.getInputStream().read(bytes);
handleData();
System.out.printf("data is {}" + new String(bytes));
}
}
/**
* 处理数据信息
*/
public static void handleData(){
System.out.println("handling data begin ----------");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("handling data end ----------");
}
}
接着便是客户端部分的模拟,模拟五个客户端的连接服务端,并且发送数据:
package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Author linhao
* @Date created in 11:17 上午 2021/8/15
*/
public class BioClient {
public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
Socket socket = new Socket();
try {
socket.connect(new InetSocketAddress("localhost", 6378));
socket.getOutputStream().write("this is test".getBytes());
System.out.println("send data -----------" + System.currentTimeMillis());
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
threadPoolExecutor.submit(t);
}
Thread.yield();
}
}
通过请求的日志记录可以发现,服务端对于数据的处理时间点如下:
也就是说当上一个连接在处理请求的时候,下一个连接发送的数据会处于一个就绪状态,单线程依次执行。
多线程异步等待模型改造
将服务端处理线程请求的模块开启单独的线程进行处理。
服务端程序进行改造:
package org.idea.iedis.framework.server.io.core;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author linhao
* @Date created in 11:16 上午 2021/8/15
*/
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(6378);
while (true) {
System.out.println("wait conn --------");
Socket socket = serverSocket.accept();
System.out.println("conn ok --------- " + System.currentTimeMillis());
byte[] bytes = new byte[1024];
System.out.println("wait data ---------");
socket.getInputStream().read(bytes);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
handleData();
}
});
t.start();
System.out.printf("data is {}" + new String(bytes));
}
}
/**
* 处理数据信息
*/
public static void handleData(){
System.out.println("handling data begin ----------");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("handling data end ----------");
}
}
此时控制台打印出来的结果如下所示:
看起来似乎是请求处理的效率变高了,但是实际上这种方案对于性能的开销是比较大的。
其实在tomcat7之前,其服务端的程序一直都是采用bio的模式来进行处理请求数据,每次发送请求之后都需要额外创建一个线程处理数据,因此其并发处理能力并不友好。
改善思路
有了这两个思路点之后,我们再来深入思考:
假设我们是jdk的开发者,需要对现有的代码做怎样的调整?
这里分享一下我自己的思路:
首先ServerSocket是一个已经存在于JDK内部的对象,而且不建议随意对已有的api进行调整,否则会对业界使用者造成一个不兼容的情况。
因此不妨可以通过一些setter方法去修改阻塞的设置项:
来看看实际JDK开发者是如何改良这部分设计的:
首先是使用nio搭建一个简单的服务端代码案例:
package org.idea.iedis.framework.server.io.core.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
/**
* @Author linhao
* @Date created in 8:41 上午 2021/8/16
*/
public class NioServer {
static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
SocketAddress socketAddress = new InetSocketAddress("localhost", 6666);
serverSocketChannel.bind(socketAddress);
serverSocketChannel.configureBlocking(false);
//有点类似于poll模型
while (true) {
for (SocketChannel socketChannel : channelList) {
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("read ... " + read);
byteBuffer.flip();
byte[] bs = new byte[read];
byteBuffer.get(bs);
String content = new String(bs);
System.out.println(content);
byteBuffer.flip();
}
}
SocketChannel accept = serverSocketChannel.accept();
if (accept != null) {
System.out.println("conn success -----");
accept.configureBlocking(false);
channelList.add(accept);
System.out.println(channelList.size() + "---- list --- size");
}
}
}
}
接着是基于这套服务端代码去编写相关的客户端程序脚本:
package org.idea.iedis.framework.server.io.core.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @Author linhao
* @Date created in 8:40 上午 2021/8/16
*/
public class NioClient {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 6666));
socketChannel.configureBlocking(false);
socketChannel.write(ByteBuffer.wrap("this is a test".getBytes()));
System.out.println("-----");
}
}
上边的这段代码总体的执行思路其实可以抽像化为以下模式:(假设有三个链接请求)
select (3 sockets){
for(Socket socket: sockets) {
int read = socket.getInputStream();
if(read >0){
// 对应处理代码
} else {
// 对应处理代码
}
}
}
但是这样的代码我们在进行实际落地实现的时候不可能采用Java程序去实现,通常都是在os底层来进行封装,然后供高级语言去调用。
思考:
假设构建了100w条链接,那么对于这一批的链接采用暴力的list遍历方式去询问是否有新数据的写入就会显得效率异常低下。
改善点:
假设当我们建立好了链接请求之后,每个链接都会订阅某个事件,假设是有数据写入的时候,主动去发布这个事件,这样就能避免轮训操作了。
NIO内部如何解决多路复用机制
从这个角度思考出发,我们一起来看下nio内部是如何实现多路复用机制的,
当有新的请求过来的时候,server会提前将accept事件委托给selector处理。
selector会根据连接的事件类型(例如read,write,accept)去通知到具体的socketserver。
来看看案例代码:
服务端新增一个selector代码
package org.idea.iedis.framework.server.io.core.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
/**
* @Author linhao
* @Date created in 8:27 上午 2021/8/17
*/
public class NioSelectorServer {
static ArrayList<SocketChannel> socketChannels = new ArrayList<>();
static ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
SocketAddress socketAddress = new InetSocketAddress("localhost", 6555);
serverSocket.configureBlocking(false);
serverSocket.bind(socketAddress);
Selector selector = Selector.open();
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("start select!");
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
SocketChannel socketChannel = serverSocket.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("client is connected!");
} else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = socketChannel.read(byteBuffer);
if (len > 0) {
System.out.println("receive data!" + new String(byteBuffer.array(),0,len));
} else if (len == -1) {
System.out.println("client is close");
socketChannel.close();
}
}
iterator.remove();
}
}
}
}
客户端向服务端发送数据的案例代码:
package org.idea.iedis.framework.server.io.core.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @Author linhao
* @Date created in 8:53 上午 2021/8/17
*/
public class NioSelectorClient {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",6555));
socketChannel.configureBlocking(false);
while (true) {
System.out.println("client send data");
socketChannel.write(ByteBuffer.wrap(("this is success " + Thread.currentThread().getName()).getBytes()));
Thread.sleep(2500);
}
}
}
从上边服务端的程序案例可以看出,首先获取到链接之后,会将一个接收事件(accept)给注册到一个叫做selector的组件上。接下来后续这个链接如果有发送请求到数据过来就会被识别是read事件,此时selector再会去通知具体到服务线程处理。大概到思路如下图所示:
思考:
selector在进行select的时候,会发生什么情况?
如果外界没有额外请求进入的话,那么select函数会进入一个堵塞等待的状态。看起来效果有些类似于bio的accept函数。
selector调用open函数,它会发生了什么变化?
我们对selector的open函数源代码进行深入挖掘,可以发现具体如下所示:
在其源代码的底部调用了JDK内部的sun.nio.ch.DefaultSelectorProvider.create()函数。这个函数的代码底层实际上会调用到jdk内部的一些函数,再由jdk内部去调用一些native方法,最终会调用到os中的接口,假设该程序所运作的os环境是linux操作系统,那么最底层就会调用到epool相关的三个函数。
epoll_create,epoll_ctl,epoll_wait。
Redis底层源代码中是如何实现多路复用的
这里我打开的是redis6.0的源代码进行阅读:
文件名称:ae_epoll.c
ps:由于自己对于C源代码并不是特别熟悉,所以这里只能简单说下
下边的这张截图中体现了使用epoll_create函数取创建一个链接。
这段代码则是使用了epollo_ctl函数来处理一些事件的注册处理。
下边的epoll_wait看起来则像是监听某些兴趣事件。
由于文章篇幅有限,后边会接着出一篇文章继续深入研究下多路复用的内容。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/cExk2nxaDKznOr1eQwFsQw
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。