目录
前言
一、Netty中的Reactor实现
Netty线程模型是基于Reactor模型实现的,对Reactor三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型。
1、工作流程
2、Netty 线程模型其他事项
二、Pipeline 和 Handler
ChannelPipeline
ChannelHandler
outbound事件由netty外部的代码触发,最终由netty内部消费。
入站事件传播 :入站事件向后传递,两种写法
super.channelActive(ctx);
ctx.fireChannelActive();
inbound/outbound 加载顺序和执行顺序
InboundHandler是按照Pipleline的加载顺序(addLast),顺序执行
OutboundHandler是按照Pipeline的加载顺序(addLast),逆序执行
回写数据事件流转规则
如果是通过Channel对象进行数据回写,事件会从pipeline尾部流向头部
如果是通过ChannelHandlerContext对象进行数据回写,事件会从当前handler流向头部
问题:OutboundHandler和InboundHandler的先后顺序是否有要求?才能保证所有outboundHandler能被执行
如果想让所有的OutboundHandler都能被执行到,可以选择把OutboundHandler放在最后一个有效的InboundHandler之前
有一种做法是通过addFirst加载所有OutboundHandler,再通过addLast加载所有InboundHandler;另外也推荐:使用addLast先加载所有OutboundHandler,然后加载所有InboundHandler(注意考虑加载顺序和执行顺序)
出站事件传播和outboundHandler中的数据修改
在outboundhandler中最好不要再通过Channel写数据,会导致事件再次从尾部流动到头部,造成类似递归问题 ,可以在事件向前传播出去之后通过ChannelHandlerContext写数据
ChannelHandler复用
使用@ChannelHandler.Sharable,然后在ChannelInitializer构造器外层实例化次Handler,pipeline.addLast(inboundHandler);
注解只是标注可以被复用,至于线程安全问题需要开发者自行保证。
三、hello world
1、服务端
private void start(int port) {
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求 对于boss group,我们其实也只用到了其中的一个线程,因 为服务端一般只会绑定一个端口启动
NioEventLoopGroup boss = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
//工作线程,处理注册其上Channel的I/O事件及其他Task;核心线程数默认:cpu核数*2 核心线程数在创建时可通过构造函数指定
EventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
//业务线程池
EventExecutorGroup business = new UnorderedThreadPoolEventExecutor(NettyRuntime.availableProcessors() * 2, new DefaultThreadFactory("business"));
// ServerInboundHandler1 serverInboundHandler1 = new ServerInboundHandler1();
//入站事件处理器
ServerInboundHandler2 inboundHandler2 = new ServerInboundHandler2();
// 基于netty引导整个服务端程序的启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
//当客户端 SocketChannel初始化时回调该方法,添加handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//ChannelHandler 链的容器
ChannelPipeline pipeline = ch.pipeline();
/*pipeline.addLast(new ServerOutboundHandler());
pipeline.addLast(new ServerInboundHandler1());
// pipeline.addLast(new ServerInboundHandler2());
pipeline.addLast(inboundHandler2);
pipeline.addLast(new MySimpleChannelInboudHandler());*/
//pipeline.addLast(new FixedLengthFrameDecoder(1024));
// pipeline.addLast(new LineBasedFrameDecoder(65536));
/*ByteBuf buf = ch.alloc().buffer().writeBytes("$".getBytes(StandardCharsets.UTF_8));
pipeline.addLast(new DelimiterBasedFrameDecoder(65536,buf));*/
//日志
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
//空闲检测
pipeline.addLast(new ServerReadIdleHandler());
//通过FixedLengthFrameDecoder 定长解码器来解决定长消息的黏包问题;
pipeline.addLast(new LengthFieldPrepender(4));
//pipeline.addLast(new StringEncoder());
//pipeline.addLast(new ProtobufEncoder());
//编码器
pipeline.addLast(new ProtoStuffEncoder());
//通过LengthFieldBasedFrameDecoder 自定义长度解码器解决TCP黏包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
//pipeline.addLast(new StringDecoder());
//pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
//解码器
pipeline.addLast("protostuffdecoder", new ProtoStuffDecoder());
pipeline.addLast(business, "tcptesthandler", new TcpStickHalfHandler1());
}
});
//绑定端口并启动
try {
//服务程序 同步等待 绑定端口号
ChannelFuture future = serverBootstrap.bind(port).sync();
//监听端口的关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//清理一些资源
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
2、客户端
public void start(String host,int port) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientWriterIdleHandler());
pipeline.addLast(new LengthFieldPrepender(4));
//pipeline.addLast(new StringEncoder());
//pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtoStuffEncoder());
pipeline.addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4));
//pipeline.addLast(new StringDecoder());
//pipeline.addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
pipeline.addLast(new ProtoStuffDecoder());
// pipeline.addLast();//解码器
pipeline.addLast(new ClientInboundHandler());
}
});
//连接服务端
try {
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
//客户端可以向服务端写数据了
/* Channel channel = future.channel();
ByteBuf buf = channel.alloc().buffer();
buf.writeBytes("hello nettyserver,i am nettyclient".getBytes(StandardCharsets.UTF_8));
channel.writeAndFlush(buf);*/
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
3、测试结果
总结
篇幅有限,先写这么多,hello world的示例仅展示了主要方法,仅作参考。到此基本上netty的主要流程已经讲完了,欢迎小伙伴们围观,拍砖。