|
@@ -0,0 +1,169 @@
|
|
|
+package com.slibra;
|
|
|
+
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+
|
|
|
+import javax.websocket.*;
|
|
|
+import javax.websocket.server.PathParam;
|
|
|
+import javax.websocket.server.ServerEndpoint;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+
|
|
|
+/**
|
|
|
+ * WebSocket的操作类
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+/**
|
|
|
+ * html页面与之关联的接口
|
|
|
+ * var reqUrl = "http://localhost:8080/websocket/" + cid;
|
|
|
+ * socket = new WebSocket(reqUrl.replace("http", "ws"));
|
|
|
+ */
|
|
|
+@ServerEndpoint("/websocket/{sid}")
|
|
|
+public class WebSocketServer {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 静态变量,用来记录当前在线连接数,线程安全的类。
|
|
|
+ */
|
|
|
+ private static AtomicInteger onlineSessionClientCount = new AtomicInteger(0);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 存放所有在线的客户端
|
|
|
+ */
|
|
|
+ private static Map<String, Session> onlineSessionClientMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接sid和连接会话
|
|
|
+ */
|
|
|
+ private String sid;
|
|
|
+ private Session session;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接建立成功调用的方法。由前端<code>new WebSocket</code>触发
|
|
|
+ *
|
|
|
+ * @param sid 每次页面建立连接时传入到服务端的id,比如用户id等。可以自定义。
|
|
|
+ * @param session 与某个客户端的连接会话,需要通过它来给客户端发送消息
|
|
|
+ */
|
|
|
+ @OnOpen
|
|
|
+ public void onOpen(@PathParam("sid") String sid, Session session) {
|
|
|
+ /**
|
|
|
+ * session.getId():当前session会话会自动生成一个id,从0开始累加的。
|
|
|
+ */
|
|
|
+ log.info("连接建立中 ==> session_id = {}, sid = {}", session.getId(), sid);
|
|
|
+ //加入 Map中。将页面的sid和session绑定或者session.getId()与session
|
|
|
+ //onlineSessionIdClientMap.put(session.getId(), session);
|
|
|
+ onlineSessionClientMap.put(sid, session);
|
|
|
+
|
|
|
+ //在线数加1
|
|
|
+ onlineSessionClientCount.incrementAndGet();
|
|
|
+ this.sid = sid;
|
|
|
+ this.session = session;
|
|
|
+ sendToOne(sid, "连接成功");
|
|
|
+ log.info("连接建立成功,当前在线数为:{} ==> 开始监听新连接:session_id = {}, sid = {},。", onlineSessionClientCount, session.getId(), sid);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接关闭调用的方法。由前端<code>socket.close()</code>触发
|
|
|
+ *
|
|
|
+ * @param sid
|
|
|
+ * @param session
|
|
|
+ */
|
|
|
+ @OnClose
|
|
|
+ public void onClose(@PathParam("sid") String sid, Session session) {
|
|
|
+ //onlineSessionIdClientMap.remove(session.getId());
|
|
|
+ // 从 Map中移除
|
|
|
+ onlineSessionClientMap.remove(sid);
|
|
|
+
|
|
|
+ //在线数减1
|
|
|
+ onlineSessionClientCount.decrementAndGet();
|
|
|
+ log.info("连接关闭成功,当前在线数为:{} ==> 关闭该连接信息:session_id = {}, sid = {},。", onlineSessionClientCount, session.getId(), sid);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 收到客户端消息后调用的方法。由前端<code>socket.send</code>触发
|
|
|
+ * * 当服务端执行toSession.getAsyncRemote().sendText(xxx)后,前端的socket.onmessage得到监听。
|
|
|
+ *
|
|
|
+ * @param message
|
|
|
+ * @param session
|
|
|
+ */
|
|
|
+ @OnMessage
|
|
|
+ public void onMessage(String message, Session session) {
|
|
|
+ /**
|
|
|
+ * html界面传递来得数据格式,可以自定义.
|
|
|
+ * {"sid":"user-1","message":"hello websocket"}
|
|
|
+ */
|
|
|
+ JSONObject jsonObject = JSON.parseObject(message);
|
|
|
+ String toSid = jsonObject.getString("sid");
|
|
|
+ String msg = jsonObject.getString("message");
|
|
|
+ log.info("服务端收到客户端消息 ==> fromSid = {}, toSid = {}, message = {}", sid, toSid, message);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 模拟约定:如果未指定sid信息,则群发,否则就单独发送
|
|
|
+ */
|
|
|
+ if (toSid == null || toSid == "" || "".equalsIgnoreCase(toSid)) {
|
|
|
+ sendToAll(msg);
|
|
|
+ } else {
|
|
|
+ sendToOne(toSid, msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发生错误调用的方法
|
|
|
+ *
|
|
|
+ * @param session
|
|
|
+ * @param error
|
|
|
+ */
|
|
|
+ @OnError
|
|
|
+ public void onError(Session session, Throwable error) {
|
|
|
+ log.error("WebSocket发生错误,错误信息为:" + error.getMessage());
|
|
|
+ error.printStackTrace();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 群发消息
|
|
|
+ *
|
|
|
+ * @param message 消息
|
|
|
+ */
|
|
|
+ private void sendToAll(String message) {
|
|
|
+ // 遍历在线map集合
|
|
|
+ onlineSessionClientMap.forEach((onlineSid, toSession) -> {
|
|
|
+ // 排除掉自己
|
|
|
+ if (!sid.equalsIgnoreCase(onlineSid)) {
|
|
|
+ log.info("服务端给客户端群发消息 ==> sid = {}, toSid = {}, message = {}", sid, onlineSid, message);
|
|
|
+ toSession.getAsyncRemote().sendText(message);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 指定发送消息
|
|
|
+ *
|
|
|
+ * @param toSid
|
|
|
+ * @param message
|
|
|
+ */
|
|
|
+ private void sendToOne(String toSid, String message) {
|
|
|
+ // 通过sid查询map中是否存在
|
|
|
+ Session toSession = onlineSessionClientMap.get(toSid);
|
|
|
+ if (toSession == null) {
|
|
|
+ log.error("服务端给客户端发送消息 ==> toSid = {} 不存在, message = {}", toSid, message);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 异步发送
|
|
|
+ log.info("服务端给客户端发送消息 ==> toSid = {}, message = {}", toSid, message);
|
|
|
+ toSession.getAsyncRemote().sendText(message);
|
|
|
+ /*
|
|
|
+ // 同步发送
|
|
|
+ try {
|
|
|
+ toSession.getBasicRemote().sendText(message);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("发送消息失败,WebSocket IO异常");
|
|
|
+ e.printStackTrace();
|
|
|
+ }*/
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|