Buffer、Channel与Selector

理解同步不阻塞模型

Java NIO(同步不阻塞):服务器实现模式为一个线程处理多个请求(链接),即客户端发送的连接请求会注册到多路复用器上,多路复用器轮询连接有IO请求就进行处理。

2021-07-22-10-04-33

应该就是c++的epoll模式的java实现吧。

NIO有如下知识点:

  1. NIO相关类都被放在了java.nio包及子包下,并且对原java.io包中的很多类进行改写。

  2. NIO有三大核心部分:Channel、Buffer、Selector

  3. NIO是面向缓冲区的,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩网络

  4. Java NIO的非阻塞模式,是一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等它完全写入(写到了Buffer里),这个线程同时可以去做别的事情。

  5. 通俗理解:NIO是可以做到用一个线程来处理多个操作。假设有10000个请求过来,根据实际情况,可以分配50或者100个线程来处理。不像BIO那样,非得分配10000个。

  6. HTTP2.0使用了多路复用的技术,做到同一个链接并发处理多个请求,并且并发请求的数量比HTTP1.1大了好几个数量级。

NIO与BIO的比较

  1. BIO以流的方式处理数据,而NIO以块的方式处理数据块,块IO的效率比流IO高很多

  2. BIO是阻塞的,NIO是非阻塞的

  3. BIO基于字节流和字符流进行操作,而NIO基于Channel和Buffer进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector用于监听多个通道的事件(比如链接请求、数据到达等),因此使用单个线程就可以监听多个客户端通道。

Select、Channel、Buffer关系

三者关系如下:

  1. 每个Channel都会对应一个Buffer

  2. Selector对应一个线程,一个线程对应多个Channel

  3. 可以多个Channel注册到同一个Selector中

  4. 程序切换到哪个Channel是由事件决定的,Event就是一个重要的概念

  5. Selector会根据不同的事件,在各个通道上切换

  6. Buffer就是一个内存块,底层是有一个数组

  7. 数据的读取写入是通过Buffer,这个和BIO不一样,BIO中要么是数据流要么是输出流,不能双向,但是NIO的Buffer是可以读也可以写的,需要通过flip方法切换。

  8. Channel是双向的,可以返回底层操作系统的情况,比如Linux底层的操作系统通道就是双向的

缓冲区

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成一个容器对象(含数组),该对象提供了一组方法,可以轻松地使用内存块。缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态的变化情况。Channel提供从文件、网络读取数据的渠道,但是读取和写入的数据都必须经由Buffer。

Buffer类及其子类

  1. 在NIO中,Buffer是一个顶层父类,它是一个抽象类,常用的Buffer子类如下:

    • ByteBuffer
    • ShortBuffer
    • CharBuffer
    • IntBuffer
    • LongBuffer
    • DoubleBuffer
    • FloatBuffer

IntBuffer的一个简单的案例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17

public class Main {
    public static void main(String[] args) {
        IntBuffer intBuffer = IntBuffer.allocate(5);

        for (int i = 0; i < intBuffer.capacity(); i++) {
            intBuffer.put(i);
        }

        intBuffer.flip();

        while (intBuffer.hasRemaining()) {
            System.out.println(intBuffer.get());
        }
    }
}

  1. Buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息。

2021-07-22-10-29-48

属性的含义如下:

  • Capacity:可以容纳的最大数据量,在缓冲区创建时被设定并且不能改变
  • Limit:表示缓冲区的当前终点,不能会缓冲区超过Limit的位置进行读写操作,且Limit可以被修改
  • Position:下一个要被读或写的元素的索引,每次读写缓冲区数据都会改变此值,为下次读写做准备
  • Mark:标记
  1. Buffer中常用的方法
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

// capacity、position、limit的设置与获取
public final int capacity();
public final int position();
public final Buffer position(int newPosition);
public final int limit();
public final Buffer limit(int newLimit);

// 在当前的位置设置标记
public final Buffer mark();

// 将当前的位置设置为标记的位置
public final Buffer reset();

// 清除当前Buffer,即将各个标记恢复到初始状态,但是数据并没有真正擦除
public final Buffer clear();

// 翻转当前Buffer
public final Buffer flip();

// 将position设置为0并丢弃mark
public final Buffer rewind();

// 返回当前Buffer当前还有多少元素可读
public final int remaining();

// 判断当前Buffer是否有元素可读
public final boolean hasRemaining();

// 判断当前Buffer是否为只读缓冲区
public abstract boolean isReadOnly();

// 判断当前Buffer是否具有可访问的底层实现数组
public abstract boolean hasArray();

// 返回当前Buffer的底层实现数组
public abstract Object array();

// 返回当前Buffer的底层实现数组中第一个缓冲区元素的偏移量
public abstract int arrayOffset();

// 判断是否为直接缓冲区
public abstract boolean isDirect();

ByteBuffer中最常用的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13

// 创建直接缓冲区
public static ByteBuffer allocateDirect(int capacity);

// 设置缓冲区的初始容量
public static ByteBuffer allocate(int capacity);

// 把一个数组放到缓冲区中使用
public static ByteBuffer wrap(byte[] array, int offset, int length);

// 构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array);

Channel相关知识

Channel的基础知识:

  1. NIO的Channel类似于流,但是有如下区别:

    • Channel可以同时进行读写,而流只能读或者只能写
    • Channel可以实现异步读写数据
    • Channel可以从缓冲区读数据,也可以写数据到缓冲区
  2. 常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel、SockerChannel。

  3. FileChannel用于文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannel和SocketChannel用于TCP的数据读写。

理解ServerSocketChannel与SocketChannel

如下:

2021-07-22-10-52-48

FileChannel案例:

FileChannel写案例:

1
2
3
4
5
6
7
8
9

FileOutputStream fos = new FileOutputStream("tmp.txt");

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("Hello, World!".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();

fos.getChannel().write(byteBuffer);

FileChannel读案例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10

File file = new File("tmp.txt");
FileInputStream fis = new FileInputStream(file);

ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());

fis.getChannel().read(byteBuffer);

System.out.println(new String(byteBuffer.array()));

FileChannel复制文件的案例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

File file = new File("tmp.txt");
FileInputStream fis = new FileInputStream(file);
FileOutputStream fos = new FileOutputStream("tmp_copy.txt");

// 方案一
// ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
// fis.getChannel().read(byteBuffer);
// byteBuffer.flip();

// fos.getChannel().write(byteBuffer);

// 方案二
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
while (true) {
    byteBuffer.clear();
    int read = fis.getChannel().read(byteBuffer);
    if (read == -1) {
        break;
    }
    byteBuffer.flip();
    fos.getChannel().write(byteBuffer);
}

FileChannel使用transformTo、transformFrom进行复制:

1
2
3
4
5
6
7
8

File file = new File("tmp.txt");
FileInputStream fis = new FileInputStream(file);
FileOutputStream fos = new FileOutputStream("tmp_copy3.txt");

// fis.getChannel().transferTo(0, file.length(), fos.getChannel());
fos.getChannel().transferFrom(fis.getChannel(), 0, file.length());

Buffer和Channel的注意事项和细节

  1. ByteBuffer支持类型化的put和get,put放入什么类型,get就应该使用相应的数据类型来取出,否则可能有BufferUnderflowException异常
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15

ByteBuffer buffer = ByteBuffer.allocate(64);

buffer.putInt(100);
buffer.putLong(100);
buffer.putChar('你');
buffer.putShort((short) 100);

buffer.flip();

System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());

  1. 可以将一个普通的Buffer转成只读的Buffer
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

ByteBuffer buffer = ByteBuffer.allocate(64);

for (int i = 0; i < 64; i++) {
    buffer.put((byte) i);
}

buffer.flip();

ByteBuffer byteBuffer = buffer.asReadOnlyBuffer();
System.out.println(byteBuffer.getClass());

  1. NIO还提供了MappedByteBuffer,可以让文件直接在内存(堆外的内存中进行修改),而如何同步到文件由NIO来完成。(实验中需要避免Idea的缓存对时间结果的观察)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20

RandomAccessFile raf = new RandomAccessFile("tmp.txt", "rw");
FileChannel channel = raf.getChannel();

/*
    参数一:FileChannel.MapMode.READ_WRITE,使用读写模式
    参数二:0:可以直接修改的起始位置
    参数三:5:映射到内存的大小(单位字节)
    */
MappedByteBuffer mbf = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);

mbf.put(0, (byte) 'H');
mbf.put(1, (byte) 'H');
mbf.put(2, (byte) 'H');
mbf.put(3, (byte) 'H');
mbf.put(4, (byte) 'H');

channel.close();
raf.close();

  1. NIO还支持通过多个Buffer(即Buffer数组)完成读写操作,即Scattering和Gatering(实验的过程中,因为Telnet配置的问题,体验并不是太好)
  • Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入
  • Gathering:从buffer读取数据时,可以采用buffer数组,依次读取
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(inetSocketAddress);

ByteBuffer[] byteBuffers = new ByteBuffer[]{
        ByteBuffer.allocate(5),
        ByteBuffer.allocate(3)};

SocketChannel socketChannel = serverSocketChannel.accept();

// 客户端只发行8个字节(协商)
int messageLength = 8;
while (true) {
    // 读取数据
    int byteRead = 0;
    while (byteRead < messageLength) {
        long bytes = socketChannel.read(byteBuffers);
        byteRead += bytes;
        Arrays.asList(byteBuffers).forEach(buffer -> {
            System.out.printf("position=%4d, limit=%4d%n", buffer.position(), buffer.limit());
        });
    }

    Arrays.asList(byteBuffers).forEach(Buffer::flip);

    // 回显数据
    long byteWrite = 0;
    while (byteWrite < messageLength) {
        long bytes = socketChannel.write(byteBuffers);
        byteWrite += bytes;
    }


    Arrays.asList(byteBuffers).forEach(Buffer::clear);
}

Selector

  1. Netty的IO线程NioEventLoop聚合了Selector,可以同时并发处理成百上千个客户端连接。

  2. Selector中的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18

// 得到一个选择器对象
public static Selector open();


public abstract int selectNow() throws IOException;

// 监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectionKey加入到内部集合中并返回
// 参数用来设置超时时间
public abstract int select() throws IOException;
public abstract int select(long timeout) throws IOException;

// 从内部集合中得到所有的SelectionKey
public abstract Set<SelectionKey> selectedKeys();

// 唤醒Selector(不知道用于什么场景)
public abstract Selector wakeup();

  1. SelectionKey中的方法:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16

// 得到与之关联的Selector
public abstract Selector selector();

// 得到与之关联的Channel
public abstract SelectableChannel channel();

// 设置或改变监听事件
public abstract int interestOps();
public abstract SelectionKey interestOps(int ops);

// 是否可以accept、read、write
public final boolean isAcceptable();
public final boolean isReadable();
public final boolean isWritable();

  1. ServerSocketChannel中的方法:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17

// 得到一个ServerSocketChannel通道
public static ServerSocketChannel open();

// 设置服务器端口号
public final ServerSocketChannel bind(SocketAddress local);
public abstract ServerSocketChannel bind(SocketAddress local, int backlog);

// 接受一个链接,返回达标这个链接的通道对象
public abstract SocketChannel accept();

// 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public final SelectableChannel configureBlocking(boolean block);

// 注册一个选择器并设置监听事件
public final SelectionKey register(Selector sel, int ops, Object att);

  1. SocketChannel中的方法:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17

// 链接服务器
public abstract boolean connect(SocketAddress remote);

// 如果connect方法链接失败,接下来就要通过该方法完成链接操作(不明所以)
public abstract boolean finishConnect();

// 从通道读数据
public abstract int read(ByteBuffer dst);
public final long read(ByteBuffer[] dsts);
public abstract long read(ByteBuffer[] dsts, int offset, int length);

// 往通道写数据
public abstract int write(ByteBuffer src);
public final long write(ByteBuffer[] srcs);
public abstract long write(ByteBuffer[] srcs, int offset, int length);

  1. Selector、SelectionKey、ServerSocketChannel、SocketChannel关系梳理:

    • 当客户端连接时,会通过ServerSocketChannel得到SocketChannel

    • 将socketChannel注册到Selector上,register(Selector selector,int ops),一个selector上可以注册多个SocketChannel

    • 注册后返回一个SelectionKey,会和该Selector关联(集合)

    • Selector进行监听select方法,返回有事件发生的通道的个数

    • 进一步得到个SelectionKey(有事件发生)

    • 在通过SelectionKey反向获取SocketChannel(channel方法)

    • 可以通过得到的channel,完成业务处理

AIO简介

  1. 在IO编程中,常用的两种模式:Reactor和Proactor,Java的NIO就是Reactor,当有事件触发时,服务端得到通知,进行相应的处理。

  2. AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先有操作系统完成后才通知服务端程序启动线程去处理,一般适用于链接数较多且连接时间较长的应用。

  3. AIO还没有广泛的应用,Netty也是基于NIO而不是AIO。

零拷贝

(暂时不整理了,核心在于transformTo、transformFrom就是零拷贝方法)