|
@@ -1,57 +1,119 @@
|
|
|
package com.zhentao.information.handler;
|
|
|
|
|
|
-
|
|
|
-import com.alibaba.fastjson2.JSON;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.zhentao.information.entity.ChatMessage;
|
|
|
import com.zhentao.information.entity.Message;
|
|
|
+import com.zhentao.information.repository.ChatMessageRepository;
|
|
|
+import com.zhentao.information.service.WebSocketService;
|
|
|
+import io.netty.channel.ChannelHandler;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.SimpleChannelInboundHandler;
|
|
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.util.Map;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
|
/**
|
|
|
* WebSocket消息处理器
|
|
|
+ * 处理WebSocket连接、消息接收和发送
|
|
|
*/
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
+@ChannelHandler.Sharable
|
|
|
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
|
|
|
|
|
- // 用户ID和Channel的映射关系
|
|
|
- private static final Map<String, ChannelHandlerContext> USER_CHANNEL_MAP = new ConcurrentHashMap<>();
|
|
|
+ @Resource
|
|
|
+ private ChatMessageRepository chatMessageRepository;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private WebSocketService webSocketService;
|
|
|
|
|
|
/**
|
|
|
- * 获取用户Channel映射
|
|
|
- * @return 用户Channel映射
|
|
|
+ * 处理接收到的WebSocket消息
|
|
|
*/
|
|
|
- public Map<String, ChannelHandlerContext> getUserChannelMap() {
|
|
|
- return USER_CHANNEL_MAP;
|
|
|
+ @Override
|
|
|
+ protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
|
|
|
+ String text = msg.text();
|
|
|
+ log.info("收到消息:{}", text);
|
|
|
+ try {
|
|
|
+ Message message = JSON.parseObject(text, Message.class);
|
|
|
+
|
|
|
+ // 如果是连接消息,处理token
|
|
|
+ if ("connect".equals(message.getType())) {
|
|
|
+ webSocketService.handleUserLogin(message.getContent(), ctx);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 处理普通消息
|
|
|
+ handleMessage(message);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理消息失败", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
|
|
|
- String message = msg.text();
|
|
|
- Message messageObj = JSON.parseObject(message, Message.class);
|
|
|
-
|
|
|
- // 存储用户连接
|
|
|
- USER_CHANNEL_MAP.put(messageObj.getFromUserId(), ctx);
|
|
|
-
|
|
|
- // 获取接收者的Channel
|
|
|
- ChannelHandlerContext toUserCtx = USER_CHANNEL_MAP.get(messageObj.getToUserId());
|
|
|
- if (toUserCtx != null) {
|
|
|
- // 发送消息给接收者
|
|
|
- toUserCtx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(messageObj)));
|
|
|
- log.info("消息已发送给用户: {}, 内容: {}", messageObj.getToUserId(), messageObj.getContent());
|
|
|
+ /**
|
|
|
+ * 处理普通消息
|
|
|
+ */
|
|
|
+ private void handleMessage(Message message) {
|
|
|
+ // 生成聊天ID
|
|
|
+ String chatId = generateChatId(message.getFromUserId(), message.getToUserId());
|
|
|
+
|
|
|
+ // 创建MongoDB消息对象
|
|
|
+ ChatMessage chatMessage = new ChatMessage();
|
|
|
+ chatMessage.setFromUserId(message.getFromUserId());
|
|
|
+ chatMessage.setToUserId(message.getToUserId());
|
|
|
+ chatMessage.setContent(message.getContent());
|
|
|
+ chatMessage.setType(String.valueOf(message.getType()));
|
|
|
+ chatMessage.setTimestamp(System.currentTimeMillis());
|
|
|
+ chatMessage.setIsRead(false);
|
|
|
+ chatMessage.setChatId(chatId);
|
|
|
+
|
|
|
+ // 保存消息到MongoDB
|
|
|
+ chatMessageRepository.save(chatMessage);
|
|
|
+
|
|
|
+ // 发送消息给接收者
|
|
|
+ boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), message);
|
|
|
+ System.err.println("判断对方用户是否在线"+sent);
|
|
|
+ if (sent) {
|
|
|
+ log.info("消息已发送给用户: {}, 内容: {}", message.getToUserId(), message.getContent());
|
|
|
} else {
|
|
|
- log.info("用户 {} 不在线", messageObj.getToUserId());
|
|
|
+ log.info("用户 {} 不在线,消息已保存到MongoDB", message.getToUserId());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当新的WebSocket连接建立时调用
|
|
|
+ */
|
|
|
@Override
|
|
|
- public void handlerRemoved(ChannelHandlerContext ctx) {
|
|
|
- // 用户断开连接时,移除映射关系
|
|
|
- USER_CHANNEL_MAP.entrySet().removeIf(entry -> entry.getValue().equals(ctx));
|
|
|
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ log.info("新的连接:{}", ctx.channel().id().asLongText());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 当WebSocket连接断开时调用
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ log.info("连接断开:{}", ctx.channel().id().asLongText());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理异常情况
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
|
|
+ log.error("WebSocket异常", cause);
|
|
|
+ ctx.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成聊天ID
|
|
|
+ */
|
|
|
+ private String generateChatId(String userId1, String userId2) {
|
|
|
+ return userId1.compareTo(userId2) < 0 ?
|
|
|
+ userId1 + "_" + userId2 :
|
|
|
+ userId2 + "_" + userId1;
|
|
|
}
|
|
|
}
|