Java NIO
BIO
首先回忆一下传统的服务器端同步阻塞I/O处理(也就是BIO,Blocking I/O)的经典编程模型:
{
ExecutorService executor = Excutors.newFixedThreadPollExecutor(100); //线程池
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(8080);
// 主线程死循环等待新连接到来
while(!Thread.currentThread.isInturrupted()){
// 接受新的socket
Socket socket = serverSocket.accept();
// 为新的连接创建新的线程
executor.submit(new ConnectIOnHandler(socket));
}
}
class ConnectIOnHandler extends Thread{
private Socket socket;
public ConnectIOnHandler(Socket socket){
this.socket = socket;
}
public void run(){
// 死循环处理读写事件
while(!Thread.currentThread.isInturrupted() && !socket.isClosed()){
// 读取数据
String someThing = socket.read()....
if(someThing!=null){
// 处理数据
...
// 写数据
socket.write()....
}
}
}
}
这是一个经典的每连接每线程的模型,之所以使用多线程,主要原因在于
socket.accept()
、socket.read()
、socket.write()
三个主要函数都是同步阻塞的,当一个连接在处理 I/O 的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里。这也是为什么在做计网实验的时候单线程只能处理一次请求,如果再次发出请求服务器根本不会响应,因为服务器被阻塞在读写了。
- 多线程一般都使用线程池,可以让线程的创建和回收成本相对较低。在活动连接数不是特别高的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的I/O并且编程模型简单。
这个模型最本质的问题在于,严重依赖于线程。但线程是很”贵”的资源,主要表现在:
- 线程的创建和销毁成本很高
- 线程本身占用较大内存
- 线程的切换成本很高
NIO
所有的系统I/O都分为两个阶段:等待就绪和操作。举例来说,读函数,分为等待系统可读和真正的读;同理,写函数分为等待网卡可以写和真正的写。
传统的BIO里面 socket.read()
,如果TCP RecvBuffer
里没有数据,函数会一直阻塞,直到收到数据,返回读到的数据。
对于NIO,如果TCP RecvBuffer
有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。
最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。
换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。(解释地真地道)
NIO只有在连接/通道真正有读写事件发生时(事件驱动),才会进行读写,这就大大地减少了系统的开销。
NIO的核心 | 对应的类或接口 | 应用 | 作用 |
---|---|---|---|
缓冲区 | Buffer |
文件IO/网络IO | 存储数据 |
通道 | Channel |
文件IO/网络IO | 运输 |
选择器 | Selector |
网络IO | 控制器 |
单线程 Reactor
Reactor
模型中定义的三种角色:
Reactor
:负责监听和分配事件,将I/O事件分派给对应的Handler
。新的事件包含连接建立就绪、读就绪、写就绪等。Acceptor
:处理客户端新连接,并分派请求到处理器链中。Handler
:将自身与事件绑定,执行非阻塞读/写任务,完成channel
的读入,完成处理业务逻辑后,负责将结果写出channel
。可用资源池来管理。
interface ChannelHandler{
void channelReadable(Channel channel);
void channelWritable(Channel channel);
}
class Channel{
Socket socket;
Event event;//读,写或者连接
}
//IO线程主循环:
class IoThread extends Thread{
public void run(){
Channel channel;
//选择就绪的事件和对应的连接
while(channel=Selector.select()){
if(channel.event==accept){
//如果是新连接,则注册一个新的读写处理器
registerNewChannelHandler(channel);
}
if(channel.event==write){
//如果可以写,则执行写事件
getChannelHandler(channel).channelWritable(channel);
}
if(channel.event==read){
//如果可以读,则执行读事件
getChannelHandler(channel).channelReadable(channel);
}
}
}
//所有channel的对应事件处理器
Map<Channel,ChannelHandler> handlerMap;
}
这也是最简单的
Reactor
模式:注册所有感兴趣的事件处理器,单线程轮询选择就绪事件,执行事件处理器。NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。
Reactor
处理请求的流程:
Reactor
对象通过select
监控连接事件,收到事件后通过dispatch
进行转发。- 如果是连接建立的事件,则由
acceptor
接受连接,并创建handler
处理后续事件。 - 如果不是建立连接事件,则
Reactor
会分发调用Handler
来响应。 handler
会完成read
->
业务处理->
send
的完整业务流程。
单线程模式的缺点:
- 当其中某个
handler
阻塞时, 会导致其他所有的 client 的handler
都得不到执行, 并且更严重的是,handler
的阻塞也会导致整个服务不能接收新的 client 请求(因为acceptor
也被阻塞了)。
多线程 Reactor
连接的处理和读写的处理通常可以选择分开,这样对于海量连接的注册和读写就可以分发。
在线程
Reactor
模式基础上,做如下改进:(1)将
Handler
处理器的执行放入线程池,多线程进行业务处理。(2)而对于
Reactor
而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor
拆分为两个线程。
下图是 netty in action
中关于NIO的流程图。
多线程 Reactor
消息处理流程:
- 从主线程池中随机选择一个
Reactor
线程作为acceptor
线程,用于绑定监听端口,接收客户端连接 acceptor
线程接收客户端连接请求之后创建新的SocketChannel
,将其注册到主线程池的其它Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作- 步骤2完成之后,业务层的链路正式建立,将
SocketChannel
从主线程池的Reactor
线程的多路复用器上摘除,重新注册到Sub线程池的线程上,并创建一个Handler
用于处理各种连接事件 - 当有新的事件发生时,
SubReactor
会调用连接对应的Handler
进行响应 Handler
通过Read
读取数据后,会分发给后面的Worker
线程池进行业务处理Worker
线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler
进行处理Handler
收到响应结果后通过Send
将响应结果返回给Client
Reactor
模式在 IO 读写数据时还是在同一个线程中实现的,即使使用多个Reactor
机制的情况下,那些共享一个Reactor
的Channel
如果出现一个长时间的数据读写,会影响这个Reactor
中其他Channel
的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection
或许是一个更好的选择,或则此时使用改进版的Reactor
模式如Proactor
模式。
Proactor
Reactor
模型用于同步I/O,而Proactor
模型运用于异步I/O操作。
模型
模块关系
Procator Initiator
负责创建Procator
和Handler
,并将Procato
r和Handler
都通过Asynchronous operation processor
注册到内核。Asynchronous operation processor
负责处理注册请求,并完成IO操作。完成IO操作后会通知procator
。procator
根据不同的事件类型回调不同的handler
进行业务处理。handler
完成业务处理,handler
也可以注册新的handler
到内核进程。
消息处理流程:
读取操作:
- 应用程序初始化一个异步读取操作,然后注册相应的事件处理器,此时事件处理器不关注读取就绪事件,而是关注读取完成事件,这是区别于Reactor的关键。
- 事件分离器等待读取操作完成事件。
- 在事件分离器等待读取操作完成的时候,操作系统调用内核线程完成读取操作,并将数据读写到应用传递进来的缓冲区(如Netty中的
ByteBuf
)中。这也是区别于Reactor的一点,Proactor中,应用程序需要传递缓存区。 - 事件分离器捕获到读取完成事件后,激活应用程序注册的事件处理器,事件处理器直接从缓存区读取数据,而不需要进行实际的读取操作。
异步IO都是操作系统负责将数据读写到应用传递进来的缓冲区供应用程序操作。
Proactor中写入操作和读取操作,只不过感兴趣的事件是完成事件。
在 Linux 下的异步 I/O 是不完善的,
aio
系列函数是由 POSIX 定义的异步操作接口,不是真正的操作系统级别支持的,而是在用户空间模拟出来的异步,并且仅仅支持基于本地文件的 aio 异步操作,网络编程中的 socket 是不支持的,这也使得基于 Linux 的高性能网络程序都是使用 Reactor 方案。
Netty的线程模型
Netty采用的是主从线程模型。下面是Netty使用中很常见的一段代码。
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpStaticFileServerHandler());
});
Channel ch = b.bind(PORT).sync().channel();
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
- 定义了两个
EventLoopGroup
,其中bossGroup
对应的就是主线程池,只接收客户端的连接(注册,初始化逻辑),具体的工作由workerGroup
这个从线程池来完成。 - 客户端和服务器建立连接后,NIO会在两者之间建立
Channel
,所以启动类调用channel
方法就是为了指定建立什么类型的通道。 - 启动类还调用了
handler()
和childHandler()
方法,这两个方法中提及的handler
是一个处理类的概念,他负责处理连接后的一个个Channel
的相应处理。handler()
指定的处理类是主线程池中对通道的处理类,childHandler()
方法指定的是从线程池中对通道的处理类。- Netty中,可以注册多个
handler
。ChannelInboundHandler
按照注册的先后顺序执行;ChannelOutboundHandler
按照注册的先后顺序逆序执行
- Netty中,可以注册多个
如果需要在客户端连接前的请求进行
handler
处理,则需要配置handler()
;如果是处理客户端连接之后的handler
,则需要配置在childHandler()
。
Netty三大组件
Channel
接口提供了一个API,大大降低了直接使用Sockets
的复杂性。当创建Channel
时,它会自动分配自己的ChannelPipeline
。ChannelPipeline
中含有一些ChannelHandlers
,用于用户自定义处理事件。数据通过Channel
在各个Handler
之间流动。
EventLoop
定义了Netty的核心抽象,用于处理在连接生命周期内发生的事件。Channel
在其生命周期内注册为一个EventLoop
。一个EventLoop
可以被分配给一个或多个Channel
。
关系如下图所示:
Netty中的所有I/O操作都是异步的。因为一个操作可能不会立即返回,所以我们需要一种方法在以后确定它的结果。为此,Netty提供了ChannelFuture
,其addListener()
方法注册一个ChannelFutureListener
,当操作完成时通知它(无论是否成功)。
channelFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) { // total unknown
System.err.println(future.channel() + " Transfer progress: " + progress);
} else {
System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) {
System.err.println(future.channel() + " Transfer complete.");
}
});
上面的代码就是通过添加 FutureListener
实现I/O操作完成后在Console中打印信息的功能。
参考:
Netty In Action
netty Reactor模式(源码死磕 - 云+社区 - 腾讯云 (tencent.com)
彻底搞懂Reactor模型和Proactor模型 - 云+社区 - 腾讯云 (tencent.com)
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
如何深刻理解Reactor和Proactor? - 知乎 (zhihu.com)