学习链接
狂野架构师第四期netty视频 - B站视频
狂野架构师训练营6期 - B站视频
Netty学习example示例(含官方示例代码)
LG-Netty学习
【硬核】肝了一月的Netty知识点 - 启动过程写的很详细
Reactor模型讲解
一文搞懂Reactor模型与实现
高性能网络编程之 Reactor 网络模型(彻底搞懂)
文章目录
- 学习链接
- BIO&NIO代码实现
- BIO(传统阻塞IO)
- BioServer
- BioClient
- NIO1(基础理解,不交互)
- NioServer
- NioClient
- NIO2(交互,不处理write后续)
- NioServer
- NioClient
- NIO3(交互,处理write后续)
- NioServer
- NioClient
- Reactor模型
- 单Reactor-单线程
- 单Reactor-多线程
- 主从Reactor-多线程
- 工作流程
- 优势
- Netty
- 介绍
- 关于异步
- Netty核心架构
- 网络通信框架为什么非得是Netty
- Netty现状如何
- Netty对三种IO的支持
- Netty中的Reactor实现
- 工作流程
- 总结
- Pipeline 和 Handler
- ChannelPipeline & ChannelHandler
- ChannelHandler 分类
- Netty如何使用Reactor模式
BIO&NIO代码实现
BIO(传统阻塞IO)
BioServer
@Slf4j
public class BioServer {public static void main(String[] args) {//由Acceptor线程负责监听客户端的连接ServerSocket serverSocket = null;try {// 执行完,服务器启动成功(在此阻塞,直到启动成功或抛出异常)serverSocket = new ServerSocket(8888);System.out.println("服务端启动监听.......");while (true) {//Acceptor线程接收到客户端连接请求之后为每个客户端创建一个新的线程进行业务处理(在此阻塞,直到接收到1个客户端连接或抛出异常)Socket socket = serverSocket.accept();System.out.println("成功接收一个客户端连接:"+socket.getInetAddress());new Thread(new ServerHandler(socket)).start();}} catch (IOException e) {log.error("服务端发生异常", e);}finally {if (serverSocket!=null) {try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}
}@Slf4j
class ServerHandler implements Runnable{private final Socket socket;public ServerHandler(Socket socket) {this.socket = socket;}public void run() {BufferedReader in = null;BufferedWriter out = null;try {//获取客户端的输入流in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));out = new BufferedWriter(new OutputStreamWriter(this.socket.getOutputStream()));System.out.println("准备接收来自客户端:"+this.socket.getInetAddress()+"的数据");//读取客户端发送过来的数据while (true) {// 当客户端调用socket.close()时,line会为null;// 但当客户端突然关闭(比如代码执行完,客户端直接退出),此时readLine()会抛出异常String line = in.readLine();if (line == null) {log.info("读取内容是null");break;}System.out.println("成功接收来自客户端的数据:"+ line);//进行业务处理//给客户端响应数据out.write("success! i am server \n");out.flush();}} catch (IOException e) {if (in != null) {try {in.close();} catch (IOException ioException) {ioException.printStackTrace();}}if (out != null) {try {out.close();} catch (IOException ioException) {ioException.printStackTrace();}}}}
}
BioClient
public class BioClient {public static void main(String[] args) {Socket socket = null;BufferedReader in = null;BufferedWriter out = null;try {// 当 new Socket("127.0.0.1", 8080) 构造函数成功返回时,意味着客户端Socket已经与服务器成功建立了TCP连接socket = new Socket("127.0.0.1",8888);in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));System.out.println("准备向服务端写数据!");//向服务端写数据out.write("hello server , i am client ! \n");//注意别丢 \n 因为服务端是readLineout.flush();//接收来自服务端的数据String line = in.readLine();System.out.println("成功接收到来自服务端的数据:"+line);// 可以选择在此处给服务端1个友好的关闭信号// socket.close();} catch (IOException e) {if (in != null) {try {in.close();} catch (IOException ioException) {ioException.printStackTrace();}}if (out != null) {try {out.close();} catch (IOException ioException) {ioException.printStackTrace();}}if (socket != null) {try {socket.close();} catch (IOException ioException) {ioException.printStackTrace();}}}}
}
NIO1(基础理解,不交互)
NioServer
public class NioServer {/*** 基于 Channel开发** @param args*/public static void main(String[] args) {try {//1、打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道(代表客户端连接的管道都是通过它创建的)ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//2、绑定监听端口,设置连接为非阻塞模式(在非阻塞模式下,bind方法也是同步调用)// 只有(要)绑定后,才(就)能调用 serverSocketChannel.accept()serverSocketChannel.socket().bind(new InetSocketAddress(8888));// 此方法可以在任何时候调用。新的阻塞模式仅对在此方法返回之后发起的 I/O 操作生效。serverSocketChannel.configureBlocking(false);//3、创建多路复用器SelectorSelector selector = Selector.open();//4、将ServerSocketChannel注册到selector上,监听客户端连接事件ACCEPTserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务端已成功启动,可以接收连接!");//5、创建 Reactor线程,让多路复用器在 Reactor 线程中执行多路复用程序new Thread(new SingleReactor(selector)).start();} catch (IOException e) {e.printStackTrace();}}
}@Slf4j
class SingleReactor implements Runnable {private final Selector selector;public SingleReactor(Selector selector) {this.selector = selector;}public void run() {//6、selector轮询准备就绪的事件while (true) {try {// 1、当客户端突然退出,也会触发可读的就绪事件集,并且此时去读,会抛出异常,// 如果此时忽略这个异常,那么下次select()查询时,不会阻塞住,所以此时就需要取消这个keyselector.select(1000);// 获取到所查询到的感兴趣的事件集Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {// 关于这个SelectionKey有几个需要理解的点// 1. select()时,当注册到selector的channel中发生了感兴趣的事件时,就会返回代表channel和selector注册关系的selectionKey,// 这个selectionKey就是channel注册到selector时返回的,是同一对象。// 通过selectionKey可以直到是何种感兴趣的事件,这个事件有可能是多个。// 2. 既然产生了感兴趣的事件,那么这个事件就必须得到处理,否则下次select()查询时,由于仍然有感兴趣的事件,所以不会阻塞住,必须处理掉这个事件。// 3. 必须将当前处理的key从key集合中移除掉,假设处理了事件,但并不移除这个key,那么下次select()查询会阻塞,但当其它channel上发生的感兴趣事件时,// 此时就不阻塞了,然后再调用selector.selectedKeys()方法,会把上次没有移除的这个key也给返回在key集合中,但是那个对应的事件实际上已经被处理了。// 所以这个selectedKeys集合,在jdk底层是不会帮我们自动移除的,它只会在注册的channel发生了感兴趣的事件时,会把这个channel对应的selectionKey放入到selectedKeys集合SelectionKey selectionKey = iterator.next();iterator.remove();try {processKey(selectionKey);} catch (IOException e) {log.error("处理selectionKey发生异常", e);// 1、当客户端突然退出,也会触发可读的就绪事件集,并且此时去读,会抛出异常,// 如果此时忽略这个异常(啥都不干,只是打印个日志),那么下次select()查询时,不会阻塞住,所以此时就需要取消这个key,避免死循环selectionKey.cancel();SelectableChannel channel = selectionKey.channel();if (channel != null) {channel.close();}}}} catch (IOException e) {log.info("服务端发生异常", e);}}}private void processKey(SelectionKey key) throws IOException {if (key.isValid()) {//7、根据准备就绪的事件类型分别处理if (key.isAcceptable()) { //客户端请求连接事件就绪//7.1、接收一个新的客户端连接,创建对应的SocketChannel,ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();// 此处accept方法会以非阻塞方式执行SocketChannel socketChannel = serverSocketChannel.accept();//7.2、设置socketChannel的非阻塞模式,并将其注册到Selector上,监听读事件// 只有非阻塞模式的socketChannel才能注册到selector上socketChannel.configureBlocking(false);// 此处注册时,可以指定携带1个附件socketChannel.register(this.selector, SelectionKey.OP_READ);}if (key.isReadable()) {//读事件准备继续//7.1、读客户端发送过来的数据SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 1、将socketChannel的数据读到readBuffer中// 2、read仍然以非阻塞方式执行// 3、如果1次没有读完接收缓冲区中的数据,则下次仍然会触发可读事件,可以接着读// 4、如果接收缓冲区中没有数据,则立即返回0(比如说,我先一次全部读完了,我又去调用这个方法去读一遍,此时缓冲区中已经没有数据可读了)// 5、当客户端给了1个关闭的信号时,会触发readyOps就会事件为可读,此时读取会返回-1// 6、如果客户端没给关闭信号,就直接退出了,这时去读就会抛出异常(比如客户端执行完所有代码就退出了,或者debug模式下直接强制关闭客户端)int readBytes = socketChannel.read(readBuffer);//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数if (readBytes > 0) {readBuffer.flip();//读写模式切换byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String msg = new String(bytes, "utf-8");//进行业务处理String response = doService(msg);//给客户端响应数据System.out.println("服务端开始向客户端响应数据");byte[] responseBytes = response.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(responseBytes.length);writeBuffer.put(responseBytes);writeBuffer.flip();// 1、以非阻塞方式写出数据给客户端,意思就是发送缓冲区中当前能写多少量,反正不可能超过这个量// 2、这里不一定能够一次就能把writeBuffer中的数据全部写给客户端// 3、返回的int表示写了多少// 4、所以如果没写完,还需监听可写事件,然后将未写完的数据继续写出去socketChannel.write(writeBuffer);} else if (readBytes < 0) {//值为-1表示链路通道已经关闭key.cancel();socketChannel.close();} else {//没读取到数据,忽略log.warn("没读取到数据,忽略");}}}}private String doService(String msg) {System.out.println("成功接收来自客户端发送过来的数据:" + msg);return msg + "---from nioserver";}}
NioClient
public class NioClient {public static void main(String[] args) {try {//1、窗口客户端SocketChannel,绑定客户端本地地址(不选默认随机分配一个可用地址)SocketChannel socketChannel = SocketChannel.open();//2、设置非阻塞模式,socketChannel.configureBlocking(false);//3、创建SelectorSelector selector = Selector.open();//3、创建Reactor线程new Thread(new SingleReactorClient(socketChannel, selector)).start();} catch (IOException e) {e.printStackTrace();}}
}@Slf4j
class SingleReactorClient implements Runnable {private final SocketChannel socketChannel;private final Selector selector;public SingleReactorClient(SocketChannel socketChannel, Selector selector) {this.socketChannel = socketChannel;this.selector = selector;}public void run() {try {//连接服务端doConnect(socketChannel, selector);} catch (IOException e) {e.printStackTrace();System.exit(1);}//5、多路复用器执行多路复用程序while (true) {try {selector.select(1000);Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();processKey(selectionKey);iterator.remove();}} catch (IOException e) {log.info("nio client 异常", e);}}}private void doConnect(SocketChannel sc, Selector selector) throws IOException {System.out.println("客户端成功启动,开始连接服务端");//3、连接服务端boolean connect = sc.connect(new InetSocketAddress("127.0.0.1", 8888));//4、将socketChannel注册到selector并判断是否连接成功,连接成功监听读事件,没有连接成功则继续监听连接事件System.out.println("connect=" + connect);if (connect) { // 如果连接的是本地,通常这里的connect是true了// 如果上来就连接成功了,则直接注册读事件(此时,就完全不需要连接事件了)sc.register(selector, SelectionKey.OP_READ);System.out.println("客户端成功连上服务端,准备发送数据");//开始进行业务处理,向服务端发送数据(连接成功了,就(才)可以向socketChannel中写入数据了)doService(sc);} else {// 如果上来就没连接成功,则注册连接事件sc.register(selector, SelectionKey.OP_CONNECT);}}private void processKey(SelectionKey key) throws IOException {if (key.isValid()) {//6、根据准备就绪的事件类型分别处理if (key.isConnectable()) {//服务端可连接事件准备就绪SocketChannel sc = (SocketChannel) key.channel();// 当selector监听到连接结果后,就调用finishConnect确认连接是否成功// 当确认与客户端连接成功后,才向服务端发送数据if (sc.finishConnect()) {//6.1、向selector注册可读事件(接收来自服务端的数据)// 注意这行代码// 1、在监听到连接成功后,此处操作同时暗含了取消了对连接事件的监听,// 如果此处不取消对连接事件的监听,则会一直触发OP_CONNECT事件(经测试,后续一直触发OP_CONNECT事件时,selector.selectedKeys()又无法获取到对应的key)// 猜测原因:因为连接事件只会触发一次,只处理一次,后续没有再处理的必要了,所以不会再把仅关注OP_CONNECT事件的通道的key放到selectedKeys中了// 2、所以正确的处理步骤就是取消对连接事件的监听,而监听读事件(写事件需要在有东西可写的时候再去关注)sc.register(selector, SelectionKey.OP_READ);//6.2、处理业务 向服务端发送数据doService(sc);} else {//连接失败,退出System.exit(1);}}if (key.isReadable()) {//读事件准备继续//6.1、读服务端返回的数据SocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBufer = ByteBuffer.allocate(1024);// 要么是链路通道关闭了,这会触发可读事件,下一次select()将不会阻塞,因为链路通道关闭作为可读事件一直触发// 要么是读到数据了但没全读完,下一次select()将不会阻塞,因为可读事件一直触发// 要么是数据全部一次就读完了,// 要么读取的时候抛出异常了int readBytes = sc.read(readBufer);//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数if (readBytes > 0) {readBufer.flip();//读写模式切换byte[] bytes = new byte[readBufer.remaining()];readBufer.get(bytes);String msg = new String(bytes, "utf-8");//接收到服务端返回的数据后进行相关操作doService(msg);} else if (readBytes < 0) {//值为-1表示链路通道已经关闭// 如果此时不取消该key,那么 链路通道关闭一直作为读事件触发,而引发不断的循环key.cancel();sc.close();} else {//没读取到数据,忽略}}}}private static void doService(SocketChannel socketChannel) throws IOException {System.out.println("客户端开始向服务端发送数据:");//向服务端发送数据byte[] bytes = "hello nioServer,i am nioClient !".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();// 这里可能不能一次性写完socketChannel.write(writeBuffer);}private String doService(String msg) {System.out.println("成功接收来自服务端响应的数据:" + msg);return "";}
}
NIO2(交互,不处理write后续)
添加交互,但不考虑是否能一次性写完的问题。
NioServer
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
public class NioServer {public static void main(String[] args) {try {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);new Thread(new SingleReactor(serverSocketChannel, selector)).start();} catch (IOException e) {log.info("服务器发生异常", e);}}}@Slf4j
class SingleReactor implements Runnable {private ServerSocketChannel serverSocketChannel;private Selector selector;private List<SocketChannel> channels = new CopyOnWriteArrayList<>();private AtomicBoolean terminated = new AtomicBoolean(false);public SingleReactor(ServerSocketChannel serverSocketChannel, Selector selector) {this.serverSocketChannel = serverSocketChannel;this.selector = selector;}@Overridepublic void run() {new Thread(() -> {Scanner sc = new Scanner(System.in);while (true) {log.info("请输入:");String msg = sc.nextLine();if ("exit".equals(msg)) {terminated.set(true);try {// 【此处发现,当调用的serverSocketChannel的close()方法时,selector.select()不会停止阻塞】serverSocketChannel.close();} catch (IOException e) {log.error("关闭serverSocketChannel发生异常");}log.info("关闭服务 {}", channels.size());for (SocketChannel channel : channels) {try {// 【此处发现,当调用socketChannel的close()方法时,selector.select()会停止阻塞,相当于给了选择器一个事件通知】channel.close();log.info("关闭客户端");} catch (IOException e) {log.error("exit 关闭channel错误", e);}}break;} else {for (SocketChannel channel : channels) {try {channel.write(ByteBuffer.wrap(("服务器主动群发消息->" + msg).getBytes()));} catch (IOException e) {log.error("server发送数据发生异常", e);try {channel.close();} catch (IOException ex) {log.error("server关闭连接发生异常", ex);}}}}}}).start();while (!terminated.get()) {try {log.info("server 开始进入select");// 客户端channel调用close 或者 服务端这边的channel(SocketChannel,不包括ServerSocketChannel)调用close都会唤醒selectorselector.select();Set<SelectionKey> keys = selector.selectedKeys();log.info("server selectedKeys {}", keys.size());Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();log.info("selectionKey {}, {}", key, key.readyOps());try {if (key.isAcceptable()) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);log.info("接收到客户端连接,并注册可读事件:{}", socketChannel);channels.add(socketChannel);}if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();log.info("读取客户端 {} 的数据", socketChannel);ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(readBuffer);log.info("读取到 {} 字节数据", readBytes);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String msg = new String(bytes, "utf-8");log.info("接收到客户端 {} 的数据:{}", socketChannel, msg);socketChannel.write(ByteBuffer.wrap(("服务器已收到消息->" + msg).getBytes()));} else if (readBytes == -1) {log.info("客户端 {} 已关闭连接", socketChannel);key.cancel();socketChannel.close();channels.remove(socketChannel);} else {log.info("没有数据可读");}}} catch (IOException e) {log.error("处理key发生异常", e);key.cancel();key.channel().close();}}} catch (IOException e) {log.error("server发生异常", e);}}}
}
NioClient
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
public class NioClient {public static void main(String[] args) {try {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);Selector selector = Selector.open();new Thread(new SingleReactorClient(selector, socketChannel)).start();} catch (IOException e) {log.info("客户端抛出异常", e);}}}@Slf4j
class SingleReactorClient implements Runnable {private Selector selector;private SocketChannel socketChannel;private AtomicBoolean terminated = new AtomicBoolean(false);private Thread chatThread = null;public SingleReactorClient(Selector selector, SocketChannel socketChannel) {this.selector = selector;this.socketChannel = socketChannel;}@Overridepublic void run() {try {boolean connected = socketChannel.connect(new InetSocketAddress("localhost", 8080));if (connected) {log.info("第一次就连接成功了");writeToServer("【连接成功111】", socketChannel);startChatThread();} else {log.info("注册连接事件监听");socketChannel.register(selector, SelectionKey.OP_CONNECT);}while (!terminated.get()) {log.info("开始进入select");// 打印当前所有注册的键Set<SelectionKey> allKeys = selector.keys();log.info("当前注册的键数量: {}", allKeys.size());for (SelectionKey k : allKeys) {log.info("键: {}, interestOps: {}, readyOps: {}", k, k.interestOps(), k.readyOps());}int selectedCount = selector.select();log.info("select返回,就绪通道数量: {}", selectedCount);Set<SelectionKey> keys = selector.selectedKeys();log.info("selectedKeys {}", keys.size());Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();log.info("selectionKey {}, {}", key, key.readyOps());if (key.isConnectable()) {if (socketChannel.finishConnect()) {log.info("监听连接成功事件");socketChannel.register(selector, SelectionKey.OP_READ);writeToServer("【连接成功222】", socketChannel);startChatThread();} else {log.error("连接失败222");return;}}if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer readBufer = ByteBuffer.allocate(1024);int readBytes = channel.read(readBufer);if (readBytes > 0) {log.info("读取到{}字节数据", readBytes);readBufer.flip();log.info("读取数据:{}", new String(readBufer.array(), 0, readBytes));} else if (readBytes == -1) {log.info("客户端 {} 已关闭连接", socketChannel);key.cancel();terminated.set(true);if (chatThread != null) {// 中断并不能使得scanner的nextLine()方法停止阻塞,所以导致chatThread不能停止// 所以,也可以使用System.in.exit()chatThread.interrupt();}} else {log.info("没有数据可读");}}}}log.info("客户端循环结束...");} catch (IOException e) {log.error("client发生异常", e);}log.info("客户端停止...");}private void startChatThread() {chatThread = new Thread(new Runnable() {@Overridepublic void run() {while (!Thread.currentThread().isInterrupted()) {try {log.info("请输入:");String msg = getInput();if ("exit".equals(msg)) {terminated.set(true);// 【此处发现,当调用socketChannel的close()方法时,selector.select()会停止阻塞,相当于给了选择器一个事件通知】socketChannel.close();break;// 也可以选择直接退出程序// System.exit(0);} else {writeToServer(msg, socketChannel);}} catch (IOException e) {log.error("client发送数据发生异常", e);try {socketChannel.close();} catch (IOException ex) {log.error("client关闭连接发生异常", ex);}}}}private String getInput() {InputStream in = System.in;while (!Thread.currentThread().isInterrupted()) {try {if (in.available() > 0) {Scanner sc = new Scanner(System.in);String line = sc.nextLine();return line;}Thread.sleep(100);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {// 直接结束return "exit";}}return null;}});chatThread.start();}private static void writeToServer(String msg, SocketChannel socketChannel) throws IOException {byte[] bytes = msg.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();socketChannel.write(writeBuffer);}
}
NIO3(交互,处理write后续)
NioServer
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
public class NioServer {public static void main(String[] args) {try {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);new Thread(new SingleReactor(serverSocketChannel, selector)).start();} catch (IOException e) {log.info("服务器发生异常", e);}}}@Slf4j
class SingleReactor implements Runnable {private ServerSocketChannel serverSocketChannel;private Selector selector;private List<SocketChannel> channels = new CopyOnWriteArrayList<>();private AtomicBoolean terminated = new AtomicBoolean(false);public SingleReactor(ServerSocketChannel serverSocketChannel, Selector selector) {this.serverSocketChannel = serverSocketChannel;this.selector = selector;}@Overridepublic void run() {new Thread(() -> {Scanner sc = new Scanner(System.in);while (true) {log.info("请输入:");String msg = sc.nextLine();if ("exit".equals(msg)) {terminated.set(true);try {// 【此处发现,当调用的serverSocketChannel的close()方法时,selector.select()不会停止阻塞】serverSocketChannel.close();} catch (IOException e) {log.error("关闭serverSocketChannel发生异常");}log.info("关闭服务 {}", channels.size());for (SocketChannel channel : channels) {try {// 【此处发现,当调用socketChannel的close()方法时,selector.select()会停止阻塞,相当于给了选择器一个事件通知】channel.close();log.info("关闭客户端");} catch (IOException e) {log.error("exit 关闭channel错误", e);}}break;} else {for (SocketChannel channel : channels) {try {// 不直接去写了,而是关注可写事件// channel.write(ByteBuffer.wrap(("服务器主动群发消息->" + msg).getBytes()));ByteBuffer buffer = ByteBuffer.wrap(("服务器主动群发消息->" + msg).getBytes());SelectionKey key = channel.keyFor(selector);Session session = (Session) key.attachment();session.setData(buffer);// 如果selector正在select(),此时调用key.interestOps(ops)修改感兴趣的事件集,// 将不会让selector停止阻塞,对于感兴趣的事件集的改变只有在下一次selector.select()的时候才会生效// 所以此时就需要代码去主动唤醒selector.select()停止阻塞key.interestOps(SelectionKey.OP_WRITE);// 需要去唤醒selector.wakeup();} catch (Exception e) {log.error("server发送数据发生异常", e);try {channel.close();} catch (IOException ex) {log.error("server关闭连接发生异常", ex);}}}}}}).start();while (!terminated.get()) {try {log.info("server 开始进入select");// 客户端channel调用close 或者 服务端这边的channel(SocketChannel,不包括ServerSocketChannel)调用close都会唤醒selectorselector.select();Set<SelectionKey> keys = selector.selectedKeys();log.info("server selectedKeys {}", keys.size());Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();if (key.isValid()) {log.info("selectionKey {}, {}", key, key.readyOps());try {if (key.isAcceptable()) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);// 携带1个附件socketChannel.register(selector, SelectionKey.OP_READ, new Session());log.info("接收到客户端连接,并注册可读事件:{}", socketChannel);channels.add(socketChannel);}if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();log.info("读取客户端 {} 的数据", socketChannel);ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(readBuffer);log.info("读取到 {} 字节数据", readBytes);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String msg = new String(bytes, "utf-8");log.info("接收到客户端 {} 的数据:{}", socketChannel, msg);// 不直接去写了,而是关注可写事件// socketChannel.write(ByteBuffer.wrap(("服务器已收到消息->" + msg).getBytes()));// 将数据放入session中,待到写事件中处理ByteBuffer buffer = ByteBuffer.wrap(("服务器已收到消息->" + msg).getBytes());((Session) key.attachment()).setData(buffer);// 关注可写事件的同时,取消了可读事件的关注(也就意味着即使后续有可读的数据,也得等把当前的数据写完了,再去读)key.interestOps(SelectionKey.OP_WRITE);} else if (readBytes == -1) {log.info("客户端 {} 已关闭连接", socketChannel);key.cancel();socketChannel.close();channels.remove(socketChannel);} else {log.info("没有数据可读");}}// 因为isWritable会校验key是否被取消,如果被取消,再调用isWritable,就会抛异常if (key.isValid() && key.isWritable()) {log.info("处理写事件");Session session = (Session) key.attachment();ByteBuffer buffer = session.getData();if (buffer != null) {SocketChannel channel = (SocketChannel) key.channel();int written = channel.write(buffer);log.info("limit: {},position: {},写了 :{}", buffer.limit(), buffer.position(), written);if (!buffer.hasRemaining()) {key.interestOps(SelectionKey.OP_READ);log.info("写完了,继续关注可读事件");}}}} catch (IOException e) {log.error("处理key发生异常", e);key.cancel();key.channel().close();}}}} catch (IOException e) {log.error("server发生异常", e);}}}
}@Data
class Session {private ByteBuffer data;public Session() {}}
NioClient
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
public class NioClient {public static void main(String[] args) {try {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);Selector selector = Selector.open();new Thread(new SingleReactorClient(selector, socketChannel)).start();} catch (IOException e) {log.info("客户端抛出异常", e);}}}@Slf4j
class SingleReactorClient implements Runnable {private Selector selector;private SocketChannel socketChannel;private AtomicBoolean terminated = new AtomicBoolean(false);private Thread chatThread = null;public SingleReactorClient(Selector selector, SocketChannel socketChannel) {this.selector = selector;this.socketChannel = socketChannel;}@Overridepublic void run() {try {boolean connected = socketChannel.connect(new InetSocketAddress("localhost", 8080));if (connected) {log.info("第一次就连接成功了");writeToServer("【连接成功111】", socketChannel);startChatThread();} else {log.info("注册连接事件监听");socketChannel.register(selector, SelectionKey.OP_CONNECT);}while (!terminated.get()) {log.info("开始进入select");// 打印当前所有注册的键Set<SelectionKey> allKeys = selector.keys();log.info("当前注册的键数量: {}", allKeys.size());for (SelectionKey k : allKeys) {log.info("键: {}, interestOps: {}, readyOps: {}", k, k.interestOps(), k.readyOps());}int selectedCount = selector.select();log.info("select返回,就绪通道数量: {}", selectedCount);Set<SelectionKey> keys = selector.selectedKeys();log.info("selectedKeys {}", keys.size());Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();log.info("selectionKey {}, {}", key, key.readyOps());if (key.isConnectable()) {if (socketChannel.finishConnect()) {log.info("监听连接成功事件");socketChannel.register(selector, SelectionKey.OP_READ);writeToServer("【连接成功222】", socketChannel);startChatThread();} else {log.error("连接失败222");return;}}if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();ByteBuffer readBufer = ByteBuffer.allocate(1024);int readBytes = channel.read(readBufer);if (readBytes > 0) {log.info("读取到{}字节数据", readBytes);readBufer.flip();log.info("读取数据:{}", new String(readBufer.array(), 0, readBytes));} else if (readBytes == -1) {log.info("客户端 {} 已关闭连接", socketChannel);key.cancel();terminated.set(true);if (chatThread != null) {// 中断并不能使得scanner的nextLine()方法停止阻塞,所以导致chatThread不能停止// 所以,也可以使用System.in.exit()chatThread.interrupt();}} else {log.info("没有数据可读");}}}}log.info("客户端循环结束...");} catch (IOException e) {log.error("client发生异常", e);}log.info("客户端停止...");}private void startChatThread() {chatThread = new Thread(new Runnable() {@Overridepublic void run() {while (!Thread.currentThread().isInterrupted()) {try {log.info("请输入:");String msg = getInput();if ("exit".equals(msg)) {terminated.set(true);// 【此处发现,当调用socketChannel的close()方法时,selector.select()会停止阻塞,相当于给了选择器一个事件通知】socketChannel.close();break;// 也可以选择直接退出程序// System.exit(0);} else {writeToServer(msg, socketChannel);}} catch (IOException e) {log.error("client发送数据发生异常", e);try {socketChannel.close();} catch (IOException ex) {log.error("client关闭连接发生异常", ex);}}}}private String getInput() {InputStream in = System.in;while (!Thread.currentThread().isInterrupted()) {try {if (in.available() > 0) {Scanner sc = new Scanner(System.in);String line = sc.nextLine();return line;}Thread.sleep(100);} catch (IOException e) {throw new RuntimeException(e);} catch (InterruptedException e) {// 直接结束return "exit";}}return null;}});chatThread.start();}private static void writeToServer(String msg, SocketChannel socketChannel) throws IOException {byte[] bytes = msg.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();socketChannel.write(writeBuffer);}
}
Reactor模型
Reactor线程模型不是Java专属,也不是Netty专属,它其实是一种并发编程模型,是一种思想,具有指导意义
Reactor模型中定义了三种角色
Reactor
:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。Acceptor
:处理客户端新连接,并分派请求到处理器链中。Handler
:将自身与事件绑定,执行非阻塞读/写任务,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel
单Reactor-单线程
NIO下Reactor单线程,所有的接收连接,处理数据的相关操作都在一个线程中来完成,性能上有瓶颈
单Reactor-多线程
NIO下Reactor多线程,把比较耗时的数据的编解码运算操作放入线程池中来执行,提升了性能但还不是最好的方式
主从Reactor-多线程
主从Reactor多线程,主从多线程,对于服务器来说,接收客户端的连接是比较重要的,因此将这部分操作单独用线程去操作
工作流程
这种模式的基本工作流程为:
1)Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过 Acceptor 处理客户端连接事件。
2)当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由SubReactor 监听后面的 IO 事件。)
3)SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理。
4)当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理。
5)Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理。
6)Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。Handler 通过send 向客户端发送响应数据。
7)一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个 SubReactor 线程
优势
这种模式的优势如下:
1)MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要接收新连接,SubReactor 线程完成后续的业务处理。
2)MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传SubReactor 线程,SubReactor 线程无需返回数据。
3)多个 SubReactor 线程能够应对更高的并发请求。
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括 Nginx、Memcached、Netty 等。
这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。
Netty
介绍
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供非阻塞的
、事件驱动
的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器
和客户端
程序。
- 本质:网络应用程序框架
- 实现:异步、事件驱动
- 特性:高性能、可维护、快速开发
- 用途:开发服务器和客户端
https://netty.io/
https://netty.io/wiki/user-guide-for-4.x.html#wiki-h2-3
关于异步
程同步、异步是相对的,在请求或执行过程中,如果会阻塞等待,就是同步操作,反之就是异步操作
Netty核心架构
核心:
-
可扩展的事件模型
-
统一的通信api,简化了通信编码
-
零拷贝机制与丰富的字节缓冲区
传输服务:
-
支持socket以及datagram(数据报)
-
http传输服务
-
In-VM Pipe (管道协议,是jvm的一种进程)
协议支持:
-
http 以及 websocket
-
SSL 安全套接字协议支持
-
Google Protobuf (序列化框架)
-
支持zlib、gzip压缩
-
支持大文件的传输
-
RTSP(实时流传输协议,是TCP/IP协议体系中的一个应用层协议)
-
支持二进制协议并且提供了完整的单元测试
网络通信框架为什么非得是Netty
-
Apache Mina:和Netty是同一作者,但是推荐Netty,作者认为Netty是针对Mina的重新打造版本,解决了一些问题并提高了扩展性
-
Sun Grizzly:用得少、文档少,更新少。
-
Apple Swift NIO、ACE 等:其他语言不作考虑
-
Cindy 等:生命周期不长
-
Tomcat、Jetty:还没有独立出来,另外他们有自己的网络通信层实现,是为了专门针对servelet容器而做的,不具备通用性。
- 那tomcat在网络通信层为什么不选择Netty呢?主要是由于tomcat出现的比较早
Netty现状如何
使用Netty的典型项目
- 数据库: Cassandra
- 大数据处理: Spark、Hadoop
- Message Queue:RocketMQ
- 检索: Elasticsearch
- 框架:gRPC、Apache Dubbo、Spring5(响应式编程WebFlux)
- 分布式协调器:ZooKeeper
- 工具类: async-http-clien
…
Netty对三种IO的支持
Netty中的Reactor实现
Netty线程模型是基于Reactor模型实现的,对Reactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构
模型。
工作流程
1)Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,每个线程池中都有EventLoop 线程(可以是OIO,NIO,AIO)。BossGroup中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写, EventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环
2)EventLoop 表示一个不断循环的执行事件处理的线程,每个EventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel)。
3)每个 Boss EventLoop 中循环执行以下三个步骤:
-
select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
-
processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个SocketChannel,并将其注册到某个 WorkerEventLoop 上的 Selector 上
-
runAllTasks:再去以此循环处理任务队列中的其他任务
4)每个 Worker EventLoop 中循环执行以下三个步骤:
-
select:轮训注册在其上的SocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
-
processSelectedKeys:在对应的SocketChannel 上处理 read/write 事件
-
runAllTasks:再去以此循环处理任务队列中的其他任务
5)在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
总结
1)Netty 的线程模型基于主从多Reactor模型。通常由一个线程负责处理OP_ACCEPT事件,拥有 CPU 核数的两倍的IO线程处理读写事件
2)一个通道的IO操作会绑定在一个IO线程中,而一个IO线程可以注册多个通道
3)在一个网络通信中通常会包含网络数据读写,编码、解码、业务处理。默认情况下网络数据读写,编码、解码等操作会在IO线程中运行,但也可以指定其他线程池。
4)通常业务处理会单独开启业务线程池(看业务类型),但也可以进一步细化,例如心跳包可以直接在IO线程中处理,而需要再转发给业务线程池,避免线程切换
5)在一个IO线程中所有通道的事件是串行处理的。
6)通常业务操作会专门开辟一个线程池,那业务处理完成之后,如何将响应结果通过 IO 线程写入到网卡中呢?业务线程调用 Channel对象的 write 方法并不会立即写入网络,只是将数据放入一个待写入缓存区,然后IO线程每次执行事件选择后,会从待写入缓存区中获取写入任务,将数据真正写入到网络中
Pipeline 和 Handler
ChannelPipeline & ChannelHandler
ChannelPipeline 提供了 ChannelHandler 链的容器。以服务端程序为例,客户端发送过来的数据要接收,读取处理,我们称数据是入站的,需要经过一系列Handler处理后;如果服务器想向客户端写回数据,也需要经过一系列Handler处理,我们称数据是出站的。
ChannelHandler 分类
对于数据的出站和入站,有着不同的ChannelHandler类型与之对应:
- ChannelInboundHandler 入站事件处理器
- ChannelOutBoundHandler 出站事件处理器
- ChannelHandlerAdapter提供了一些方法的默认实现,可减少用户对于ChannelHandler的编写
- ChannelDuplexHandler:混合型,既能处理入站事件又能处理出站事件。
(注意看:HeadContext既是入站处理器,又是出站处理器;TailContext仅作为入站处理器)
- inbound入站事件处理顺序(方向)是由链表的头到链表尾,outbound事件的处理顺序是由链表尾到链表头。
- inbound入站事件由netty内部触发,最终由netty外部的代码消费。
- outbound事件由netty外部的代码触发,最终由netty内部消费。