Channel和ChannelPipeline

Channel的声明周期

Channel的生命周期分为如下状态:

  1. ChannelUnregistered:Channel被创建还未注册到EventLoop。
  2. ChannelRegisterted:Channel已经注册到EventLoop。
  3. ChannelActive:Channel处于活动状态(已经连接到它的远程节点),可以接收和发送数据了。
  4. ChannelInActive:Channel没有连接到远程节点。

当这些状态发生变化时,将会产生对应的事件。这些事件会被转发给ChannelHandler。

ChannelHandler的生命周期

如下为ChannelHandler定义的生命周期操作,当ChannelHandler被添加到ChannelPipeline或者从ChannelPipeline中移除时会调用这些操作:

  1. handlerAdded:当把ChannelHandler添加ChannelPipeline时调用
  2. handlerRemoved:当把ChannelHandler移除ChannelPipeline时调用
  3. exceptionCaught:处理中有异常时调用

ChannelInboundHandler

ChannelInboundHandler的生命周期方法:

  1. channelRegistered
  2. channelUnregistered
  3. channelActive
  4. channelInactive
  5. channelReadComplete:当Channel上的一个读操作完成时被调用(如何理解)
  6. channelRead:当从Channel读取数据时被调用
  7. channelWritabilityChanged(如何理解)
  8. userEventTriggered(如何理解)

对channelReadComplete的理解:当所有可读的字节都已经从Channel中读取之后,将会调用该回调方法。所以在channelReadComplete被调用之前能看到多次channelRead的调用。

理解channelWritabilityChanged

当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel变为再次可写时恢复写入。可以通过调用Channel的isWritable方法来检测Channel的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置。

(需要结合案例理解)

理解userEventTriggered

当ChannelInboundHandler.fireUserEventTriggered()方法被调用时调用,因为一个POJO被传经了ChannelPipeLine。

(需要结合案例理解)

显示释放和池化

当实现ChannelInBoundHandler时并重写channelRead方法时,需要在方法中显示释放并池化ByteBuf实例相关的内容:

1
2
3
4
5

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ReferenceCountUtil.release(msg);
}

Netty使用WARN级别的日志记录未释放的资源。

ChannelOutboundHandler

ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法类处理请求。例如,如果到远程节点的写入被暂停了,可以推迟冲刷操作并在稍后继续。

  • bind:
  • connect:
  • disconnect:
  • close:
  • deregister:
  • read:
  • flush:
  • write:

在书中找到的ChannelOutboundHandler案例:

1
2
3
4
5
6
7
8
9

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ReferenceCountUtils.release(msg);
        promise.setSuccess();
    }
}

ChannelPromise与ChannelFuture

ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类,其定义了一些可写的方法,如setSuccess和setFailure,从而是ChannelFuture不可变。

ChannelHandler适配器

在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法体现了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到ChannelPipeline中的下一个ChannelHandler中

(什么意思呀)

资源管理

Netty挺了ResourceLeakDetector来诊断潜在的资源泄漏问题。

(目前没有接触过)

1
2
3
4
5
6
7
8
9

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ReferenceCountUtils.release(msg);
        promise.setSuccess();
    }
}

重要的是不仅要释放资源,还要通知ChannelPromise,否则可能会出现ChannelFutureListener收不到某个消息已经被处理了的通知的情况。

总之,如果一个消息被消费了或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountutils.release()。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

ChannelPipeline

每个新创建的Channel都会被分配一个新的ChannelPipeline。这项关联时永久性的,Channel既不能附加另外一个ChannelPipeline也不能分离器当前的。在Netty组件中的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理。随后通过调用ChannelHandlerContext实现,它将被转发给统一超类型的下一个ChannelHandler。

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态的修改所属的ChannelPipeline。ChannelHandlerContext具有丰富的处理事件和执行IO操作的API。

ChannelPipeline提供了通过ChannelPipeline本身传播事件的方法,如果一个入站事件被触发,它将被从ChannelPipelien的头部一直被传播到ChannelPipeline的尾端。(怎么通过ChannelPipeline制造一个入站事件?)

ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的传播方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个(这个测试怎怎么进行的呢),直到它找到和该事件所期望的方向相匹配的为止。如果我直接实现ChannelHandler接口,这个判断工作又是怎么进行的呢?

ChannelHandler可以通过添加、删除或者替换其他的ChannelHandler来实时地修改ChannelPipeline的布局(它也可以将它自己从ChannelPipeline中移除)。这是ChannelHandler最重要的能力之一。

  • addFirst
  • addLast
  • addBefore
  • addAfter
  • remove
  • replace

重组ChannelHandler的这种能力使我们可以用它来轻松地实现及其灵活的逻辑。

ChannelPipeline用于方法ChannelHandler的操作:

  1. get:通过类型或者名称返回ChannelHandler
  2. context:返回和ChannelHandler绑定的ChannelHandlerContext
  3. names:返回ChannelPipeline中所有的ChannelHandler的名称

(能不能通过上面的技术实现跳channelHandler发送消息,感觉应该不行)

ChannelHandler的执行与阻塞

通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop来处理传递给它的事件,所以至关重要的是不要阻塞这个线程,因为这会对整体的IO处理产生负面的影响。

但是有时候需要与那些使用阻塞API的遗留代码进行交互,对于这种情况,ChannelPiepline有一些接受EventExecutorGroup的add方法。如果一个时间被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本生的EventLoop中移除。

(需要看一下案例代码)

触发事件

入站操作,同于通知ChannelInboundHandler在ChannelPipeline中所发生的事件:

fireChannelRegistered
fireChannelUnregistered
fireChannelActive
fireChannelInactive
fireChannelExceptionCaught
fireChannelEventTriggered
fireChannelChannelRead
fireChannelChannelReadComplete
fireChannelChannelWritabilityChanged

出站发生的事件:

  • bind
  • connect
  • disconnect
  • close
  • deregister
  • flush
  • wirte
  • writeAndFlush
  • read

我为什么对出站事件这么陌生,很正常,目前这方面接触的还非常的少。

ChannelHandlerContext

每当有ChannelHandlr添加到ChannelPipeline,就会创建一个ChannelHandlerContext。ChannelContextHandler的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler交互。

ChannelHandlerContext有很多方法,其中有一些方法也存在于Channel和ChannelPipeline本身上,但是有一点重要的不同。如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播,而调用位于ChannelHandlerContext上相同的方法,则将从所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler。

bind:绑定到给定的SocketAddress,并返回ChannelFuture(为什么能在ChannelHandler中进行绑定,很奇怪)
connect:同样的,不是很好理解这个API

从某个特定的ChannelHandler开始处理

为什么要从ChannelPipeline中的某个特定点开始传播事件?

  1. 为了减少将事件传经对它不感兴趣的ChannelHandler所带来的开销
  2. 为了避免将事件传经那些可能会对它感兴趣的ChannelHandler

要想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在该ChannelHandler之前的ChannelHandler所关联的ChannelHandlerContext。这个ChannelHandlerContext将调用和它关联的ChannelHandler之后的ChannelHandler。

(貌似有点复杂哦,不知道能玩出怎样的花)

一些奇怪的理解

1
2
3
4

ChannelHandlerContext ctx = ...;
ctx.write(Unpooled.copiedBuffer("Netty In Action", CharsetUtil.UTF_8))

2021-08-10-18-11-18

从图中可以看到,第二个ChannelHandler调用了write后,消息并不是往左走,而是往右走,这个和我的理解是有冲突的。

(这部分书中的内容有问题,在当前分类下可以找到我实验的代码)

高级用法

可以缓存ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的ChannelHandler方法之外,甚至来自不同的线程。

一个ChannelHandler可以属于多个ChannelPipeline,所以它们可以绑定到多个ChannelHandlerContext实例。用于这种用法的ChannelHandlr必须要使用@Shareable注解,否则,试图将它添加到多个ChannelPipeline时将会触发异常。

在多个ChannelPipeline中安装同一个ChannleHandler的一个常见的原因是用于收集跨越多个Channel的统计信息。

异常处理

处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制:

  1. 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener将在操作完成时被通知该操作是成功了还是出错了。

  2. 几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例,作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。但是ChannelPromise还具有立即通知的可写方法:

1
2
3
4

ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);

如下代码使用这种方式来添加ChannelFutureListener:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12

ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f){
        if(!f.isSuccess()){
            f.cause().printStackTrace();
            f.channel().close();
        }
    }
})

第二种方式是将ChannelFutureListener添加到即将作为参数传递给ChannelOutboundHandler的方法的ChannelPromise(这是说ChannelPromise即将作为参数传递给ChannelOutboundHandler):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdatapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) {
                if(!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        })
    }
}

谈谈我自己的理解

wirte继承方法中的promise和write中返回的ChannelFuture

A(ChannelOutboundHandler) -> B(ChannelInboundHandler)

如图所示的pipeline,如果我在B的中调用write,得到一个ChannelFuture,我认为这个ChannelFuture和我在A的write方法中得到的ChannelPromise是同一个对象,我的验证代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18

ch.pipeline()
        .addLast(new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("Running 1");
                System.out.println(promise);
            }
        })
        .addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("Running 2");
                ChannelFuture writeChannelFuture = ctx.write(msg);
                System.out.println(writeChannelFuture);
            }
        });

输出的结果如下,和我假设的一致:


Running 2
Running 1
DefaultChannelPromise@7c903a86(incomplete)
DefaultChannelPromise@7c903a86(incomplete)

什么时候用ChannelFuture什么时候用ChannelPromise

从目前学习到的知识总结出这样的结论,如果在ChannelInboundHandler之类的Handler中直接写数据,此时要回调则使用ChannelFuture,如果在ChannelOutboundHandler之类的Handler中实现write之类的方法,则使用ChannelPromise。