# netty **Repository Path**: congjinlai/netty ## Basic Information - **Project Name**: netty - **Description**: netty-websocket - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-07-30 - **Last Updated**: 2020-12-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # netty #### 介绍 springboot集成netty-websocket #### 1.添加依赖 ``` io.netty netty-all 4.1.2.Final ``` #### 2.创建启动类 线程交予线程池处理 ```java @Slf4j @Service public class NettyWebSocketServer implements Runnable { private EventLoopGroup bossGroup = new NioEventLoopGroup(); private EventLoopGroup workerGroup = new NioEventLoopGroup(); private ServerBootstrap serverBootstrap = new ServerBootstrap(); private static final int RECREATE_ALLOCATOR_SIZE = 592048; private static final Integer PORT = 1235; private ChannelFuture serverChannelFuture; private final NettyWebsocketChildHandlerInitializer childChannelHandler; NettyWebSocketServer(NettyWebsocketChildHandlerInitializer childChannelHandler) { this.childChannelHandler = childChannelHandler; } @Override public void run() { build(); } /** * 启动NettyWebSocket */ private void build() { Long beginTime = System.currentTimeMillis(); serverBootstrap.group(bossGroup, workerGroup) // 指定是Nio通信服务 .channel(NioServerSocketChannel.class) // TCP参数配置 握手字符串长度设置 .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP NO_DELAY 算法 尽量发送大文件包 .option(ChannelOption.TCP_NODELAY, true) // 开启心跳模式 .childOption(ChannelOption.SO_KEEPALIVE, true) // 配置固定长度接收缓存内存分配 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(RECREATE_ALLOCATOR_SIZE)) .childHandler(childChannelHandler); try { serverChannelFuture = serverBootstrap.bind(PORT).sync(); Long endTime = System.currentTimeMillis(); log.info("服务器启动完成,耗时:[{}]毫秒,已经在端口:[{}]进行阻塞等待", endTime - beginTime, PORT); } catch (InterruptedException e) { log.error(e.getMessage()); // 优雅关闭连接 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); e.printStackTrace(); } } @PreDestroy public void close() { try { // 监听服务器关闭监听 if (serverChannelFuture != null) { serverChannelFuture.channel().closeFuture().sync(); } } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public ChannelHandler getChildChannelHandler() { return childChannelHandler; } } ``` #### 3.创建NettyWebSocketServerHandler核心处理类 ```java @Slf4j @Component @ChannelHandler.Sharable public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler { /** * 在线通道集合 目前没用 常量类里面的onlineUserMap */ public static Set sets = new HashSet<>(); @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // 传统的HTTP接入 if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } // WebSocket接入 else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { if (frame instanceof CloseWebSocketFrame) { WebSocketServerHandshaker handsShaker = Constant.webSocketServerHandshakesMap.get(ctx.channel().id().asLongText()); if (null == handsShaker) { sendErrorMessage(ctx, "该用户已经离线或者不存在该连接"); } else { handsShaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain()); } return; } // ping请求 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { sendErrorMessage(ctx, "不支持二进制文件"); } Channel channel = ctx.channel(); channel.writeAndFlush(new TextWebSocketFrame("aaa")); } /** * Http协议和转换 * * @param context 处理上下文 * @param request 消息请求 */ private void handleHttpRequest(ChannelHandlerContext context, FullHttpRequest request) { if (!request.decoderResult().isSuccess()) { sendHttpResponse(context, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } // 协议升级 WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws:/" + context.channel() + "/websocket", null, false); WebSocketServerHandshaker handsShaker = factory.newHandshaker(request); // 存储握手信息 Constant.webSocketServerHandshakesMap.put(context.channel().id().asLongText(), handsShaker); if (handsShaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel()); } else { // 表示握手成功 handsShaker.handshake(context.channel(), request); log.info("Http-websocket握手协议升级成功啦"); } } /** * 消息处理失败 发送一个失败请求 应答客户端 * * @param context 处理上下文 * @param request 请求 * @param defaultFullHttpResponse 默认的Http响应 */ private void sendHttpResponse(ChannelHandlerContext context, FullHttpRequest request, DefaultFullHttpResponse defaultFullHttpResponse) { if (defaultFullHttpResponse.status().code() != HttpResponseStatus.OK.code()) { ByteBuf buf = Unpooled.copiedBuffer(defaultFullHttpResponse.status().toString(), CharsetUtil.UTF_8); defaultFullHttpResponse.content().writeBytes(buf); buf.release(); } // 如果长连接好存在 关闭长连接 boolean keepLive = HttpUtil.isKeepAlive(request); ChannelFuture future = context.channel().writeAndFlush(request); if (!keepLive) { future.addListener(ChannelFutureListener.CLOSE); } } /** * 客户端断开连接之后触发 * * @param ctx 处理和上下文 * @throws Exception 捕获异常 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { sets.remove(ctx); } /** * 出现移异常后触发 * * @param ctx 处理上下文 * @param cause 异常类 * @throws Exception 异常 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 出现不可抗拒因素发送错误消息给客户端 * * @param context 处理上下文 * @param message 消息文字 */ public void sendErrorMessage(ChannelHandlerContext context, String message) { context.channel().writeAndFlush(new TextWebSocketFrame(message)); } } ``` 处理ws一下几种情况: - channelActive与客户端建立连接 - channelInactive与客户端断开连接 - channelRead0客户端发送消息处理 #### 4.创建NettyWebsocketChildHandlerInitializer 该类集成成自ChannelInitializer - ChannelInitializer继承ChannelHandlerAdapter,是一个比较特殊的handler,它的使命仅仅是为了初始化Server端新接入的SocketChannel对象,往往我们也只是初始化新接入SocketChannel的pipeline,在做完初始化工作之后,该Handler就会将自己从pipeline中移除,也就是说SocketChannel的初始化工作只会做一次。 - 我们一般在初始化ServerBootStrap或者BootStrap时,创建子类并实现initChannel方法 - 详细参考NettyWebSocketServer类面的初始化ServerBootStrap的.childHandler()方法 ```java @Component public class NettyWebsocketChildHandlerInitializer extends ChannelInitializer { public NettyWebsocketChildHandlerInitializer() { super(); } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // HTTP编解码器 socketChannel.pipeline().addLast("http-codec", new HttpServerCodec()); // HTTP头和body拼接成完整请求体 socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); // 大文件传输策略 socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); socketChannel.pipeline().addLast("websocket-handler",new NettyWebSocketServerHandler()) ; } } ``` 这里要注意这四个Handler,HttpServerCodec、ChunkedWriteHandler、HttpObjectAggregator、NettyWebSocketServerHandler,其中HttpServerCodec用于对HttpObject消息进行编码和解码,但是HTTP请求和响应可以有很多消息数据,你需要处理不同的部分,可能也需要聚合这些消息数据,这是很麻烦的。为了解决这个问题,Netty提供了一个聚合器,它将消息部分合并到FullHttpRequest和FullHttpResponse,因此不需要担心接收碎片消息数据,这就是HttpObjectAggregator的作用;ChunkedWriteHandler,允许通过处理ChunkedInput来写大的数据块;而NettyWebSocketServerHandler是Netty封装好的WebSocket协议处理类,有了它可以少写很多步骤,包括握手的过程,以及url的定义(这里的/ws其实就定义了url指定的后缀)。 #### 5.创建ApplicationContext程序上下文,用于管理线程启动,销毁 ```java @Component @Slf4j public class ApplicationContext { private final NettyWebSocketServer webSocketServer; /** * newFixedThreadPool和newSingleThreadExecutor:  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。 * newCachedThreadPool和newScheduledThreadPool:  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。 */ private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-netty_chat-runner-%d").build(); private ExecutorService executor = new ThreadPoolExecutor(Constant.THEAD_SIZE, Constant.THEAD_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), namedThreadFactory); public ApplicationContext(NettyWebSocketServer webSocketServer) { this.webSocketServer = webSocketServer; } /** * 程序运行线程池启动netty服务 */ @PostConstruct public void init() { executor.submit(webSocketServer); log.info("netty线程已经开启"); } /** * 描述:Tomcat服务器关闭前需要手动关闭Netty Websocket相关资源,否则会造成内存泄漏。 * 1. 释放Netty Websocket相关连接; * 2. 关闭Netty Websocket服务器线程。 */ @PreDestroy public void close() { log.info("正在释放Netty Websocket相关连接..."); executor.shutdown(); } } ```