WebSocketHandler.java 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package com.zhentao.information.handler;
  2. import com.alibaba.fastjson.JSON;
  3. import com.zhentao.groups.MongoDB.pojo.GroupMessage;
  4. import com.zhentao.groups.MongoDB.pojo.Message;
  5. import com.zhentao.information.entity.ChatMessage;
  6. import com.zhentao.information.repository.ChatMessageRepository;
  7. import com.zhentao.information.service.WebSocketService;
  8. import com.zhentao.tool.TokenUtils;
  9. import io.netty.channel.ChannelHandler;
  10. import io.netty.channel.ChannelHandlerContext;
  11. import io.netty.channel.SimpleChannelInboundHandler;
  12. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.springframework.stereotype.Component;
  15. import javax.annotation.Resource;
  16. import java.util.Date;
  17. /**
  18. * WebSocket消息处理器
  19. * 处理WebSocket连接、消息接收和发送
  20. */
  21. @Slf4j
  22. @Component
  23. @ChannelHandler.Sharable
  24. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  25. @Resource
  26. private ChatMessageRepository chatMessageRepository;
  27. @Resource
  28. private WebSocketService webSocketService;
  29. /**
  30. * 处理接收到的WebSocket消息
  31. */
  32. @Override
  33. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  34. String text = msg.text();
  35. log.info("收到消息:{}", text);
  36. try {
  37. Message message = JSON.parseObject(text, Message.class);
  38. log.info("接收到的消息:{}", message);
  39. // 如果是连接消息,处理token
  40. if ("connect".equals(message.getType())) {
  41. String userId = webSocketService.handleUserLogin(message.getContent(), ctx);
  42. log.info("用户 {} 登录成功", userId);
  43. if (userId != null) {
  44. // 用户登录成功后,自动加入所有群聊
  45. webSocketService.joinAllGroups(userId);
  46. // 发送连接成功消息
  47. Message response = new Message();
  48. response.setType("connect_success");
  49. response.setContent("连接成功");
  50. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(response)));
  51. }
  52. return;
  53. }
  54. // 如果是心跳消息
  55. if ("ping".equals(message.getType())) {
  56. Message pongMessage = new Message();
  57. pongMessage.setType("pong");
  58. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(pongMessage)));
  59. return;
  60. }
  61. // 如果是群聊消息
  62. if (message.getGroupId() != null) {
  63. handleGroupMessage(message);
  64. return;
  65. }
  66. // 处理普通消息
  67. handleMessage(message);
  68. } catch (Exception e) {
  69. log.error("处理消息失败", e);
  70. // 发送错误消息给客户端
  71. Message errorMessage = new Message();
  72. errorMessage.setType("error");
  73. errorMessage.setContent("消息处理失败");
  74. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(errorMessage)));
  75. }
  76. }
  77. /**
  78. * 处理群聊消息
  79. */
  80. private void handleGroupMessage(Message message) {
  81. // 设置群聊消息类型
  82. message.setType("group_chat");
  83. // 广播消息给群内所有成员
  84. boolean sent = webSocketService.handleGroupMessage(message);
  85. if (sent) {
  86. // 发送消息确认
  87. Message ackMessage = new Message();
  88. ackMessage.setType("message_ack");
  89. ackMessage.setContent("群消息已发送");
  90. webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
  91. }
  92. }
  93. /**
  94. * 处理普通消息
  95. */
  96. private void handleMessage(Message message) {
  97. // 生成聊天ID
  98. String chatId = generateChatId(message.getFromUserId(), message.getToUserId());
  99. // 创建MongoDB消息对象
  100. ChatMessage chatMessage = new ChatMessage();
  101. chatMessage.setFromUserId(message.getFromUserId());
  102. chatMessage.setToUserId(message.getToUserId());
  103. chatMessage.setContent(message.getContent());
  104. chatMessage.setType(message.getType());
  105. chatMessage.setTimestamp(System.currentTimeMillis());
  106. chatMessage.setIsRead(false);
  107. chatMessage.setChatId(chatId);
  108. // 保存消息到MongoDB
  109. chatMessageRepository.save(chatMessage);
  110. // 发送消息给接收者
  111. boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), message);
  112. if (sent) {
  113. log.info("消息已发送给用户: {}, 内容: {}", message.getToUserId(), message.getContent());
  114. // 发送消息确认给发送者
  115. Message ackMessage = new Message();
  116. ackMessage.setType("message_ack");
  117. ackMessage.setContent("消息已发送");
  118. webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
  119. } else {
  120. log.info("用户 {} 不在线,消息已保存到MongoDB", message.getToUserId());
  121. // 发送消息未送达通知给发送者
  122. Message offlineMessage = new Message();
  123. offlineMessage.setType("message_offline");
  124. offlineMessage.setContent("对方不在线,消息已保存");
  125. webSocketService.sendMessageToUser(message.getFromUserId(), offlineMessage);
  126. }
  127. }
  128. /**
  129. * 当新的WebSocket连接建立时调用
  130. */
  131. @Override
  132. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  133. log.info("新的连接:{}", ctx.channel().id().asLongText());
  134. }
  135. /**
  136. * 当WebSocket连接断开时调用
  137. */
  138. @Override
  139. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  140. log.info("连接断开:{}", ctx.channel().id().asLongText());
  141. // 清理用户连接
  142. webSocketService.removeUserConnection(ctx);
  143. }
  144. /**
  145. * 处理异常情况
  146. */
  147. @Override
  148. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  149. log.error("WebSocket异常", cause);
  150. ctx.close();
  151. }
  152. /**
  153. * 生成聊天ID
  154. */
  155. private String generateChatId(String userId1, String userId2) {
  156. // 确保两个用户之间的聊天ID唯一
  157. return userId1.compareTo(userId2) < 0 ?
  158. userId1 + "_" + userId2 :
  159. userId2 + "_" + userId1;
  160. }
  161. }