这是Pipeline的第一个ChannelHandler,有一定的研究价值。ChannelHandler继承了ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter中并没有什么有价值的东东,它存在的唯一目的是可以避免让你实现那些你不关注的东西,且它的实现并不会被Pipeline调用到。
ChannelInitializer实现了exceptionCaught、handlerAdded、channelRegistered方法,其中exceptionCaught比较简单,没有什么研究价值。
channelRegistered在channel注册到NioEventLoop时调用,额,有点难受,那此时调用这个方法的究竟是哪个线程呢,是bossLoop还是workerLoop呢?我认为是workerLoop。
handlerAdded在handler被添加到channel的pipeline时调用。我现在真的不知道channelRegistered和handlerAdded谁会先被调用。网上找了一篇博文说是handlerAdded先于channelRegistered调用。
我简单的研究了一下handlerAdded属于ChannelHandler的方法,ChannelHandler仅有如下的方法,这些方法都很基础,可以想象这些方法应该是先于其他方法的调用。
1
2
3
4
5
6
7
8
|
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
|
为了进一步验证,我进行了如下实验:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
BootstrapUtils.runServer(new ChannelInboundHandler() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered: " + Thread.currentThread().getName());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered: " + Thread.currentThread().getName());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive: " + Thread.currentThread().getName());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive: " + Thread.currentThread().getName());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead: " + Thread.currentThread().getName());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete: " + Thread.currentThread().getName());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("userEventTriggered: " + Thread.currentThread().getName());
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelWritabilityChanged: " + Thread.currentThread().getName());
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded: " + Thread.currentThread().getName());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved: " + Thread.currentThread().getName());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught: " + Thread.currentThread().getName());
}
});
|
其输出如下:
handlerAdded: nioEventLoopGroup-3-1
channelRegistered: nioEventLoopGroup-3-1
channelActive: nioEventLoopGroup-3-1
channelRead: nioEventLoopGroup-3-1
channelReadComplete: nioEventLoopGroup-3-1
channelRead: nioEventLoopGroup-3-1
channelReadComplete: nioEventLoopGroup-3-1
channelRead: nioEventLoopGroup-3-1
channelReadComplete: nioEventLoopGroup-3-1
channelRegistered和handlerAdded方法实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
if (initChannel(ctx)) {
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
// We are done with init the Channel, removing all the state for the Channel now.
removeState(ctx);
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
|
其中用到的辅助方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}
private void removeState(final ChannelHandlerContext ctx) {
// The removal may happen in an async fashion if the EventExecutor we use does something funky.
if (ctx.isRemoved()) {
initMap.remove(ctx);
} else {
// The context is not removed yet which is most likely the case because a custom EventExecutor is used.
// Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
ctx.executor().execute(new Runnable() {
@Override
public void run() {
initMap.remove(ctx);
}
});
}
}
|
在断点调试中发现其实只有handlerAdded运行了。因为运行handlerAdded的线程已经为workerLoop中的线程,所以此时ctx.channel().isRegistered()
的值应该为true。
小结(错误)
(这个小结是我在研究了很多东西后总结的)
ChannelInitializer上是标注了@Sharable的,也就是说这个类某种程度上是一个单例,理论上我们的实现也应该是支持@Sharable的。
为什么建议我们的实现也是@Sharable呢,因为在ChannelInitializer中有一个Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(new ConcurrentHashMap<ChannelHandlerContext, Boolean>())
,我对initMap的理解是:当每个channel的某个ChannelInitializer首次被调用时,会经其ctx放入到initMap中,如果该channel再次调用该ChannelInitializer,因为initMap.add(ctx)
返回的值为false,就可以防止多次运行同一个ChannelInitializer,也就防止了多次用同一个ChannelInitializer对Pipeline进行了支持。
如果我们的ChannelInitializer不是@Sharable,也就以为这我们会使用addLast(new xxxChannelInitializer)的方式添加ChannelInitializer,这样很有可能同一个ChannelInitializer被多次添加,然后引发了一些不不要的问题。测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
private static class InnerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("Running");
}
}
public static void main(String[] args) {
BootstrapUtils.runServer(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InnerChannelInitializer());
ch.pipeline().addLast(new InnerChannelInitializer());
ch.pipeline().addLast(new InnerChannelInitializer());
}
});
}
|
小结2
上面的理解是错误的,我用如下代码发现Running运行了三次:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
public class Server2 {
@ChannelHandler.Sharable
private static class InnerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("Running");
}
}
public static void main(String[] args) {
InnerChannelInitializer innerChannelInitializer = new InnerChannelInitializer();
BootstrapUtils.runServer(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(innerChannelInitializer);
ch.pipeline().addLast(innerChannelInitializer);
ch.pipeline().addLast(innerChannelInitializer);
}
});
}
}
|
我忽视了ChannelInitializer在初始化Pipeline后会进行移除,我现在是完全不知道initMap有什么存在的价值了。
我为什么认为initMap没有价值呢,因为我的实现思路是这样的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if(ctx.channel().isRegistered()) {
initChannel((C) ctx.channel());
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
}
|
可能理解initMap还是有点难度,因为我了解的应用场景有限,所以暂时不花费精力用来理解这些知识了。