本系列是新开的解读Netty源码的文章系列。
要解读Netty源码需要做比较多的前期铺垫内容,主要是Linux网络、Java NIO的内容,特别是Reactor模型,这些基础内容对理解其他网络框架也是有所帮助的。
本系列文章在网络模型、Java NIO的基础之上,打算按照两条脉络来解读Netty的使用和源码分析:
- 组件的创建、启动、初始化和启动。
- 网络请求的收发处理过程。
本系列文章主要参考资料有:
- 《深入理解Linux网络技术内幕》
- 《计算机网络:自顶向下方法》
- 《TCP/IP协议详解》
- 《HTTP权威指南》
- 《Netty实战》
一、NIO基础
在介绍Netty之前需要了解一些Java NIO的基础知识,Java NIO提供了不同于BIO的IO模型。
普通的阻塞读,对通道发起Read操作会阻塞当前线程等待操作系统准备数据就绪,再从内核缓存区复制到进程缓存区,完成读取操作。
NIO的同步非阻塞读,或者说多路复用IO,使用复用器器selector轮询通道的读就绪/写就绪事件,通道可读/可写在由内核缓存区复制到进程缓存区,完成读取操作。
需要注意的是NIO不是真正的异步非阻塞读,异步非阻塞IO或者说AIO是JDK1.7新加入的AsyncXXXChannel等类型,实现了真正的异步非阻塞读/写、
下面是一些BIO、NIO、AIO的实际例子,帮助我们理解和比较三者。案例来源是李林峰老师的《Netty权威指南》这本书。
源码地址是:https://github.com/zouhuanli/NettyTheDefinitiveGuide.git。
1.1 BIO案例
一般认为,BIO是最简单,并发性能最差的IO类型。只适合很简单,性能要求不高的网络编程需求。
案例的基本功能是client发生Query命令,server获取当前时间并返回给client。案例仅供学习,未处理异常和优化结构。
BIO的Server源码如下:
public class TimeServer {
public static void main(String [] args) throws IOException {
int port = 8080;
ServerSocket server;
try {
server = new ServerSocket(port);
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("The time server is start in port : " + port);
while(true){
new Thread(new TimeServerHandler(server.accept())).start();
}
}
}
public class TimeServerHandler implements Runnable {
private final Socket clientSocket;
public TimeServerHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
out = new PrintWriter(clientSocket.getOutputStream(), true);
String currentTime;
String body;
while (true) {
body = in.readLine();
if (body == null) {
break;
}
System.out.println("The time server receive order : " + body);
currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) : "BAD ORDER";
out.println(currentTime);
}
} catch (IOException e) {
if (in != null) {
try {
in.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
if (out != null) {
out.close();
}
try {
this.clientSocket.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out != null) {
out.close();
}
}
}
基本操作就是通过ServerSocket.accept接受客户端连接,并写回当前时间。编码模型还是很简单的。
客户端源码如下:
public class TimeClient {
/**
* 读取控制台输入 发送到127.0.0.1:8080,反现服务器返回结果
* BAD PRACTICE:未处理异常
*
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 8080;
Socket clientSocket;
BufferedReader in;
PrintWriter out;
clientSocket = new Socket("127.0.0.1", port);
in = new BufferedReader(new java.io.InputStreamReader(clientSocket.getInputStream()));
out = new PrintWriter(clientSocket.getOutputStream(), true);
out.println("QUERY TIME ORDER");
System.out.println("Send order 2 server succeed.");
String resp = in.readLine();
System.out.println("Now is :" + resp);
}
}
可以看到BIO的Server和Client的编码模型是很简单,适合并发量很小、无性能要求的场景。
1.2 NIO案例
NIO的核心原理是通过Reactor模式的轮询器/复用器Selector不断轮询注册的Channel事件,然后再交给具体线程去处理通道的读/写。具体的通道内容的读/写,这块是阻塞的,不是异步直接获取结果的。
NIO的底层是封装Epoll,本质是Linux的epoll的事件轮询。
下面来看下Java原生的NIO的Server和Client源码案例。
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
NioTimeServerHandler server;
server = new NioTimeServerHandler(port);
new Thread(server).start();
}
}
public class NioTimeServerHandler implements Runnable {
private final int port;
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
public NioTimeServerHandler(int port) {
this.port = port;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(this.port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
stop = false;
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public void stop() {
stop = true;
}
@Override
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
try {
selector.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
try {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
// 处理读消息
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = 0;
readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = "";
body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) : "BAD ORDER";
doWrite(socketChannel, currentTime);
} else if (readBytes < 0) {
key.cancel();
try {
socketChannel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
// 读到0字节,忽略
}
}
}
}
// 异步写
private void doWrite(SocketChannel socketChannel, String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
ByteBuffer writeBuffer = ByteBuffer.allocate(currentTime.getBytes().length);
writeBuffer.put(currentTime.getBytes());
writeBuffer.flip();
try {
socketChannel.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
我们先看下Server的代码。上面源码可能没有处理粘包/半包情况,客户端多次调用可能会有错误。
Server主要流程是创建ServerChannel,bind端口,配置非阻塞。然后打开Selector,ServerChannel注册到Selector上注册Accept事件。
再”死循环”轮询Selector获得Channel的事件:如果是ServerChannel的accept事件,则调用ServerChannel开始接受客户端连接clientChannel,并设置clientChannel为非阻塞和注册它的READ事件。
然后轮询clientChannel的READ/WRITE事件,再交给具体的线程或者Handler去处理。
可以看到整体的编程模型是比BIO复杂了很多。
回到NIO的Client:
public class NioTimeClient {
public static void main(String[] args) throws IOException
{
int port = 8080;
if (args != null && args.length > 0)
{
try
{
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException e)
{
// 采用默认值
}
}
new Thread(new NioTimeClientHandler("127.0.0.1", port), "NIO-TimeClient-001").start();
}
}
public class NioTimeClientHandler implements Runnable {
private final int port;
private final String host;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public NioTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
stop = false;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
} catch (IOException e) {
throw new RuntimeException(e);
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey selectionKey;
while (it.hasNext()) {
selectionKey = it.next();
it.remove();
try {
handleInput(selectionKey);
} catch (Exception e) {
if (selectionKey != null) {
selectionKey.cancel();
if (selectionKey.channel() != null) {
try {
selectionKey.channel().close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}
}
}
}
private void handleInput(SelectionKey selectionKey) throws IOException {
if (selectionKey.isValid()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 连接成功
if (selectionKey.isConnectable()) {
if (socketChannel.finishConnect()) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
System.exit(1);
}
}
if (selectionKey.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("Now is : " + body);
this.stop = true;
} else if (readBytes < 0) {
selectionKey.cancel();
socketChannel.close();
}
}
}
}
private void doConnect() throws IOException {
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel socketChannel) {
try {
String req = "QUERY TIME ORDER";
byte[] bytes = req.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
if (!writeBuffer.hasRemaining()) {
System.out.println("Send order 2 server succeed.");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
这里就不做过多解读了。其主要流程是clientChannel先执行connect,再轮询监听READ事件,再做对于处理。
1.3 AIO案例
NIO的核心是Selector或者说一个单独的线程不断轮询获取Epoll上注册的IO事件,获得就绪事件,在就绪读/就绪写时候交给具体的线程或者Handler处理。其在等待就绪解读是非阻塞的,但是仍是由进程或者上层的业务代码执行Channel的READ操作,这步还是阻塞的。
而AIO是完全异步的,进程或者上层的业务代码发起Channel的读/写后,完全不关心何时就绪,也不需要其主动发起读/写。其操作都交给OS执行,OS读/写完成后再通过回调给进程/业务代码回调结果。
从这里看,理论上,AIO会比NIO有更高的性能和吞吐。实际上,由于Linux AIO支持不完善,Java AIO的编程复杂以及缺少完善的支持,目前Netty还是使用多路复用NIO的。Tomcat在比较新的版本是有AIO实现的,在其Nio2EndPoint内。
因为AIO主要编程工作就是在回调的处理,因此编程复杂度会比NIO更低。
下面来看下AIO的简单案例。
Server案例源码:
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
AioTimeSeverHandler timeServer = new AioTimeSeverHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
public class AioTimeSeverHandler implements Runnable {
private int port;
private CountDownLatch latch;
private AsynchronousServerSocketChannel serverSocketChannel;
public AioTimeSeverHandler(int port) {
this.port = port;
try {
serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server is start in port : " + port);
} catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
serverSocketChannel.accept(this, new AcceptCompletionHandler());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AioTimeSeverHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AioTimeSeverHandler attachment) {
attachment.serverSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AioTimeSeverHandler attachment) {
attachment.latch.countDown();
}
}
private static class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date())
: "BAD ORDER";
doWrite(currentTime);
}
private void doWrite(String currentTime) {
if (currentTime != null && !currentTime.trim().isEmpty()) {
byte[] bytes = currentTime.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment.hasRemaining()) {
channel.write(attachment, attachment, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
可以看到Server的源码主要是打开AsyncChannel,然后注册回调函数。其主要编程工作就是回调函数。
Client端的源码如下:
public class TimeClient {
public static void main(String[] args) {
new Thread(new TimeClientHandler("127.0.0.1", 8080)).start();
}
}
public class TimeClientHandler implements Runnable, CompletionHandler<Void, TimeClientHandler> {
private AsynchronousSocketChannel client;
private final String host;
private final int port;
private CountDownLatch latch;
public TimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, TimeClientHandler attachment) {
System.out.println("客户端连接成功");
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer, new CompletionHandler<>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (attachment.hasRemaining()) {
client.write(attachment, attachment, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new CompletionHandler<>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body;
body = new String(bytes, StandardCharsets.UTF_8);
System.out.println("Now is :" + body);
latch.countDown();
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
latch.countDown();
}
});
}
@Override
public void failed(Throwable exc, TimeClientHandler attachment) {
}
}
为避免过多匿名内部类的回调地狱,可以设计好单独的回调类。
二、Netty简述
Netty是一个高性能、可伸缩、上层易用的Java网络通信框架,主要用于构建Client/Server应用。其基于异步事件驱动,通过事件和回调来处理网络事件,最大程度避免阻塞对性能的影响。
同时提供了简单良好易用的API,封装或者完善Java的原生NIO,避免面对直接使用Java的原生NIO的复杂度,专注于上层业务开发,减少对底层网络通讯细节的关注。其对上层开放的接口主要ChannelHandler,ChannelPipeline,ChannelContext等服务编排类。
此外,netty在很多开源框架/组件中都有应用,充当底层的网络通信模块,如Dubbo的NettyClient和NettyServer。
总结的说,Netty是一个高性能的Java网络通信框架,有着广泛的应用。
三、Netty入门
使用Netty改写上面的Client和Server的案例如下。
3.1 Server侧源码
Server源码:
/**
* 引导类和服务端,比较固化
*/
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler())
;
ChannelFuture future;
future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
System.exit(-1);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
//主要拓展点
ch.pipeline().addLast(new TimeServerHandler());
}
}
}
引导类是比较固化的,Server侧需要创建两个EventLoopGroup,一个用于接受客户端连接,一个用于处理客户端的读写事件。EventLoopGroup类似与ThreadGroup,内部的EventLoop是一个轮询线程,轮询注册事件的状态然后进行处理。
bossGroup内部一般是一个EventLoop,也就是只需要一个轮询线程ServeChannel的事件并不断accept客户端连接。worker的EventLoopGroup内部一般是逻辑CPU个数个EventLoop,轮询处理客户端的读写事件。
对上层开发者的主要拓展点是ChannelHandler。
Server的Handler如下:
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes(StandardCharsets.UTF_8));
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
handler主要是实现Channel事件的回调。
3.2 Client侧源码
Client侧源码如下:
/**
* 引导类和客户端,比较固化
*/
public class TimeClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) {
//主要拓展点
ch.pipeline().addLast(new TimeClientHandler());
}
}
);
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
System.exit(-1);
} finally {
group.shutdownGracefully();
}
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.copiedBuffer(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, StandardCharsets.UTF_8);
System.out.println("Now is : " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 释放资源
System.out.println("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
类似于Server代码,不过Client只需要一个EventLoopGroup。同样上层开发者主要开发和实现ChannelHandler的回调方法。
和上面原生Java NIO代码相比,Netty的引导类比较固定,主要是开发ChannelHandler。Netty的代码结构更加清晰,代码量也更少。
四、Netty核心架构
4.1 组件架构
首先看下官网的系统组件图:
整体系统组件分为三层:
- Core 核心层:事件模型、通用网络API、ByteBuf等。
- Transport 传输层:TCP、UDP、Socket等网络传输能力的抽象和封装。
- Protocol 协议层:主要是Http、Websocket、Protobuf等协议的官方实现。
4.2 逻辑架构
上图的组件结构重点在于组件功能,划分的结构缺少了组件之间的交互和关联。笔者更喜欢划分成这样的三层架构(图片来源:阿里云开发者公众号):
这样的划分不仅仅区分组件的功能,还表明组件的依赖关系和交互关系。
- 首先是最底层的网络通信层:封装和优化原生Java NIO。核心组件是Bootstrap、Channel、ServerBootstrap。引导类是服务器/客户端启动初始化、服务器连接等操作的核心类。
Channel是对Java NIO的Channel的更高抽象,而且不仅仅包含NIO的channel, 也支持BIO的channel。
- 然后是调度层:或者说是核心引擎层,负责线程池、事件循环、事件分发等。核心组件是EventLoopGroup、EventLoop等。其最主要的组件就是EventLoop,它是Netty的精髓所在,负责事件分发、线程池调度等。
Server侧的NioEventLoop实现了reactor模式,其boss线程负责accept客户端连接,worker线程负责处理客户端读写事件。
- 最后是服务编排层:负责组装服务,链式传递服务。核心组件是ChannelPipeline、ChannelHandler、ChannelHandlerContext等。ChannelHandler是上层开发者主要拓展点,主要负责实现Channel事件的回调。
五、核心源码总览
上面提及了Netty的一些核心类,如:Bootstrap、Channel、ServerBootstrap、EventLoopGroup、EventLoop、ChannelPipeline、ChannelHandler、ChannelHandlerContext等。
本文不会对这些核心类做过多的解读,而是简单介绍一下Server执行引擎(EventLoop)是如何串联或者使用这些核心类的。
主要是要区分Server的bossEventLoop和workerEventLoop的执行流程。
5.1 Server启动和引导过程源码简单解读
首先Server启动的两个EventLoopGroup中,BossGroup是1线程的,主要是使用ServerChannel不断获取客户端连接clientChannel。将clientChannel注册和分配到workerEventLoop(默认2*CPU数量)中,会从workerEventLoopGroup中获得一个注册进去。
然后有workerEventLoop的去处理这个客户端连接channel的IO事件。这是二者最主要的区别。
启动过程中首先需要创建ServerChannel,和注册ServerChannelServerChannel。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//创建ServerChannel
channel = channelFactory.newChannel();
init(channel);
}
//注册ServerChannel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
然后注册Channel到Selector。Selector是eventLoop(一个EventLoop一般一个线程)里面只有一个。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//注册Channel到Selector。Selector是eventLoop(一个线程)里面只有一个。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
最后使用ServerBootstrapAcceptor注册clientChannel到worker的EventLoop中。
具体的执行流程大致如下:
5.2 bossEventLoop执行过程源码简单解读
在BossEventLoop的run方法中,首先不断轮询Epoll事件,获得ACCEPT事件。BossEventLoop再处理ACCEPT事件。
int selectNow() throws IOException {
return selector.selectNow();
}
对ACCEPT事件处理是从这里开始:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
来到:
doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
最终获得客户端连接clientChannel。
然后注册clientChannel到workerEventLoop中。然后回调ServerBootstrapAcceptor的channelRead方法,也就是将clientChannel注册到WorkerGroup的一个EventLoop中.
childGroup.register(child).addListener(new ChannelFutureListener();
这样bossEventLoop就处理完一个客户端连接,将其注册和交付给workerEventLoop中的一个EventLoop(循环线程)。
下面简单介绍workerEventLoop的流程:
5.3 workerEventLoop执行流程源码简单解读
在workerEventLoop的run方法中,具体处理clientChannel的READ或者WRITE事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
unsafe.forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
一个workerEventLoop只有一个Selector,可以处理不同个clientChannel的READ或者WRITE事件。但是一个clientChannel只会被注册和分配到workerEventLoop的一个EventLoop中。
处理具体的READ和WRITE事件时候,会回调ChannelHandler注册的回调方法。通过Pipeline的链式调用,会调用多个ChannelHandler。ChannelHandler就是上层开发者具体开发的业务功能点或者拓展点。
六、总结
-
Netty是什么?
Netty是一个高性能的异步事件驱动的网络应用框架,封装和优化Java NIO,极大的简化网络应用开发的难度,方便我们开发CS应用和私有协议。
-
Netty整体架构
1) 网络通信层:封装和优化原生Java NIO。提供Bootstrap、Channel、ServerBootstrap等类,负责服务器/客户端启动初始化、服务器连接等功能。这层整体还是优化Java NIO。
2) 调度引擎层:提供EventLoopGroup、EventLoop等,负责线程池、事件循环、事件分发等功能。这层是实际Netty线程执行事件循环的层。
3)服务编排层:负责组装服务,链式传递服务。提供ChannelPipeline、ChannelHandler、ChannelHandlerContext等。ChannelHandler是上层开发者主要拓展点,主要负责实现Channel事件的回调。这层是上层开发拓展业务的层。
-
Server执行核心流程:
Boss的EventLoop主要是监听Accept事件,负责ACCEPT事件,不断accept客户端连接,然后注册和交付给workerEventLoop中的一个EventLoop。boss一般是只有一个线程。
worker的EventLoop不断轮询clientChannel的IO事件,并处理IO事件,处理事件时调用Pipeline的链式的ChannelHandler去做具体的处理。worker一般是多个线程。
这是MainRector+多SubReactor的模式。
七、参考材料
-
《Netty权威指南》
-
Netty源码(4.1)