NIO与Netty初识

1 Java NIO

在NIO 中有几个核心对象需要掌握:缓冲区(Buffer)、选择器(Selector)、通道(Channel)。

1.1 缓冲区–Buffer

1.1.1 基本API

缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组,在NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,它也是写入到缓冲区中的;任何时候访问NIO 中的数据,都是将它放到缓冲区中。而在面向流I/O 系统中,所有数据都是直接写入或者直接将数据读取到Stream 对象中。在NIO 中,所有的缓冲区类型都继承于抽象类Buffer,最常用的就是ByteBuffer,对于Java 中的基本类型,基本都有一个具体Buffer 类型与之相对应。下面是一个简单的使用IntBuffer 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class IntBufferDemo {
public static void main(String[] args){
//分配新的int缓冲区,参数为缓冲区容量
//新缓冲区的当前位置为0,其界限(限制位置)为其容量,有一个底层实现数组,偏移量为0
IntBuffer buffer = IntBuffer.allocate(8);

for(int i = 0; i < buffer.capacity(); ++i){
int j = 2 * (i + 1);
//将给定整数写入此缓冲区的当前位置,当前位置递增
buffer.put(j);
}
//重设此缓冲区,将限制设置为当前位置,然后将当前位置设置为0
buffer.flip();
while (buffer.hasRemaining()){
//读取此缓冲区当前位置的整数,然后当前位置递增
int j = buffer.get();
System.out.println(j + " ");
}
}
}

1.1.2 Buffer的基本原理

缓冲区其实是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况,如果使用get()方法从缓冲区获取数据或者使用put()方法把数据写入缓冲区,都会引起缓冲区状态的变化。

在缓冲区中,最重要的属性有下面三个,它们一起合作完成对缓冲区内部状态的变化跟踪:

position:指定下一个将要被写入或者读取的元素索引,它的值由get()/put()方法自动更新,在新创建一个Buffer 对象时,position 被初始化为0。

limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。

capacity:指定了可以存储在缓冲区中的最大数据容量,实际上,它指定了底层数组的大小,或者至少是指定了准许我们使用的底层数组的容量。

以上三个属性值之间有一些相对大小的关系:0 <= position <= limit <= capacity。如果创建一个新的容量大小为10 的ByteBuffer 对象,在初始化的时候,position 设置为0,limit 和capacity 被设置为10,在以后使用ByteBuffer对象过程中,capacity 的值不会再发生变化,而其它两个将会随着使用而变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class BufferDemo {
public static void main(String[] args) throws Exception {
FileInputStream fin = new FileInputStream("E://test.txt");
//创建文件的操作管道
FileChannel fc = fin.getChannel();

//分配一个10个大小缓冲区,就是分配了一个10个大小的byte数组
ByteBuffer buffer = ByteBuffer.allocate(10);
output("初始化", buffer);
//先读一下
fc.read(buffer);
output("调用read()", buffer);
//准备操作之前,先锁定操作范围
buffer.flip();
output("调用flip()", buffer);
//判断有没有可读数据
while (buffer.remaining() > 0){
byte b = buffer.get();
}
output("调用get()", buffer);

//可以理解为解锁
buffer.clear();
output("调用clear()", buffer);

fin.close();
}

public static void output(String step, Buffer buffer){
System.out.println(step + " : ");
//容量,数组大小
System.out.print("capacity: " + buffer.capacity() + ", ");
//当前操作数据所在的位置,也可以叫做游标
System.out.print("position: " + buffer.position() + ", ");
//锁定值,flip,数据操作范围索引只能在position - limit之间
System.out.println("limit: " + buffer.limit());
System.out.println();
}
}

对以上运行代码进行图解,四个属性值分别如图所示:

可以从通道中读取一些数据到缓冲区中,注意从通道读取数据,相当于往缓冲区中写入数据。如果读取4 个字节的数据,则此时position 的值为4,即下一个将要被写入的字节索引为4,而limit 仍然是10,如下图所示:

下一步把读取的数据写入到输出通道中,相当于从缓冲区中读取数据,在此之前,必须调用flip()方法,该方法将会完成两件事情:

​ 1. 把limit 设置为当前的position 值

​ 2. 把position 设置为0

由于position 被设置为0,所以可以保证在下一步输出时读取到的是缓冲区中的第一个字节,而limit 被设置为当前的position,可以保证读取的数据正好是之前写入到缓冲区中的数据,如下图所示:

现在调用get()方法从缓冲区中读取数据写入到输出通道,这会导致position 的增加而limit 保持不变,但position 不会超过limit 的值,所以在读取之前写入到缓冲区中的4 个字节之后,position 和limit 的值都为4,如下图所示:

在从缓冲区中读取数据完毕后,limit 的值仍然保持在调用flip()方法时的值,调用clear()方法能够把所有的状态变化设置为初始化时的值,如下图所示:

1.1.3 缓冲区的分配

在创建一个缓冲区对象时,会调用静态方法allocate()来指定缓冲区的容量,其实调用allocate()相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。或者也可以直接将一个现有的数组,包装为缓冲区对象,如下示例代码所示:

1
2
3
4
5
6
7
8
public class BufferWrap{
public void myMethod(){
ByteBuffer buffer1 = ByteBuffer.allocate(10);

byte[] array = new byte[10];
ByteBuffer buffer2 = ByteBuffer.wrep(array);
}
}

1.1.4 缓冲区分片

在NIO 中,除了可以分配或者包装一个缓冲区对象外,还可以根据现有的缓冲区对象来创建一个子缓冲区,即在现有缓冲区上切出一片来作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的,也就是说,子缓冲区相当于是现有缓冲区的一个视图窗口。调用slice()方法可以创建一个子缓冲区:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class BufferSlice{
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(10);

//缓冲区中的数据是0-9
for (int i = 0; i< buffer.capacity(); ++i){
buffer.put((byte) i);
}
//创建子缓冲区
buffer.position(3);
buffer.limit(7);
ByteBuffer slice = buffer.slice();
//改变子缓冲区的内容
for(int i = 0; i < slice.capacity(); ++i){
byte b = slice.get(i);
b *= 10;
slice.put(i, b);
}
buffer.position(0);
buffer.limit(buffer.capacity());
while(buffer.remaining() > 0){
System.out.println(buffer.get());
}
}
}

在该示例中,分配了一个容量大小为10 的缓冲区,并在其中放入了数据0-9,而在该缓冲区基础之上又创建了一个子缓冲区,并改变子缓冲区中的内容,从最后输出的结果来看,只有子缓冲区“可见的”那部分数据发生了变化,并且说明子缓冲区与原缓冲区是数据共享的。

1.1.5 只读缓冲区

只读缓冲区可以读取,但是不能写入数据。可以通过调用缓冲区的asReadOnlyBuffer()方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReadOnlyBuffer {
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(10);
//缓冲区中的数据是0-9
for (int i = 0; i< buffer.capacity(); ++i){
buffer.put((byte) i);
}
//创建只读缓冲区
ByteBuffer readonly = buffer.asReadOnlyBuffer();
//改变原缓冲区的内容
for(int i = 0; i < buffer.capacity(); ++i){
byte b = buffer.get(i);
b *= 10;
buffer.put(i, b);
}
readonly.position(0);
readonly.limit(buffer.capacity());
//只读缓冲区的内容也随之改变
while(readonly.remaining() > 0){
System.out.println(readonly.get());
}
}
}

如果尝试修改只读缓冲区的内容,则会报ReadOnlyBufferException 异常。只读缓冲区对于保护数据很有用。在将缓冲区传递给某个对象的方法时,无法知道这个方法是否会修改缓冲区中的数据。创建一个只读的缓冲区可以保证该缓冲区不会被修改。只可以把常规缓冲区转换为只读缓冲区,而不能将只读的缓冲区转换为可写的缓冲区。

1.1.6 直接缓冲区

直接缓冲区是为加快I/O 速度,使用一种特殊方式为其分配内存的缓冲区,JDK 文档中的描述为:给定一个直接字节缓冲区,Java虚拟机将尽最大努力直接对它执行本机I/O 操作。也就是说,它会在每一次调用底层操作系统的本机I/O 操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中或者从一个中间缓冲区中拷贝数据。要分配直接缓冲区,需要调用allocateDirect()方法,而不是allocate()方法,使用方式与普通缓冲区并无区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DirectBuffer{
public static void main(String[] args) throws Exception{
String infile = "E://test.txt";
FileInputStream fin = new FileInputStream(infile);
FileChannel fcin = fin.getChannel();

//把刚才读取的内容写入到一个新的文件中
String outfile = String.format("E://testcopy.txt");
FileOutputStream fout = new FileOutputStream(outfile);
FileChannel fcout = fout.getChannel();

//使用allocateDirect
ByteBuffer buffer = ByteBuffere.allocateDirect(1024);
whilt(true){
buffer.clear();
int r = fcin.read(buffer);
if(r==-1){
break;
}
buffer.flip();
fcout.write(buffer);
}
}
}

1.1.7 内存映射

内存映射是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的I/O 快的多。内存映射文件I/O 是通过使文件中的数据出现为内存数组的内容来完成的,这其初听起来似乎不过就是将整个文件读到内存中,但是事实上并不是这样。一般来说,只有文件中实际读取或者写入的部分才会映射到内存中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MappedBuffer{
static private final int start = 0;
static private final int size = 1024;

public static void main(String[] args) throws Exception {
RandomAccessFile raf = new RandomAccessFile("E://test.txt", "rw");
FileChannel fc = raf.getChannel();

//把缓冲区个文件系统进行一个映射关联
//只要操作缓冲区中的内容,文件内容也会随着改变
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, start, size);
mbb.put(0, (byte) 97);
mbb.put(1023, (byte) 122);
raf.close();
}
}

1.2 选择器–Selector

传统的 Server/Client 模式会基于 TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理 一个客户请求。这种模式带来的一个问题就是线程数量的剧增,大量的线程会增大服务器的开销。大多数的实现为了避免这个问题, 都采用了线程池模型,并设置线程池线程的最大数量,这又带来了新的问题,如果线程池中有 200 个线程,而有 200 个用户都在进行大文件下载,会导致第 201 个用户的请求无法及时处理,即便第 201 个用户只想请求一个几 KB 大小的页面。传统的 Server/Client 模式如下图所示:

NIO 中非阻塞 I/O 采用了基于 Reactor 模式的工作方式,I/O 调用不会被阻塞,相反是注册感兴趣的特定 I/O 事件,如可读数据到达,新的套接字连接等等,在发生特定事件时,系统再通知。NIO 中实现非阻塞 I/O 的核心对象就是 Selector,Selector 就是注册各种 I/O 事件地方,而且当那些事件发生时,就是这个对通知所发生的事件,如下图所示:

从图中可以看出,当有读或写等任何注册的事件发生时,可以从Selector中获得相应的SelectionKey,同时从SelectionKey中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。使用NIO中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:

  1. 向 Selector对象注册感兴趣的事件。

  2. 从 Selector中获取感兴趣的事件。

  3. 根据不同的事件进行相应的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
//向Selector对象注册感兴趣的事件
private Selector getSelector() throws IOException {
//创建Selector对象
Selector sel = Selector.open();
//创建可选择通道,并配置为非阻塞模式
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
//绑定通道到指定端口
ServerSocket socket = server.socket();
InetSocketAddress address = new InetSocketAddress(port);
socket.bind(address);
//向Selector中注册感兴趣的事件
server.register(sel, SelectionKey.OP_ACCEPT);
return sel;
}

//从Selector中获取感兴趣的事件,即开始监听,进入内部循环
public void listen() {
System.out.println("listen on: " + port);
try {
while(true){
//该调用会阻塞,直到至少有一个事件发生
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while(iter.hasNext()){
SelectionKey key = (SelectionKey) iter.next();
iter.remove();
process(key);
}
}
} catch (IOException e){
e.printStackTrace();
}
}

//根据不同的事件,编写相应的处理代码
private void process(SelectionKey key) throws Exception{
//接收请求
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
//读请求
else if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
int len = channel.read(buffer);
if (len > 0){
buffer.flip();
contest = new String(buffer.array(), 0, len);
SelectionKey sKey = channel.register(selector, SelectionKey_OP_WRITE);
} else {
channel.close();
}
buffer.clear();
}
//写事件
else if(key.isWritable()){
SocketChannel channel = (SocketChannel) key.channel();
String content = (String) key.attachment();
ByteBuffer block = ByteBuffer.wrap(("输出内容:" + content).getBytes());
if(block != null){
channel.write(block);
} else {
channel.close();
}
}
}

1.3 通道–Channel

通道是一个对象,通过它可以读取和写入数据,当然了所有数据都通过 Buffer 对象来处理。永远不会将字节直接写入通道中,相反是将数据写入包含一个或者多个字节的缓冲区。同样不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

1.3.1 读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class FileInputDemo{
public static void main(String[] args) throws Exception{
FileInputStream fin = new FileInputStream("E://test.txt");
//获取通道
FileChannel fc = fin.getChannel();
//创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取数据到缓冲区
fc.read(buffer);
buffer.flip();
while(buffer.remaining() > 0){
byte b = buffer.get();
System.out.println((char) b);
}
fin.close();
}
}

1.3.2 写入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class FileOutputDemo {
private static final byte message[] = {83,11,109,101,323,98,121,116,101,116,46};

public static void main(String[] args) throws Exception{
FileOutputStream fout = new FileOutputStream("E://test.txt");
FilChannel fc = fout.getChannel();
ByteBUffer buffer = ByteBuffer.allocate(1024);
for(int i = 0; i < message.length; ++i){
buffer.put(message[i]);
}
buffer.flip();
fc.write(buffer);
fout.close();
}
}

1.3.3 IO多路复用

目前流行的多路复用 IO 实现主要包括四种:select、poll、epoll、kqueue。下表是他们的一些重要特性的比较:

IO模型 相对性能 关键思路 操作系统 JAVA支持
select 较高 Reactor windows/linux 支持,Reacotr模式(反应器设计模式)。Linux操作系统的kernels2.4内核版本之前,默认使用select,而目前windows下对同步IO的支持,都是select模型。
poll 较高 Reactor linux Linux下的JAVA NIO模型,Linux kernels 2.6内核版本之前使用poll进行支持,也是使用的Reactor模型。
epoll Reactor/Proactor linux linux kernels2.6内核版本及以后使用epoll进行支持,由于Linux下没有windows下的IOCP技术提供真正的异步IO支持,所以Linux下使用epoll模拟异步IO。
kqueue Proactor linux 目前JAVA版本不支持。

select/poll模型:

epoll模型

2 NIO源码

3 Netty初识

按照定义来说,Netty是一个异步、事件驱动的用来做高性能、高可靠性的网络应用框架。主要的优点:

  1. 框架设计优雅,底层模型随意切换适应不同的网络协议要求。

  2. 提供很多标准的协议、安全、编码解码的支持。

  3. 解决了很多NIO不易用的问题。

  4. 社区更为活跃,在很多开源框架中使用,如Dubbo、RocketMQ、Spark等。

Netty采用NIO而非AIO的理由

1.Netty不看重Windows上的使用,在Linux系统上,AIO的底层实现仍使用EPOLL,没有很好实现AIO,因此在性能上没有明显的优势,而且被JDK封装了一层不容易深度优化

2.Netty整体架构是reactor模型, 而AIO是proactor模型, 混合在一起会非常混乱,把AIO也改造成 reactor模型看起来是把epoll绕个弯又绕回来

3.AIO 还有个缺点是接收数据需要预先分配缓存, 而不是NIO那种需要接收时才需要分配缓存, 所以对连接数量非常大但流量小的情况, 内存浪费很多

4.Linux上AIO不够成熟,处理回调结果速度跟不到处理需求,比如外卖员太少,顾客太多,供不应求,造成处理速度有瓶颈


NIO与Netty初识
http://www.zivjie.cn/2023/07/16/网络通信/netty/NIO与Netty初识/
作者
Francis
发布于
2023年7月16日
许可协议