研究ChannelInitializer实现

这是Pipeline的第一个ChannelHandler,有一定的研究价值。ChannelHandler继承了ChannelInboundHandlerAdapter,ChannelInboundHandlerAdapter中并没有什么有价值的东东,它存在的唯一目的是可以避免让你实现那些你不关注的东西,且它的实现并不会被Pipeline调用到。

ChannelInitializer实现了exceptionCaught、handlerAdded、channelRegistered方法,其中exceptionCaught比较简单,没有什么研究价值。

channelRegistered在channel注册到NioEventLoop时调用,额,有点难受,那此时调用这个方法的究竟是哪个线程呢,是bossLoop还是workerLoop呢?我认为是workerLoop。

handlerAdded在handler被添加到channel的pipeline时调用。我现在真的不知道channelRegistered和handlerAdded谁会先被调用。网上找了一篇博文说是handlerAdded先于channelRegistered调用。

我简单的研究了一下handlerAdded属于ChannelHandler的方法,ChannelHandler仅有如下的方法,这些方法都很基础,可以想象这些方法应该是先于其他方法的调用。

1
2
3
4
5
6
7
8

void handlerAdded(ChannelHandlerContext ctx) throws Exception;

void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

为了进一步验证,我进行了如下实验:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

BootstrapUtils.runServer(new ChannelInboundHandler() {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered: " + Thread.currentThread().getName());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered: " + Thread.currentThread().getName());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive: " + Thread.currentThread().getName());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive: " + Thread.currentThread().getName());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead: " + Thread.currentThread().getName());
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete: " + Thread.currentThread().getName());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("userEventTriggered: " + Thread.currentThread().getName());
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelWritabilityChanged: " + Thread.currentThread().getName());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded: " + Thread.currentThread().getName());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved: " + Thread.currentThread().getName());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exceptionCaught: " + Thread.currentThread().getName());
    }
});

其输出如下:


handlerAdded: nioEventLoopGroup-3-1
channelRegistered: nioEventLoopGroup-3-1
channelActive: nioEventLoopGroup-3-1
channelRead: nioEventLoopGroup-3-1
channelReadComplete: nioEventLoopGroup-3-1
channelRead: nioEventLoopGroup-3-1
channelReadComplete: nioEventLoopGroup-3-1
channelRead: nioEventLoopGroup-3-1
channelReadComplete: nioEventLoopGroup-3-1

channelRegistered和handlerAdded方法实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    if (initChannel(ctx)) {
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();

        // We are done with init the Channel, removing all the state for the Channel now.
        removeState(ctx);
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
}


@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}

其中用到的辅助方法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

private void removeState(final ChannelHandlerContext ctx) {
    // The removal may happen in an async fashion if the EventExecutor we use does something funky.
    if (ctx.isRemoved()) {
        initMap.remove(ctx);
    } else {
        // The context is not removed yet which is most likely the case because a custom EventExecutor is used.
        // Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
        ctx.executor().execute(new Runnable() {
            @Override
            public void run() {
                initMap.remove(ctx);
            }
        });
    }
}

在断点调试中发现其实只有handlerAdded运行了。因为运行handlerAdded的线程已经为workerLoop中的线程,所以此时ctx.channel().isRegistered()的值应该为true。

小结(错误)

(这个小结是我在研究了很多东西后总结的)

ChannelInitializer上是标注了@Sharable的,也就是说这个类某种程度上是一个单例,理论上我们的实现也应该是支持@Sharable的。

为什么建议我们的实现也是@Sharable呢,因为在ChannelInitializer中有一个Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(new ConcurrentHashMap<ChannelHandlerContext, Boolean>()),我对initMap的理解是:当每个channel的某个ChannelInitializer首次被调用时,会经其ctx放入到initMap中,如果该channel再次调用该ChannelInitializer,因为initMap.add(ctx)返回的值为false,就可以防止多次运行同一个ChannelInitializer,也就防止了多次用同一个ChannelInitializer对Pipeline进行了支持。

如果我们的ChannelInitializer不是@Sharable,也就以为这我们会使用addLast(new xxxChannelInitializer)的方式添加ChannelInitializer,这样很有可能同一个ChannelInitializer被多次添加,然后引发了一些不不要的问题。测试代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19

private static class InnerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("Running");
    }
}

public static void main(String[] args) {
    BootstrapUtils.runServer(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new InnerChannelInitializer());
            ch.pipeline().addLast(new InnerChannelInitializer());
            ch.pipeline().addLast(new InnerChannelInitializer());
        }
    });
}

小结2

上面的理解是错误的,我用如下代码发现Running运行了三次:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public class Server2 {

    @ChannelHandler.Sharable
    private static class InnerChannelInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("Running");
        }
    }

    public static void main(String[] args) {

        InnerChannelInitializer innerChannelInitializer = new InnerChannelInitializer();

        BootstrapUtils.runServer(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(innerChannelInitializer);
                ch.pipeline().addLast(innerChannelInitializer);
                ch.pipeline().addLast(innerChannelInitializer);
            }
        });
    }
}

我忽视了ChannelInitializer在初始化Pipeline后会进行移除,我现在是完全不知道initMap有什么存在的价值了。

我为什么认为initMap没有价值呢,因为我的实现思路是这样的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    if(ctx.channel().isRegistered()) {

        initChannel((C) ctx.channel());

        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
    
    }
}

可能理解initMap还是有点难度,因为我了解的应用场景有限,所以暂时不花费精力用来理解这些知识了。