第一章 IO基础模型
1.Linux网络 I/O 模型
-
阻塞IO模型
-
非阻塞IO模型
应用层到内核,如果缓存区没有数据 则直接返回错误
- I/O复用模型
- 信号驱动I/O模型
- 异步I/O
2.IO多路复用
第二章 NIO入门
2.1 BIO模型
BIO服务端通讯模型,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求后为每一个客户端创建一个新的线程进行链路处理
缺点:当并发量增大,系统会发现线程堆栈溢出,创建线程失败等问题
2.2 伪异步I/O编程
伪异步IO:将客户端的Socket封装成一个Task投递到后端的线程池中进行处理,JDK的线程池维护一个队列和N个活跃线程。
优点:资源占用是可控的,无论多少个并发,都不会造成资源的耗尽和宕机
弊端:
NIO:即非阻塞IO
原理:NIO 是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题,应用的扩展能力有了非常大的提高。
NIO服务端序列图:
NIO客户端序列图:
优点:
1、客户端发起的连接操作都是异步,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
2、SocketChannel 读写都是异步的,如果没有可读数据它不会异步等待,直接返回。
3、线程模型的优化。由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制
2.4 AIO编程
NIO 2.0引入新的异步通道,并提供了异步文件通道和异步套接字通道的实现
*CompletionHandler* 接口的实现类作为操作完成的回调
NIO2.0 的异步套接字通道是真正的异步非阻塞I/O
2.5 4中I/O的对比
1、异步非阻塞I/O
NIO2.0 添加了异步的套接字通道 真正实现了异步I/O
2、多路复用器Selector
Java NIO 的核心是多路复用I/O技术,多路复用的核心是通过在Selector中轮询注册在其上的Channel,当发现某个或多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的选择键集合,进行I/O操作。
3、伪异步I/O
通过在通信线程和业务线程之间做个缓冲区,这个缓冲区用于隔离I/O线程和业务间的直接访问,这样业务线程就不会被I/O线程堵塞。
第三章 Netty入门应用
3.1 Netty服务器开发
/**
* 基于netty的时间服务器
*
* @author 陈一锋
* @date 2021/1/6 14:09
**/
public class TimeServer {
public static void main(String[] args) throws Exception {
int port = 9094;
new TimeServer().bind(port);
}
private void bind(int port) throws Exception {
//配置服务器NIO线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
//绑定端口
ChannelFuture f = b.bind(port).sync();
//等待服务器监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
}
/**
* @author 陈一锋
* @date 2021/1/6 14:17
**/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
String body = new String(req, UTF_8);
System.out.println("The time server receive order:" + body);
String currTime = "QUERY THE ORDER".equalsIgnoreCase(body) ?
new Date(System.currentTimeMillis()).toString() :
"BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
3.2 客户端开发
/**
* @author 陈一锋
* @date 2021/1/6 14:43
**/
public class TimeClient {
public static void main(String[] args) throws Exception {
int port = 9094;
new TimeClient().connect(port, "127.0.0.1");
}
private void connect(final int port, final String host) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private class TimeClientHandler extends ChannelInboundHandlerAdapter {
public final Logger LOGGER = Logger.getLogger(TimeClientHandler.class.getName());
private final ByteBuf firstMessages;
private TimeClientHandler() {
byte[] req = "QUERY THE ORDER".getBytes();
firstMessages = Unpooled.buffer(req.length);
firstMessages.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//这里是建立TCP连接后发送消息
ctx.writeAndFlush(firstMessages);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, UTF_8);
System.out.println("Now is:" + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.warning("Unexpected exception:" + cause.getMessage());
ctx.close();
}
}
}
第4章 TCP粘包/拆包问题的解决之道
4.1.1 TCP粘包/拆包问题说明
4.1.2发生的原因:
1、应用程序write写入的字节大小大于套接口发送缓存区大小。
2、进行MSS大小的TCP分段
3、以太网帧的payload大于MTU进行IP分片
4.1.3 粘包问题的解决策略
4.3 利用LineBasedFrameDecoder解决粘包问题
服务器
public static void main(String[] args) throws Exception {
int port = 9095;
new TimeServer().bind(port);
}
private void bind(int port) throws Exception {
//配置服务器NIO线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//绑定端口
ChannelFuture f = b.bind(port).sync();
//等待服务器监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加这两个解码器 LineBasedFrameDecoder StringDecoder
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
}
/**
* 消息处理器
*/
class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//这种情况会发送粘包 拆包
// ByteBuf byteBuf = (ByteBuf) msg;
// byte[] req = new byte[byteBuf.readableBytes()];
// byteBuf.readBytes(req);
// String body = new String(req, UTF_8).substring(0, req.length - System.getProperty("line.separator").length());
//使用编码器后接收的为msg就是删除回车换行符后的请求消息
String body = (String) msg;
System.out.println("The time server receive order:" + body + ";the counter is :" + ++count);
String currTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
new Date(System.currentTimeMillis()).toString() :
"BAD ORDER";
currTime = currTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
/**
* @author 陈一锋
* @date 2021/1/6 14:43
**/
public class TimeClient {
public static void main(String[] args) throws Exception {
int port = 9095;
new TimeClient().connect(port, "127.0.0.1");
}
private void connect(int port, String host) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//这里添加解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
//发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private class TimeClientHandler extends ChannelInboundHandlerAdapter {
public final Logger LOGGER = Logger.getLogger(TimeClientHandler.class.getName());
private byte[] req;
private int counter;
private TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf messages = null;
for (int i = 0; i < 100; i++) {
messages = Unpooled.buffer(req.length);
messages.writeBytes(req);
ctx.writeAndFlush(messages);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//注释的为发生 粘包拆包时的代码
// ByteBuf buf = (ByteBuf) msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, UTF_8);
String body = (String) msg;
System.out.println("Now is:" + body + "; the counter is :" + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.warning("Unexpected exception:" + cause.getMessage());
ctx.close();
}
}
}
LineBaseFrameDecoder工作原理时它依次遍历ByteBuf中的可读字节,判断是否有“\n” "\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。
StringDecoder功能非常简单,将接受i到的对象转化成字符串,然后调用后面的Handler。
LineBaseFrameDecoder 和SringDecoder 组合就是换行切换的文本解码器。
第五章 分隔符和定长解码器的应用
TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,往往采用如下4种方式。
- 消息长度固定,累计读到长度总和为定长LEN的报文后,就认为读取到了一个完整的消息;将计数器置位,重新开始读取下一个数据报。
- 将回车换行符作为消息结束符,例如FTP协议。
- 将特殊的分隔符作为消息的结束标志。
- 通过在消息头中定义长度字段来标识消息的总长度。
5.1 DelimiterBasedFrameDecoder应用开发
通过对DelimiterBasedFrameDecoder,我们可以自动完成以分隔符为码流结束标识的消息的解码。
EchoServer服务器开发
public class EchoServer {
public static final int PORT = 9096;
public static void main(String[] args) throws Exception {
new EchoServer().bind(PORT);
}
public void bind(int port) throws Exception {
//配置线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
//假如注释掉这个解码器 则会出现TCP粘包问题
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
//此为固定长度解码器 如果消息为固定 则可以使用此解码器
//ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
//绑定端口
ChannelFuture f = b.bind(port).sync();
//等待服务器监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class EchoServerHandler extends ChannelInboundHandlerAdapter {
int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("This is " + ++counter + " times receives client:[" + body + "]");
body += "$_";
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
客户端开发
/**
* @author 陈一锋
* @date 2021/1/7 11:48
**/
public class EchoClient {
public static void main(String[] args) throws Exception {
new EchoClient().connect(EchoServer.PORT, "127.0.0.1");
}
private void connect(int port, String host) throws Exception {
//创建线程组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
//定长解码器
//ch.pipeline().addLast(new FixedLengthFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
//等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private class EchoClientHandler extends ChannelInboundHandlerAdapter {
private int counter;
static final String ECHO_REQ = "Hi,cyf.Welcome to Netty.$_";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("This is " + ++counter + " times receiver server:[" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
第六章 编解码技术
6.1 Java序列化的缺点
- 无法跨语言。java序列化后的字节数组,别的语言无法进行反序列化。
- 序列化后的码流太大。
- 序列化性能太低
java序列化对比ByteBuffer通用二进制解码技术
/**
* @author 陈一锋
* @date 2021/1/7 14:13
**/
public class TestUserInfo {
public static void main(String[] args) {
UserInfo user = new UserInfo();
user.buildUserId(11).buildUsername("welcome to netty");
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(out);) {
objectOutputStream.writeObject(user);
objectOutputStream.flush();
byte[] b = out.toByteArray();
// jdk serialize result length is 124
System.out.println("jdk serializable length is:" + b.length);
} catch (IOException e) {
e.printStackTrace();
}
// byteBuffer result is 24
System.out.println("the byte array serializable length is:" + user.codeC().length);
}
}
@Getter
@Setter
public class UserInfo implements Serializable {
private String userName;
private int userId;
public UserInfo buildUsername(String userName) {
this.userName = userName;
return this;
}
public UserInfo buildUserId(int userId) {
this.userId = userId;
return this;
}
/**
* 编码
*
* @return /
*/
public byte[] codeC() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
byte[] value = this.userName.getBytes();
buffer.putInt(value.length);
buffer.put(value);
buffer.putInt(this.userId);
buffer.flip();
value = null;
byte[] result = new byte[buffer.remaining()];
buffer.get(result);
return result;
}
}