diff --git a/README.md b/README.md index 9e3c806d143e5c8b39c0cf8c274c7205bd290089..466b3694f9861fa9e2deaa33af3d9feadd4cfb9c 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,9 @@ WebSocket #### 自定义简单的协议 1.建立链接:{"code":"0","con":"{'userId':'V7SnFNmm'}"} + 2.心跳发送:{"code":"1"} + 3.断开链接:{"end":"FMIstop"} #### 2021.12.13 更新 @@ -25,3 +27,6 @@ WebSocket #### 2021.12.20 更新 更新内容: 为SocketChannel保存属性userId信息 + +#### 2021.12.27 更新 +更新内容: 完成动态添加Handler和完成WebSocket握手功能 diff --git a/src/main/java/com/github/hollykunge/simu/config/Constant.java b/src/main/java/com/github/hollykunge/simu/config/Constant.java index c883442ac8401b39e7857e51250f38e5fabc7c5e..fd6c65531434c7bbc5d75a9bf338bc21fbe8f2bf 100644 --- a/src/main/java/com/github/hollykunge/simu/config/Constant.java +++ b/src/main/java/com/github/hollykunge/simu/config/Constant.java @@ -21,4 +21,12 @@ public class Constant { * 用户信息主键 */ public static final String USER_KEY = "userId"; + /** + * 打印处理 + */ + public static final String PRINT_DESCRIBE = "Does not conform to the existing communication protocol, only to print processing!\n"; + /** + * simulation心跳内容 + */ + public static final String HEART_BEAT_STRING = "心跳内容"; } diff --git a/src/main/java/com/github/hollykunge/simu/handler/event/ConnectionHandler.java b/src/main/java/com/github/hollykunge/simu/handler/event/ConnectionHandler.java index 13288c076f5433dbc360bd4d6adfae647081fe02..e973d52ce1cf48b1e8f3b95ab7abb5bd890783af 100644 --- a/src/main/java/com/github/hollykunge/simu/handler/event/ConnectionHandler.java +++ b/src/main/java/com/github/hollykunge/simu/handler/event/ConnectionHandler.java @@ -4,9 +4,10 @@ import com.github.hollykunge.simu.utils.ParamUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.util.Attribute; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -14,6 +15,7 @@ import org.springframework.stereotype.Component; import java.util.HashMap; import static com.github.hollykunge.simu.config.Constant.USER_KEY; +import static com.github.hollykunge.simu.utils.ConnUtils.handShaker; import static com.github.hollykunge.simu.websocket.NettyConfig.addUser; import static com.github.hollykunge.simu.websocket.NettyConfig.addUserChannel; @@ -31,23 +33,41 @@ public class ConnectionHandler extends ChannelInboundHandlerAdapter { if (null == msg) { return; } - if (msg instanceof DefaultHttpRequest) { - final DefaultHttpRequest request = (DefaultHttpRequest) msg; - getUserChannelKey(ctx, request.uri()); - } else if (msg instanceof FullHttpRequest) { + if (msg instanceof FullHttpRequest) { final FullHttpRequest request = (FullHttpRequest) msg; - getUserChannelKey(ctx, request.uri()); + getUserChannelKey(ctx, request.uri(), request); } super.channelRead(ctx, msg); } - private void getUserChannelKey(ChannelHandlerContext ctx, String uri) { + /** + * 带请求参数的http请求获取管道主键 + * + * @param ctx 管道上下文 + * @param uri 请求地址 + * @param request 请求对象 + */ + private void getUserChannelKey(ChannelHandlerContext ctx, String uri, HttpRequest request) { final HashMap urlParams = ParamUtils.getUrlParams(uri); - final Channel channel = ctx.channel(); - channel.attr(AttributeKey.valueOf(USER_KEY)).setIfAbsent(urlParams.get(USER_KEY)); - addUser(urlParams.get(USER_KEY), channel); - addUserChannel(urlParams.get(USER_KEY), channel); - System.out.println(urlParams); + if (urlParams.get(USER_KEY) != null) { + //请求信息与Channel绑定 + final Channel channel = ctx.channel(); + final Attribute attr = channel.attr(AttributeKey.valueOf(USER_KEY)); + attr.setIfAbsent(urlParams.get(USER_KEY)); + addUser(urlParams.get(USER_KEY), channel); + addUserChannel(urlParams.get(USER_KEY), channel); + System.out.println(urlParams); + //完成WebSocket握手 + handShaker(ctx, uri, request); + } else { + //解析不到url中的参数则证明是Model or MMS,使用默认的WebSocketHandler + ctx.pipeline().addAfter( + "ConnectionHandler", + "WebSocketServerProtocolHandler", + new WebSocketServerProtocolHandler("/")); + } } + + } diff --git a/src/main/java/com/github/hollykunge/simu/handler/event/HeartBeatHandler.java b/src/main/java/com/github/hollykunge/simu/handler/event/HeartBeatHandler.java index c535e6debaff858679fbcfe291fab95baa83658d..4789dfc0f9e1a0065de381ab3815107f4dc44cfd 100644 --- a/src/main/java/com/github/hollykunge/simu/handler/event/HeartBeatHandler.java +++ b/src/main/java/com/github/hollykunge/simu/handler/event/HeartBeatHandler.java @@ -17,7 +17,7 @@ import org.springframework.stereotype.Component; public class HeartBeatHandler extends ChannelInboundHandlerAdapter { /** - * 连接检测 + * 连接心跳检测 * * @param ctx 上下文对象 * @param evt 事件实体 @@ -31,11 +31,11 @@ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { if (evt instanceof IdleStateEvent) { final IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.ALL_IDLE) { - log.info("读写空闲..."); + log.info("Read and write free..."); } else if (IdleState.READER_IDLE == state) { - log.info("读空闲..."); + log.info("Read free..."); } else if (IdleState.WRITER_IDLE == state) { - log.info("写空闲..."); + log.info("Write free..."); } } super.userEventTriggered(ctx, evt); diff --git a/src/main/java/com/github/hollykunge/simu/handler/event/WebSocketHandler.java b/src/main/java/com/github/hollykunge/simu/handler/event/WebSocketHandler.java index 41f3360f9a79929cb9bcbfd922d50785bfdd87ee..c0f5b964c41f24588d434eb764f3837db52fb302 100644 --- a/src/main/java/com/github/hollykunge/simu/handler/event/WebSocketHandler.java +++ b/src/main/java/com/github/hollykunge/simu/handler/event/WebSocketHandler.java @@ -5,7 +5,6 @@ import com.github.hollykunge.simu.handler.msg.IMsgHandlerFactory; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -43,26 +42,20 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { } /** - * 消息对象向下转型 + * 没有任何WebSocket连接到Netty * - * @param msg msg - * @param - * @return WebSocketFrame + * @param ctx 连接上下文 */ - @SuppressWarnings("all") - private static T cast(Object msg) { - if (null == msg) { - return null; - } else { - return (T) msg; - } - } - @Override public void channelUnregistered(ChannelHandlerContext ctx) { log.info("Netty Server leisure ..."); } + /** + * 连接被客户端关闭 + * + * @param ctx 连接上下文 + */ @Override public void channelInactive(ChannelHandlerContext ctx) { try { @@ -75,6 +68,12 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { } } + /** + * 发生异常及时释放连接 + * + * @param ctx 通道上下文 + * @param cause 异常内容 + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { try { diff --git a/src/main/java/com/github/hollykunge/simu/handler/msg/MsgTextHandler.java b/src/main/java/com/github/hollykunge/simu/handler/msg/MsgTextHandler.java index 70df632b7ab73890d543f335b393c4526eb01aed..4d6fd282e2dc2a3ac5d4ac8d87026e3843d3372c 100644 --- a/src/main/java/com/github/hollykunge/simu/handler/msg/MsgTextHandler.java +++ b/src/main/java/com/github/hollykunge/simu/handler/msg/MsgTextHandler.java @@ -8,8 +8,7 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; -import static com.github.hollykunge.simu.config.Constant.CREATE_CONN; -import static com.github.hollykunge.simu.config.Constant.HEART_BEAT; +import static com.github.hollykunge.simu.config.Constant.*; import static com.github.hollykunge.simu.websocket.NettyConfig.*; import static com.github.hollykunge.simu.websocket.NettyConfig.getUserChannelSize; @@ -29,6 +28,10 @@ public class MsgTextHandler implements IMsgHandler { TextWebSocketFrame textFrame = (TextWebSocketFrame) msg; String message = textFrame.text(); + if (HEART_BEAT_STRING.equals(message)) { + log.info(HEART_BEAT_STRING +" from : Simulation"); + return; + } JSONObject parse = (JSONObject) JSONObject.parse(message); log.info("receiveMsg:" + parse); String con = parse.getString("con"); @@ -45,7 +48,7 @@ public class MsgTextHandler implements IMsgHandler { } else if (HEART_BEAT.equals(code)) { //自定义心跳,将客户端心跳直接返回 //ctx.writeAndFlush(HEART_BEAT_MSG); 客户端没有正确做出处理暂时注释 - log.info("Received a heartbeat"); + log.info(HEART_BEAT_STRING +"from : Model or MMS"); log.warn("channelGroup:" + getChannelSize() + ";" + "channelMap:" + getUserChannelSize()); } } diff --git a/src/main/java/com/github/hollykunge/simu/service/impl/CustomerServiceImpl.java b/src/main/java/com/github/hollykunge/simu/service/impl/CustomerServiceImpl.java index c056c6c8727dd3214a832a1e9310501979bea30e..384df07722849b2aed82c7d5d35a34d6d6e0a23d 100644 --- a/src/main/java/com/github/hollykunge/simu/service/impl/CustomerServiceImpl.java +++ b/src/main/java/com/github/hollykunge/simu/service/impl/CustomerServiceImpl.java @@ -15,6 +15,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import static com.github.hollykunge.simu.config.Constant.END_STATUS; +import static com.github.hollykunge.simu.config.Constant.PRINT_DESCRIBE; import static com.github.hollykunge.simu.websocket.NettyConfig.getUserChannel; /** @@ -36,22 +37,27 @@ public class CustomerServiceImpl implements CustomerService { /* * 数据解析 */ - final String payload = message.getPayload(); - final JSONObject parse = (JSONObject) JSONObject.parse(payload); - final String userId = (String) parse.get("userId"); - final String msg = (String) parse.get("msg"); - /* - *数据推送 - */ - final Channel userChannel = getUserChannel(userId); - if (null != userChannel) { - userChannel.writeAndFlush(new TextWebSocketFrame(msg)); - if (msg.equals(END_STATUS)) { - log.info("Message sending End... " + msg); - connUtils.removeConnection(userId); + try { + final String payload = message.getPayload(); + final JSONObject parse = (JSONObject) JSONObject.parse(payload); + final String userId = (String) parse.get("userId"); + final String msg = (String) parse.get("msg"); + /* + *数据推送 + */ + final Channel userChannel = getUserChannel(userId); + if (null != userChannel) { + userChannel.writeAndFlush(new TextWebSocketFrame(msg)); + if (msg.equals(END_STATUS)) { + log.info("Message sending End... " + msg); + connUtils.removeConnection(userId); + } + } else { + log.error(userId + "channel not exist!"); } - } else { - log.error(userId + "channel not exist!"); + } catch (Exception e) { + log.warn(PRINT_DESCRIBE + "content is as follows: " + message.getPayload()); } + } } diff --git a/src/main/java/com/github/hollykunge/simu/simulation/SimulationService.java b/src/main/java/com/github/hollykunge/simu/simulation/SimulationService.java new file mode 100644 index 0000000000000000000000000000000000000000..7bdbcf8b48058ecc53cb4122b000d104fe46b243 --- /dev/null +++ b/src/main/java/com/github/hollykunge/simu/simulation/SimulationService.java @@ -0,0 +1,33 @@ +package com.github.hollykunge.simu.simulation; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.messaging.Message; +import org.springframework.stereotype.Component; + +import static com.github.hollykunge.simu.config.Constant.PRINT_DESCRIBE; + +/** + * @author Mr.Liu + */ +@Slf4j +@Component +@EnableBinding(SimulationSink.class) +public class SimulationService { + + @StreamListener(SimulationSink.SIMULATION_INPUT) + public void input(Message message) { + log.info(SimulationSink.SIMULATION_INPUT + ":" + message.getPayload()); + final String payload = message.getPayload(); + try { + final JSONObject parse = (JSONObject) JSONObject.parse(payload); + log.warn(parse.getString("code")); + log.warn(parse.getString("address")); + log.warn(parse.getString("data")); + } catch (Exception e) { + log.error(PRINT_DESCRIBE + "\n" + payload); + } + } +} diff --git a/src/main/java/com/github/hollykunge/simu/simulation/SimulationSink.java b/src/main/java/com/github/hollykunge/simu/simulation/SimulationSink.java new file mode 100644 index 0000000000000000000000000000000000000000..6a44ac50f6c9a804aee7b69c86f047b09d88f23a --- /dev/null +++ b/src/main/java/com/github/hollykunge/simu/simulation/SimulationSink.java @@ -0,0 +1,21 @@ +package com.github.hollykunge.simu.simulation; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.messaging.SubscribableChannel; + +/** + * @author Mr.Liu + */ +public interface SimulationSink { + + String SIMULATION_INPUT="input2"; + + + + /** + * simulation监听 + * @return SubscribableChannel + */ + @Input(SimulationSink.SIMULATION_INPUT) + SubscribableChannel simulationInput(); +} diff --git a/src/main/java/com/github/hollykunge/simu/utils/ConnUtils.java b/src/main/java/com/github/hollykunge/simu/utils/ConnUtils.java index 045490ccc7ecf59acbb84951972e1013e2e30107..8bd255ee59c86a4a9fd45b7738afeba658892f6c 100644 --- a/src/main/java/com/github/hollykunge/simu/utils/ConnUtils.java +++ b/src/main/java/com/github/hollykunge/simu/utils/ConnUtils.java @@ -1,6 +1,11 @@ package com.github.hollykunge.simu.utils; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -31,4 +36,24 @@ public class ConnUtils { log.error(e.getMessage()); } } + + /** + * 处理 http 升级为 WebSocket,完成握手 + * + * @param ctx 管道上下文 + * @param uri 请求地址 + * @param request 请求对象 + */ + @SuppressWarnings("all") + public static void handShaker(ChannelHandlerContext ctx, String uri, HttpRequest request) { + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( + "ws://" + request.headers().get(HttpHeaders.Names.HOST) + uri, null, false); + final WebSocketServerHandshaker handShaker = wsFactory.newHandshaker(request); + + if (handShaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); + } else { + handShaker.handshake(ctx.channel(), request); + } + } } diff --git a/src/main/java/com/github/hollykunge/simu/websocket/NettyServer.java b/src/main/java/com/github/hollykunge/simu/websocket/NettyServer.java index a82d79637caf0b54dfddc7960309c34c5a224eb1..e61e83fd9bf652478ec60ea8a767d07155158246 100644 --- a/src/main/java/com/github/hollykunge/simu/websocket/NettyServer.java +++ b/src/main/java/com/github/hollykunge/simu/websocket/NettyServer.java @@ -10,7 +10,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; @@ -69,8 +68,7 @@ public class NettyServer { pipeline.addLast("IdleState", new IdleStateHandler(20, 30, 50, TimeUnit.SECONDS)); pipeline.addLast(new HeartBeatHandler()); pipeline.addLast("aggregator", new HttpObjectAggregator(1024 * 1024 * 1024)); - pipeline.addLast(new WebSocketServerProtocolHandler("/")); - pipeline.addLast(new ConnectionHandler()); + pipeline.addLast("ConnectionHandler", new ConnectionHandler()); pipeline.addLast(webSocketHandler); } }); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index ed1b292b74e1d9989943ed1fd00a45f06e04a4b2..c45a4969ed40ceeda465869a94b87341ac4c732c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,24 +2,42 @@ server: port: 6789 spring: rabbitmq: - host: 10.11.24.136 - port: 5672 + virtual-host: / +# virtual-host: /simuMsg + host: 10.11.24.134 +# host: 10.11.24.136 username: hollykunge password: 123456 - virtual-host: /simuMsg + port: 5672 cloud: stream: binders: defaultRabbitmq: rabbitmq: - virtual-host: /simuMsg - host: 10.11.24.136 + virtual-host: / + # virtual-host: /simuMsg + host: 10.11.24.134 + # host: 10.11.24.136 port: 5672 username: hollykunge password: 123456 bindings: + # model && mms服务 input: destination: simuMsg content-type: application/json + # simulation 服务 + input2: + destination: exchange_simulation + content-type: application/json + # 如果不分组的话,队列将会是匿名的 binder: rabbit - + group: queue_simulation + rabbit: + bindings: + input2: + group: queue_simulation + consumer: + exchangeType: direct + queueNameGroupOnly: true + bindingRoutingKey: routekey_simulation diff --git a/src/main/resources/static/index_get.html b/src/main/resources/static/index_get.html new file mode 100644 index 0000000000000000000000000000000000000000..267b2084366ade91e4ce16d5ca6c4cbb35fd7b34 --- /dev/null +++ b/src/main/resources/static/index_get.html @@ -0,0 +1,49 @@ + + + + + Title + + + +
+ + +
+ +