WebSocketHandler.java 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package com.zhentao.information.handler;
  2. import com.alibaba.fastjson.JSON;
  3. import com.zhentao.groups.MongoDB.pojo.GroupMessage;
  4. import com.zhentao.groups.MongoDB.pojo.Message;
  5. import com.zhentao.information.entity.ChatMessage;
  6. import com.zhentao.information.repository.ChatMessageRepository;
  7. import com.zhentao.information.service.WebSocketService;
  8. import com.zhentao.tool.TokenUtils;
  9. import io.netty.channel.ChannelHandler;
  10. import io.netty.channel.ChannelHandlerContext;
  11. import io.netty.channel.SimpleChannelInboundHandler;
  12. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.springframework.stereotype.Component;
  15. import javax.annotation.Resource;
  16. import java.util.Date;
  17. /**
  18. * WebSocket消息处理器
  19. * 处理WebSocket连接、消息接收和发送
  20. */
  21. @Slf4j
  22. @Component
  23. @ChannelHandler.Sharable
  24. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  25. @Resource
  26. private ChatMessageRepository chatMessageRepository;
  27. @Resource
  28. private WebSocketService webSocketService;
  29. /**
  30. * 处理接收到的WebSocket消息
  31. */
  32. @Override
  33. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  34. String text = msg.text();
  35. log.info("收到消息:{}", text);
  36. try {
  37. Message message = JSON.parseObject(text, Message.class);
  38. log.info("接收到的消息:{}", message);
  39. // 如果是连接消息,处理token
  40. if ("connect".equals(message.getType())) {
  41. String userId = webSocketService.handleUserLogin(message.getContent(), ctx);
  42. log.info("用户 {} 登录成功", userId);
  43. if (userId != null) {
  44. // 用户登录成功后,自动加入所有群聊
  45. webSocketService.joinAllGroups(userId);
  46. // 发送连接成功消息
  47. Message response = new Message();
  48. response.setType("connect_success");
  49. response.setContent("连接成功");
  50. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(response)));
  51. }
  52. return;
  53. }
  54. // 如果是心跳消息
  55. if ("ping".equals(message.getType())) {
  56. Message pongMessage = new Message();
  57. pongMessage.setType("pong");
  58. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(pongMessage)));
  59. return;
  60. }
  61. // 如果是群聊消息
  62. if (message.getGroupId() != null) {
  63. handleGroupMessage(message);
  64. return;
  65. }
  66. // 处理普通消息
  67. handleMessage(message);
  68. } catch (Exception e) {
  69. log.error("处理消息失败", e);
  70. // 发送错误消息给客户端
  71. Message errorMessage = new Message();
  72. errorMessage.setType("error");
  73. errorMessage.setContent("消息处理失败");
  74. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(errorMessage)));
  75. }
  76. }
  77. /**
  78. * 处理群聊消息
  79. */
  80. private void handleGroupMessage(Message message) {
  81. // 设置群聊消息类型
  82. message.setType(message.getType() != null ? message.getType() : "group_chat");
  83. // 广播消息给群内所有成员
  84. boolean sent = webSocketService.handleGroupMessage(message);
  85. if (sent) {
  86. // 发送消息确认
  87. Message ackMessage = new Message();
  88. ackMessage.setType("message_ack");
  89. ackMessage.setContent("群消息已发送");
  90. webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
  91. }
  92. }
  93. /**
  94. * 处理普通消息
  95. */
  96. private void handleMessage(Message message) {
  97. // 生成聊天ID
  98. String chatId = generateChatId(message.getFromUserId(), message.getToUserId());
  99. ChatMessage chatMessage = new ChatMessage();
  100. // 新增文件/图片/视频元数据保存
  101. if (message.getFileUrl() != null) {
  102. chatMessage.setFileUrl(message.getFileUrl());
  103. chatMessage.setFileName(message.getFileName());
  104. chatMessage.setFileType(message.getFileType());
  105. chatMessage.setFileSize(message.getFileSize());
  106. }
  107. // 新增:保存发送者头像
  108. chatMessage.setAvatar(message.getAvatar());
  109. // 创建MongoDB消息对象
  110. chatMessage.setFromUserId(message.getFromUserId());
  111. chatMessage.setToUserId(message.getToUserId());
  112. chatMessage.setContent(message.getContent());
  113. chatMessage.setType(message.getType());
  114. chatMessage.setTimestamp(System.currentTimeMillis());
  115. chatMessage.setIsRead(false);
  116. chatMessage.setChatId(chatId);
  117. // 保存消息到MongoDB
  118. chatMessageRepository.save(chatMessage);
  119. // 发送消息给接收者
  120. boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), message);
  121. if (sent) {
  122. log.info("消息已发送给用户: {}, 内容: {}", message.getToUserId(), message.getContent());
  123. // 发送消息确认给发送者
  124. Message ackMessage = new Message();
  125. ackMessage.setType("message_ack");
  126. ackMessage.setContent("消息已发送");
  127. webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
  128. } else {
  129. log.info("用户 {} 不在线,消息已保存到MongoDB", message.getToUserId());
  130. // 发送消息未送达通知给发送者
  131. Message offlineMessage = new Message();
  132. offlineMessage.setType("message_offline");
  133. offlineMessage.setContent("对方不在线,消息已保存");
  134. webSocketService.sendMessageToUser(message.getFromUserId(), offlineMessage);
  135. }
  136. }
  137. /**
  138. * 当新的WebSocket连接建立时调用
  139. */
  140. @Override
  141. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  142. log.info("新的连接:{}", ctx.channel().id().asLongText());
  143. }
  144. /**
  145. * 当WebSocket连接断开时调用
  146. */
  147. @Override
  148. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  149. log.info("连接断开:{}", ctx.channel().id().asLongText());
  150. // 清理用户连接
  151. webSocketService.removeUserConnection(ctx);
  152. }
  153. /**
  154. * 处理异常情况
  155. */
  156. @Override
  157. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  158. log.error("WebSocket异常", cause);
  159. ctx.close();
  160. }
  161. /**
  162. * 生成聊天ID
  163. */
  164. private String generateChatId(String userId1, String userId2) {
  165. // 确保两个用户之间的聊天ID唯一
  166. return userId1.compareTo(userId2) < 0 ?
  167. userId1 + "_" + userId2 :
  168. userId2 + "_" + userId1;
  169. }
  170. }