WebSocketHandler.java 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package com.zhentao.information.handler;
  2. import com.alibaba.fastjson.JSON;
  3. import com.zhentao.information.entity.Message;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import io.netty.channel.SimpleChannelInboundHandler;
  6. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Component;
  9. import java.util.Map;
  10. import java.util.concurrent.ConcurrentHashMap;
  11. /**
  12. * WebSocket消息处理器
  13. */
  14. @Slf4j
  15. @Component
  16. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  17. // 用户ID和Channel的映射关系
  18. private static final Map<String, ChannelHandlerContext> USER_CHANNEL_MAP = new ConcurrentHashMap<>();
  19. /**
  20. * 获取用户Channel映射
  21. * @return 用户Channel映射
  22. */
  23. public Map<String, ChannelHandlerContext> getUserChannelMap() {
  24. return USER_CHANNEL_MAP;
  25. }
  26. @Override
  27. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
  28. String message = msg.text();
  29. Message messageObj = JSON.parseObject(message, Message.class);
  30. // 存储用户连接
  31. USER_CHANNEL_MAP.put(messageObj.getFromUserId(), ctx);
  32. // 获取接收者的Channel
  33. ChannelHandlerContext toUserCtx = USER_CHANNEL_MAP.get(messageObj.getToUserId());
  34. if (toUserCtx != null) {
  35. // 发送消息给接收者
  36. toUserCtx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(messageObj)));
  37. log.info("消息已发送给用户: {}, 内容: {}", messageObj.getToUserId(), messageObj.getContent());
  38. } else {
  39. log.info("用户 {} 不在线", messageObj.getToUserId());
  40. }
  41. }
  42. @Override
  43. public void handlerRemoved(ChannelHandlerContext ctx) {
  44. // 用户断开连接时,移除映射关系
  45. USER_CHANNEL_MAP.entrySet().removeIf(entry -> entry.getValue().equals(ctx));
  46. }
  47. }