From 2cae3dced6aa56b31116c7bf761a1d99d93d1c9f Mon Sep 17 00:00:00 2001 From: Yang Hanlin Date: Wed, 8 Jul 2020 23:48:38 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=BC=95=E5=85=A5=20Web=20Socket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 ++ .../ccs/config/WebSocketConfig.java | 13 ++++ .../ccs/websocket/ExampleServer.java | 64 +++++++++++++++++++ .../ccs/websocket/WebSocketSessionPool.java | 62 ++++++++++++++++++ 4 files changed, 144 insertions(+) create mode 100644 src/main/java/com/cloudservice/ccs/config/WebSocketConfig.java create mode 100644 src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java create mode 100644 src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java diff --git a/pom.xml b/pom.xml index a8c8b22..5201b4e 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,11 @@ pagehelper-spring-boot-starter 1.2.13 + + org.springframework.boot + spring-boot-starter-websocket + 2.3.1.RELEASE + diff --git a/src/main/java/com/cloudservice/ccs/config/WebSocketConfig.java b/src/main/java/com/cloudservice/ccs/config/WebSocketConfig.java new file mode 100644 index 0000000..4b74417 --- /dev/null +++ b/src/main/java/com/cloudservice/ccs/config/WebSocketConfig.java @@ -0,0 +1,13 @@ +package com.cloudservice.ccs.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.server.standard.ServerEndpointExporter; + +@Configuration +public class WebSocketConfig { + @Bean + public ServerEndpointExporter serverEndpoint() { + return new ServerEndpointExporter(); + } +} diff --git a/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java b/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java new file mode 100644 index 0000000..a8386dd --- /dev/null +++ b/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java @@ -0,0 +1,64 @@ +package com.cloudservice.ccs.websocket; + +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; + +@ServerEndpoint("/example/{username}") +@Component +public class ExampleServer { + + private static WebSocketSessionPool sessionPool = new WebSocketSessionPool<>(); + + @OnOpen + public void onOpen(@PathParam("username") String username, Session session) { + sessionPool.addSession(username, session); + try { + sessionPool.broadcast("The server welcomes " + username + " into the chat room! " + + "Currently there are " + sessionPool.getOnlineCount() + " member(s)."); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @OnClose + public void onClose(@PathParam("username") String username) { + sessionPool.removeSession(username); + try { + sessionPool.broadcast(username + " exits the chat room. " + + "Currently there are " + sessionPool.getOnlineCount() + " member(s)."); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @OnMessage + public void onMessage(@PathParam("username") String username, String message) { + String receiver = null; + if (message != null && message.contains("@")) { + int receiverIndex = message.indexOf("@"); + receiver = message.substring(receiverIndex + 1); + message = message.substring(0, receiverIndex); + } + try { + if (receiver == null) { + sessionPool.broadcast("[" + username + " to all] " + message); + } else if (sessionPool.getOnlineSessions().containsKey(receiver)) { + sessionPool.sendMessage(receiver, "[" + username + " to " + receiver + "] " + message); + } else { + sessionPool.sendMessage(username, "[" + username + " to " + receiver + " (not found)] " + message); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + @OnError + public void onError(Session session, Throwable throwable) { + throwable.printStackTrace(); + } + +} diff --git a/src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java b/src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java new file mode 100644 index 0000000..18f089b --- /dev/null +++ b/src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java @@ -0,0 +1,62 @@ +package com.cloudservice.ccs.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; + +import javax.websocket.Session; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +@Data +public class WebSocketSessionPool { + + private AtomicInteger onlineCount = new AtomicInteger(); + + private ConcurrentHashMap onlineSessions = new ConcurrentHashMap<>(); + + public void addSession(T client, Session session) { + boolean alreadyExists = onlineSessions.containsKey(client); + onlineSessions.put(client, session); + if (!alreadyExists) { + onlineCount.incrementAndGet(); + } + } + + public void removeSession(T client) { + if (onlineSessions.containsKey(client)) { + onlineSessions.remove(client); + onlineCount.decrementAndGet(); + } + } + + public void broadcast(String message) throws IOException { + for (Session session : onlineSessions.values()) { + sendMessage(session, message); + } + } + + public void broadcast(Object message) throws IOException { + for (Session session : onlineSessions.values()) { + sendMessage(session, new ObjectMapper().writeValueAsString(message)); + } + } + + public void sendMessage(T receiver, String message) throws IOException { + Session session = onlineSessions.get(receiver); + sendMessage(session, message); + } + + public void sendMessage(T receiver, Object message) throws IOException { + Session session = onlineSessions.get(receiver); + sendMessage(session, new ObjectMapper().writeValueAsString(message)); + } + + private void sendMessage(Session session, String message) throws IOException { + if (session != null) { + synchronized (session) { + session.getBasicRemote().sendText(message); + } + } + } +} -- Gitee From 61497fe042a32cfba8022d75c8710e02e6c79ff7 Mon Sep 17 00:00:00 2001 From: Yang Hanlin Date: Thu, 9 Jul 2020 16:36:40 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=B8=80=E4=BA=9B=20WebS?= =?UTF-8?q?ocket=20=E7=9B=B8=E5=85=B3=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebSocketSessionPool.java | 33 ++++++++++++++++++- .../ccs/websocket/ExampleServer.java | 6 ++++ .../ccs/websocket/NoticeServer.java | 8 +++++ 3 files changed, 46 insertions(+), 1 deletion(-) rename src/main/java/com/cloudservice/ccs/{websocket => util}/WebSocketSessionPool.java (63%) create mode 100644 src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java diff --git a/src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java b/src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java similarity index 63% rename from src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java rename to src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java index 18f089b..6ef3226 100644 --- a/src/main/java/com/cloudservice/ccs/websocket/WebSocketSessionPool.java +++ b/src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java @@ -1,4 +1,4 @@ -package com.cloudservice.ccs.websocket; +package com.cloudservice.ccs.util; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Data; @@ -8,6 +8,10 @@ import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +/** + * Web Socket 会话池 + * @param 用于辨别用户的字段的数据类型(通常即为 id 的数据类型) + */ @Data public class WebSocketSessionPool { @@ -15,6 +19,11 @@ public class WebSocketSessionPool { private ConcurrentHashMap onlineSessions = new ConcurrentHashMap<>(); + /** + * 向会话池中添加会话(通常为开启 socket 时使用)。 + * @param client 用于辨别用户的字段值(通常即为 id) + * @param session 会话对象 + */ public void addSession(T client, Session session) { boolean alreadyExists = onlineSessions.containsKey(client); onlineSessions.put(client, session); @@ -23,6 +32,10 @@ public class WebSocketSessionPool { } } + /** + * 从会话池中移除会话。 + * @param client 用于辨别用户的字段值(通常即为 id) + */ public void removeSession(T client) { if (onlineSessions.containsKey(client)) { onlineSessions.remove(client); @@ -30,23 +43,41 @@ public class WebSocketSessionPool { } } + /** + * 向会话池中的所有会话发送文本消息。 + * @param message 发送的文本消息 + */ public void broadcast(String message) throws IOException { for (Session session : onlineSessions.values()) { sendMessage(session, message); } } + /** + * 向会话池中的所有会话发送消息(一个对象,用 JSON 形式发送) + * @param message 发送的消息 + */ public void broadcast(Object message) throws IOException { for (Session session : onlineSessions.values()) { sendMessage(session, new ObjectMapper().writeValueAsString(message)); } } + /** + * 向特定用户发送文本消息。 + * @param receiver 消息的接收者 + * @param message 发送的文本消息 + */ public void sendMessage(T receiver, String message) throws IOException { Session session = onlineSessions.get(receiver); sendMessage(session, message); } + /** + * 向特定用户发送消息(一个对象,以 JSON 形式发送) + * @param receiver 消息的接收者 + * @param message 发送的文本消息 + */ public void sendMessage(T receiver, Object message) throws IOException { Session session = onlineSessions.get(receiver); sendMessage(session, new ObjectMapper().writeValueAsString(message)); diff --git a/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java b/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java index a8386dd..d6e0e27 100644 --- a/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java +++ b/src/main/java/com/cloudservice/ccs/websocket/ExampleServer.java @@ -1,5 +1,7 @@ package com.cloudservice.ccs.websocket; +import com.cloudservice.ccs.model.ApiResult; +import com.cloudservice.ccs.util.WebSocketSessionPool; import org.springframework.stereotype.Component; import javax.websocket.*; @@ -13,6 +15,10 @@ public class ExampleServer { private static WebSocketSessionPool sessionPool = new WebSocketSessionPool<>(); + public static WebSocketSessionPool getSessionPool() { + return sessionPool; + } + @OnOpen public void onOpen(@PathParam("username") String username, Session session) { sessionPool.addSession(username, session); diff --git a/src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java b/src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java new file mode 100644 index 0000000..64dbdce --- /dev/null +++ b/src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java @@ -0,0 +1,8 @@ +package com.cloudservice.ccs.websocket; + +import javax.websocket.server.ServerEndpoint; + +@ServerEndpoint("/notice/") +public class NoticeServer { + +} -- Gitee From 4025c39ca9e34613823bbbf845a937eab67a3a05 Mon Sep 17 00:00:00 2001 From: Yang Hanlin Date: Thu, 9 Jul 2020 18:07:33 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=B8=8B=E5=8F=91?= =?UTF-8?q?=E9=80=9A=E7=9F=A5=E4=B8=8E=E7=B3=BB=E7=BB=9F=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E7=9A=84=20WebSocket=20endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cloudservice/ccs/biz/impl/NoticeImpl.java | 25 +++++++++----- .../ccs/biz/impl/SystemMessageImpl.java | 17 +++++++++- .../ccs/util/WebSocketSessionPool.java | 11 +++++++ .../NoticeAndSystemMessageServer.java | 33 +++++++++++++++++++ .../ccs/websocket/NoticeServer.java | 8 ----- 5 files changed, 76 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/cloudservice/ccs/websocket/NoticeAndSystemMessageServer.java delete mode 100644 src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java diff --git a/src/main/java/com/cloudservice/ccs/biz/impl/NoticeImpl.java b/src/main/java/com/cloudservice/ccs/biz/impl/NoticeImpl.java index cd108dd..984891b 100644 --- a/src/main/java/com/cloudservice/ccs/biz/impl/NoticeImpl.java +++ b/src/main/java/com/cloudservice/ccs/biz/impl/NoticeImpl.java @@ -2,18 +2,25 @@ package com.cloudservice.ccs.biz.impl; import com.cloudservice.ccs.biz.INoticeBiz; import com.cloudservice.ccs.entity.Notice; -import com.cloudservice.ccs.exception.CustomerServiceException; import com.cloudservice.ccs.exception.NoticeException; import com.cloudservice.ccs.mapper.NoticeMapper; +import com.cloudservice.ccs.model.ApiResult; import com.cloudservice.ccs.model.NoticeConnectQuery; +import com.cloudservice.ccs.websocket.NoticeAndSystemMessageServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.IOException; import java.util.List; import java.util.Map; @Service public class NoticeImpl implements INoticeBiz { + + private final Logger logger = LoggerFactory.getLogger(NoticeImpl.class); + @Autowired private NoticeMapper noticeMapper; @@ -23,6 +30,14 @@ public class NoticeImpl implements INoticeBiz { if (result == 0) { throw new NoticeException("Create Notice entity error"); } + try { + NoticeAndSystemMessageServer.getSessionPool().broadcast(ApiResult.successWithData(notice)); + } catch (IOException e) { + if (logger.isWarnEnabled()) { + logger.warn("Error when broadcasting new notice: " + e.getClass().getName() + ": " + e.getMessage()); + e.printStackTrace(); + } + } } @Override @@ -63,14 +78,6 @@ public class NoticeImpl implements INoticeBiz { return noticeMapper.getAll(); } - /*@Override - public Object queryCustomerService(Integer id) { - Object object= noticeMapper.queryCustomerService(id); - if(object == null){ - throw new NoticeException("Get Notice entity error: no such entity"); - } - return object; - }*/ @Override public NoticeConnectQuery queryCustomerService(Integer id) { NoticeConnectQuery noticeConnectQuery = noticeMapper.queryCustomerService(id); diff --git a/src/main/java/com/cloudservice/ccs/biz/impl/SystemMessageImpl.java b/src/main/java/com/cloudservice/ccs/biz/impl/SystemMessageImpl.java index 7b5d2b3..50d95f7 100644 --- a/src/main/java/com/cloudservice/ccs/biz/impl/SystemMessageImpl.java +++ b/src/main/java/com/cloudservice/ccs/biz/impl/SystemMessageImpl.java @@ -2,17 +2,24 @@ package com.cloudservice.ccs.biz.impl; import com.cloudservice.ccs.biz.ISystemMessageBiz; import com.cloudservice.ccs.entity.SystemMessage; -import com.cloudservice.ccs.exception.RoleManageException; import com.cloudservice.ccs.exception.SystemMessageException; import com.cloudservice.ccs.mapper.SystemMessageMapper; +import com.cloudservice.ccs.model.ApiResult; +import com.cloudservice.ccs.websocket.NoticeAndSystemMessageServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; + +import java.io.IOException; import java.util.List; import java.util.Map; @Service public class SystemMessageImpl implements ISystemMessageBiz { + private final Logger logger = LoggerFactory.getLogger(SystemMessageImpl.class); + @Autowired SystemMessageMapper systemMessageMapper; @@ -22,6 +29,14 @@ public class SystemMessageImpl implements ISystemMessageBiz { if (result == 0) { throw new SystemMessageException("Create SystemMessage entity error"); } + try { + NoticeAndSystemMessageServer.getSessionPool().broadcast(ApiResult.successWithData(systemMessage)); + } catch (IOException e) { + if (logger.isWarnEnabled()) { + logger.warn("Error when broadcasting new system message: " + e.getClass().getName() + ": " + e.getMessage()); + e.printStackTrace(); + } + } } @Override diff --git a/src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java b/src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java index 6ef3226..8598281 100644 --- a/src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java +++ b/src/main/java/com/cloudservice/ccs/util/WebSocketSessionPool.java @@ -43,6 +43,17 @@ public class WebSocketSessionPool { } } + /** + * 从会话池中移除会话。只有在无法使用 client 移除时才应该使用此方法。 + * @param session 要移除的会话。 + */ + public void removeSession(Session session) { + for (T client : onlineSessions.keySet(session)) { + onlineSessions.remove(client); + onlineCount.decrementAndGet(); + } + } + /** * 向会话池中的所有会话发送文本消息。 * @param message 发送的文本消息 diff --git a/src/main/java/com/cloudservice/ccs/websocket/NoticeAndSystemMessageServer.java b/src/main/java/com/cloudservice/ccs/websocket/NoticeAndSystemMessageServer.java new file mode 100644 index 0000000..7e5671d --- /dev/null +++ b/src/main/java/com/cloudservice/ccs/websocket/NoticeAndSystemMessageServer.java @@ -0,0 +1,33 @@ +package com.cloudservice.ccs.websocket; + +import com.cloudservice.ccs.util.WebSocketSessionPool; +import org.springframework.stereotype.Component; + +import javax.websocket.*; +import javax.websocket.server.ServerEndpoint; + +@ServerEndpoint("/notice-and-system-message/") +@Component +public class NoticeAndSystemMessageServer { + + private static WebSocketSessionPool sessionPool = new WebSocketSessionPool<>(); + + public static WebSocketSessionPool getSessionPool() { + return sessionPool; + } + + @OnOpen + public void onOpen(Session session) { + sessionPool.addSession(sessionPool.getOnlineCount().intValue(), session); + } + + @OnClose + public void onClose(Session session) { + sessionPool.removeSession(session); + } + + @OnError + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } +} diff --git a/src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java b/src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java deleted file mode 100644 index 64dbdce..0000000 --- a/src/main/java/com/cloudservice/ccs/websocket/NoticeServer.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.cloudservice.ccs.websocket; - -import javax.websocket.server.ServerEndpoint; - -@ServerEndpoint("/notice/") -public class NoticeServer { - -} -- Gitee