123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- package com.zhentao.information.handler;
- import com.alibaba.fastjson.JSON;
- import com.zhentao.groups.MongoDB.pojo.GroupMessage;
- import com.zhentao.groups.MongoDB.pojo.Message;
- import com.zhentao.information.entity.ChatMessage;
- import com.zhentao.information.repository.ChatMessageRepository;
- import com.zhentao.information.service.WebSocketService;
- import com.zhentao.tool.TokenUtils;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.util.Date;
- /**
- * WebSocket消息处理器
- * 处理WebSocket连接、消息接收和发送
- */
- @Slf4j
- @Component
- @ChannelHandler.Sharable
- public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
- @Resource
- private ChatMessageRepository chatMessageRepository;
- @Resource
- private WebSocketService webSocketService;
- /**
- * 处理接收到的WebSocket消息
- */
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
- String text = msg.text();
- log.info("收到消息:{}", text);
- try {
- Message message = JSON.parseObject(text, Message.class);
- log.info("接收到的消息:{}", message);
-
- // 如果是连接消息,处理token
- if ("connect".equals(message.getType())) {
- String userId = webSocketService.handleUserLogin(message.getContent(), ctx);
- log.info("用户 {} 登录成功", userId);
- if (userId != null) {
- // 用户登录成功后,自动加入所有群聊
- webSocketService.joinAllGroups(userId);
- // 发送连接成功消息
- Message response = new Message();
- response.setType("connect_success");
- response.setContent("连接成功");
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(response)));
- }
- return;
- }
- // 如果是心跳消息
- if ("ping".equals(message.getType())) {
- Message pongMessage = new Message();
- pongMessage.setType("pong");
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(pongMessage)));
- return;
- }
- // 如果是群聊消息
- if (message.getGroupId() != null) {
- handleGroupMessage(message);
- return;
- }
- // 处理普通消息
- handleMessage(message);
- } catch (Exception e) {
- log.error("处理消息失败", e);
- // 发送错误消息给客户端
- Message errorMessage = new Message();
- errorMessage.setType("error");
- errorMessage.setContent("消息处理失败");
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(errorMessage)));
- }
- }
- /**
- * 处理群聊消息
- */
- private void handleGroupMessage(Message message) {
- // 设置群聊消息类型
- message.setType("group_chat");
- // 广播消息给群内所有成员
- boolean sent = webSocketService.handleGroupMessage(message);
- if (sent) {
- // 发送消息确认
- Message ackMessage = new Message();
- ackMessage.setType("message_ack");
- ackMessage.setContent("群消息已发送");
- webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
- }
- }
- /**
- * 处理普通消息
- */
- private void handleMessage(Message message) {
- // 生成聊天ID
- String chatId = generateChatId(message.getFromUserId(), message.getToUserId());
- // 创建MongoDB消息对象
- ChatMessage chatMessage = new ChatMessage();
- chatMessage.setFromUserId(message.getFromUserId());
- chatMessage.setToUserId(message.getToUserId());
- chatMessage.setContent(message.getContent());
- chatMessage.setType(message.getType());
- chatMessage.setTimestamp(System.currentTimeMillis());
- chatMessage.setIsRead(false);
- chatMessage.setChatId(chatId);
- // 保存消息到MongoDB
- chatMessageRepository.save(chatMessage);
- // 发送消息给接收者
- boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), message);
- if (sent) {
- log.info("消息已发送给用户: {}, 内容: {}", message.getToUserId(), message.getContent());
- // 发送消息确认给发送者
- Message ackMessage = new Message();
- ackMessage.setType("message_ack");
- ackMessage.setContent("消息已发送");
- webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
- } else {
- log.info("用户 {} 不在线,消息已保存到MongoDB", message.getToUserId());
- // 发送消息未送达通知给发送者
- Message offlineMessage = new Message();
- offlineMessage.setType("message_offline");
- offlineMessage.setContent("对方不在线,消息已保存");
- webSocketService.sendMessageToUser(message.getFromUserId(), offlineMessage);
- }
- }
- /**
- * 当新的WebSocket连接建立时调用
- */
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- log.info("新的连接:{}", ctx.channel().id().asLongText());
- }
- /**
- * 当WebSocket连接断开时调用
- */
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- log.info("连接断开:{}", ctx.channel().id().asLongText());
- // 清理用户连接
- webSocketService.removeUserConnection(ctx);
- }
- /**
- * 处理异常情况
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- log.error("WebSocket异常", cause);
- ctx.close();
- }
- /**
- * 生成聊天ID
- */
- private String generateChatId(String userId1, String userId2) {
- // 确保两个用户之间的聊天ID唯一
- return userId1.compareTo(userId2) < 0 ?
- userId1 + "_" + userId2 :
- userId2 + "_" + userId1;
- }
- }
|