package com.zhentao.information.handler; import com.alibaba.fastjson.JSON; import com.zhentao.groups.MongoDB.pojo.GroupMessage; import com.zhentao.groups.MongoDB.pojo.Message; import com.zhentao.information.entity.ChatMessage; import com.zhentao.information.repository.ChatMessageRepository; import com.zhentao.information.service.WebSocketService; import com.zhentao.tool.TokenUtils; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Date; /** * WebSocket消息处理器 * 处理WebSocket连接、消息接收和发送 */ @Slf4j @Component @ChannelHandler.Sharable public class WebSocketHandler extends SimpleChannelInboundHandler { @Resource private ChatMessageRepository chatMessageRepository; @Resource private WebSocketService webSocketService; /** * 处理接收到的WebSocket消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { String text = msg.text(); log.info("收到消息:{}", text); try { Message message = JSON.parseObject(text, Message.class); log.info("接收到的消息:{}", message); // 如果是连接消息,处理token if ("connect".equals(message.getType())) { String userId = webSocketService.handleUserLogin(message.getContent(), ctx); log.info("用户 {} 登录成功", userId); if (userId != null) { // 用户登录成功后,自动加入所有群聊 webSocketService.joinAllGroups(userId); // 发送连接成功消息 Message response = new Message(); response.setType("connect_success"); response.setContent("连接成功"); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(response))); } return; } // 如果是心跳消息 if ("ping".equals(message.getType())) { Message pongMessage = new Message(); pongMessage.setType("pong"); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(pongMessage))); return; } // 如果是群聊消息 if (message.getGroupId() != null) { handleGroupMessage(message); return; } // 处理普通消息 handleMessage(message); } catch (Exception e) { log.error("处理消息失败", e); // 发送错误消息给客户端 Message errorMessage = new Message(); errorMessage.setType("error"); errorMessage.setContent("消息处理失败"); ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(errorMessage))); } } /** * 处理群聊消息 */ private void handleGroupMessage(Message message) { // 设置群聊消息类型 message.setType("group_chat"); // 广播消息给群内所有成员 boolean sent = webSocketService.handleGroupMessage(message); if (sent) { // 发送消息确认 Message ackMessage = new Message(); ackMessage.setType("message_ack"); ackMessage.setContent("群消息已发送"); webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage); } } /** * 处理普通消息 */ private void handleMessage(Message message) { // 生成聊天ID String chatId = generateChatId(message.getFromUserId(), message.getToUserId()); // 创建MongoDB消息对象 ChatMessage chatMessage = new ChatMessage(); chatMessage.setFromUserId(message.getFromUserId()); chatMessage.setToUserId(message.getToUserId()); chatMessage.setContent(message.getContent()); chatMessage.setType(message.getType()); chatMessage.setTimestamp(System.currentTimeMillis()); chatMessage.setIsRead(false); chatMessage.setChatId(chatId); // 保存消息到MongoDB chatMessageRepository.save(chatMessage); // 发送消息给接收者 boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), message); if (sent) { log.info("消息已发送给用户: {}, 内容: {}", message.getToUserId(), message.getContent()); // 发送消息确认给发送者 Message ackMessage = new Message(); ackMessage.setType("message_ack"); ackMessage.setContent("消息已发送"); webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage); } else { log.info("用户 {} 不在线,消息已保存到MongoDB", message.getToUserId()); // 发送消息未送达通知给发送者 Message offlineMessage = new Message(); offlineMessage.setType("message_offline"); offlineMessage.setContent("对方不在线,消息已保存"); webSocketService.sendMessageToUser(message.getFromUserId(), offlineMessage); } } /** * 当新的WebSocket连接建立时调用 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("新的连接:{}", ctx.channel().id().asLongText()); } /** * 当WebSocket连接断开时调用 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("连接断开:{}", ctx.channel().id().asLongText()); // 清理用户连接 webSocketService.removeUserConnection(ctx); } /** * 处理异常情况 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("WebSocket异常", cause); ctx.close(); } /** * 生成聊天ID */ private String generateChatId(String userId1, String userId2) { // 确保两个用户之间的聊天ID唯一 return userId1.compareTo(userId2) < 0 ? userId1 + "_" + userId2 : userId2 + "_" + userId1; } }