zhentao 3 days ago
parent
commit
78b56ebfb0

+ 3 - 2
pom.xml

@@ -14,10 +14,11 @@
         <spring-boot.version>2.6.13</spring-boot.version>
     </properties>
     <dependencies>
-        <dependency>
+        <!-- 移除Spring WebSocket依赖,使用Netty处理WebSocket -->
+        <!-- <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-websocket</artifactId>
-        </dependency>
+        </dependency> -->
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>dashscope-sdk-java</artifactId>

+ 0 - 137
src/main/java/com/zhentao/Ai/advertisement/DeepSeekWebSocketHandler.java

@@ -1,137 +0,0 @@
-package com.zhentao.Ai.advertisement;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.zhentao.Ai.dto.DeeseekRequest;
-import org.springframework.web.socket.TextMessage;
-import org.springframework.web.socket.WebSocketSession;
-import org.springframework.web.socket.handler.TextWebSocketHandler;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.SocketTimeoutException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class DeepSeekWebSocketHandler extends TextWebSocketHandler {
-
-    private static final Gson gson = new Gson();
-    private static final String DEEPSEEK_API_URL = "https://api.deepseek.com/chat/completions";
-    private static final String API_KEY = "sk-df51dab7323441998d41f18494098ddc"; // 替换为你的API密钥
-
-        // ...其他成员变量...
-
-    @Override
-    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
-        String question = message.getPayload();
-        if (question == null || question.trim().isEmpty()) {
-            sendErrorMessage(session, "问题不能为空");
-            return;
-        }
-
-        // 使用线程池管理请求(避免频繁创建线程)
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        executor.submit(() -> processStreamingResponse(session, question));
-        executor.shutdown();
-    }
-
-    private void processStreamingResponse(WebSocketSession session, String question) {
-        HttpURLConnection connection = null;
-        try {
-            URL url = new URL(DEEPSEEK_API_URL);
-            connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("POST");
-            connection.setRequestProperty("Content-Type", "application/json");
-            connection.setRequestProperty("Authorization", "Bearer " + API_KEY);
-            connection.setRequestProperty("Accept", "text/event-stream");
-            connection.setDoOutput(true);
-            connection.setDoInput(true);
-            connection.setUseCaches(false);
-            connection.setConnectTimeout(5000); // 5秒连接超时
-            connection.setReadTimeout(30000);  // 30秒读取超时
-
-            // 发送请求体
-            try (OutputStream os = connection.getOutputStream()) {
-                os.write(buildRequestBody(question).getBytes(StandardCharsets.UTF_8));
-                os.flush();
-            }
-
-            // 处理流式响应
-            try (BufferedReader reader = new BufferedReader(
-                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
-
-                String line;
-                while ((line = reader.readLine()) != null && session.isOpen()) {
-                    if (line.startsWith("data: ") && !line.equals("data: [DONE]")) {
-                        String content = parseContent(line.substring(6));
-                        if (content != null) {
-                            session.sendMessage(new TextMessage(content));
-                        }
-                    }
-                }
-            }
-        } catch (SocketTimeoutException e) {
-            sendErrorMessage(session, "API响应超时,请重试");
-        } catch (IOException e) {
-            sendErrorMessage(session, "网络错误: " + e.getMessage());
-        } finally {
-            if (connection != null) connection.disconnect();
-        }
-    }
-
-        private String buildRequestBody(String question) {
-            List<DeeseekRequest.Message> messages = new ArrayList<>();
-            messages.add(DeeseekRequest.Message.builder()
-                    .role("system")
-                    .content("你是一个佳佳聊天小助手,请用中文回答")
-                    .build());
-            messages.add(DeeseekRequest.Message.builder()
-                    .role("user")
-                    .content(question)
-                    .build());
-
-            return gson.toJson(DeeseekRequest.builder()
-                    .model("deepseek-chat")
-                    .messages(messages)
-                    .stream(true)
-                    .build());
-        }
-
-        private String parseContent(String json) {
-            try {
-                JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
-                if (obj.has("choices")) {
-                    JsonObject delta = obj.getAsJsonArray("choices")
-                            .get(0).getAsJsonObject()
-                            .getAsJsonObject("delta");
-                    if (delta.has("content")) {
-                        System.err.println(delta.get("content").getAsString());
-                        return delta.get("content").getAsString();
-                    }
-                }
-                return null;
-            } catch (Exception e) {
-                return null;
-            }
-        }
-
-        private void sendErrorMessage(WebSocketSession session, String message) {
-            try {
-                if (session.isOpen()) {
-                    session.sendMessage(new TextMessage(
-                            "{\"error\": \"" + message + "\"}"
-                    ));
-                }
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }

+ 0 - 24
src/main/java/com/zhentao/Ai/config/WebSocketConfig.java

@@ -1,24 +0,0 @@
-package com.zhentao.Ai.config;
-
-import com.zhentao.Ai.advertisement.DeepSeekWebSocketHandler;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.web.socket.config.annotation.EnableWebSocket;
-import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
-import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
-import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
-import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
-
-// WebSocketConfig.java
-@Configuration
-@EnableWebSocket
-public class WebSocketConfig implements WebSocketConfigurer {
-
-    @Override
-    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
-        registry.addHandler(new DeepSeekWebSocketHandler(), "/ws")
-                .setAllowedOrigins("*")
-                .setHandshakeHandler(new DefaultHandshakeHandler())
-                .addInterceptors(new HttpSessionHandshakeInterceptor())
-                .setAllowedOriginPatterns("*"); // 更灵活的跨域控制
-    }
-}

+ 41 - 12
src/main/java/com/zhentao/information/config/NettyConfig.java

@@ -18,12 +18,13 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Netty服务器配置类
- * 配置WebSocket服务器的启动参数和处理器链
+ * 配置WebSocket服务器的启动参数和处理器链,集成AI功能
  */
 @Slf4j
 @Configuration
@@ -32,7 +33,7 @@ public class NettyConfig {
     /**
      * WebSocket服务器端口
      */
-    @Value("${netty.websocket.port}")
+    @Value("${netty.websocket.port:8888}")
     private int port;
 
     /**
@@ -42,6 +43,12 @@ public class NettyConfig {
     private WebSocketHandler webSocketHandler;
 
     /**
+     * 线程组,用于优雅关闭
+     */
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    /**
      * 配置并启动Netty服务器
      * @return ServerBootstrap实例
      */
@@ -49,9 +56,9 @@ public class NettyConfig {
     public ServerBootstrap serverBootstrap() {
         // 创建主从线程组
         // bossGroup用于接收客户端连接
-        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
-        // workerGroup用于处理客户端数据
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        bossGroup = new NioEventLoopGroup(1);
+        // workerGroup用于处理客户端数据,使用CPU核心数的2倍
+        workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
         
         // 创建服务器启动对象
         ServerBootstrap bootstrap = new ServerBootstrap();
@@ -59,11 +66,15 @@ public class NettyConfig {
                 // 设置服务器通道实现
                 .channel(NioServerSocketChannel.class)
                 // 设置线程队列等待连接个数
-                .option(ChannelOption.SO_BACKLOG, 128)
+                .option(ChannelOption.SO_BACKLOG, 1024)
                 // 设置保持活动连接状态
                 .childOption(ChannelOption.SO_KEEPALIVE, true)
                 // 禁用Nagle算法,减少延迟
                 .childOption(ChannelOption.TCP_NODELAY, true)
+                // 设置接收缓冲区大小
+                .childOption(ChannelOption.SO_RCVBUF, 65536)
+                // 设置发送缓冲区大小
+                .childOption(ChannelOption.SO_SNDBUF, 65536)
                 // 设置处理器
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
@@ -74,12 +85,12 @@ public class NettyConfig {
                                 .addLast(new HttpServerCodec())
                                 // 支持大数据流
                                 .addLast(new ChunkedWriteHandler())
-                                // HTTP消息聚合器
-                                .addLast(new HttpObjectAggregator(65536))
-                                // 调整心跳检测时间:30秒没有收到消息就触发
-                                .addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS))
-                                // WebSocket协议处理器
-                                .addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536))
+                                // HTTP消息聚合器,增加缓冲区大小以支持AI流式响应
+                                .addLast(new HttpObjectAggregator(1024 * 1024))
+                                // 调整心跳检测时间:60秒没有收到消息就触发
+                                .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
+                                // WebSocket协议处理器,支持AI流式输出
+                                .addLast(new WebSocketServerProtocolHandler("/ws", null, true, 1024 * 1024))
                                 // 自定义消息处理器
                                 .addLast(webSocketHandler);
                     }
@@ -89,11 +100,29 @@ public class NettyConfig {
             // 绑定端口并启动服务器
             bootstrap.bind(port).sync();
             log.info("Netty WebSocket服务器启动成功,端口:{}", port);
+            log.info("AI功能已集成,发送消息给用户ID: 1933707308387405824 将触发AI对话");
         } catch (InterruptedException e) {
             log.error("Netty WebSocket服务器启动失败", e);
             Thread.currentThread().interrupt();
+            // 优雅关闭线程组
+            shutdownGracefully();
         }
         
         return bootstrap;
     }
+
+    /**
+     * 应用关闭时优雅关闭Netty服务器
+     */
+    @PreDestroy
+    public void shutdownGracefully() {
+        log.info("正在关闭Netty WebSocket服务器...");
+        if (bossGroup != null) {
+            bossGroup.shutdownGracefully();
+        }
+        if (workerGroup != null) {
+            workerGroup.shutdownGracefully();
+        }
+        log.info("Netty WebSocket服务器已关闭");
+    }
 } 

+ 250 - 11
src/main/java/com/zhentao/information/handler/WebSocketHandler.java

@@ -1,8 +1,13 @@
 package com.zhentao.information.handler;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.zhentao.Ai.dto.DeeseekRequest;
 import com.zhentao.groups.MongoDB.pojo.GroupMessage;
-import com.zhentao.groups.MongoDB.pojo.Message;
+import com.zhentao.information.entity.Message;
 import com.zhentao.information.entity.ChatMessage;
 import com.zhentao.information.repository.ChatMessageRepository;
 import com.zhentao.information.service.WebSocketService;
@@ -15,17 +20,34 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * WebSocket消息处理器
- * 处理WebSocket连接、消息接收和发送
+ * 处理WebSocket连接、消息接收和发送,集成AI功能
  */
 @Slf4j
 @Component
 @ChannelHandler.Sharable
 public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 
+    private static final Gson gson = new Gson();
+    private static final String DEEPSEEK_API_URL = "https://api.deepseek.com/chat/completions";
+    private static final String API_KEY = "sk-df51dab7323441998d41f18494098ddc";
+    private static final String AI_USER_ID = "1933707308387405824";
+
     @Resource
     private ChatMessageRepository chatMessageRepository;
 
@@ -39,8 +61,23 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
         String text = msg.text();
         log.info("收到消息:{}", text);
+        
+        // 检查消息是否为空或无效
+        if (text == null || text.trim().isEmpty()) {
+            log.warn("收到空消息或无效消息");
+            sendErrorMessage(ctx, "消息不能为空");
+            return;
+        }
+        
         try {
-            Message message = JSON.parseObject(text, Message.class);
+            // 尝试解析JSON消息
+            Message message = parseMessage(text);
+            if (message == null) {
+                log.error("消息解析失败,原始消息:{}", text);
+                sendErrorMessage(ctx, "消息格式错误");
+                return;
+            }
+            
             log.info("接收到的消息:{}", message);
 
             // 如果是连接消息,处理token
@@ -73,33 +110,234 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
                 return;
             }
 
+            // 检查是否是发送给AI的消息
+            if (AI_USER_ID.equals(message.getToUserId())) {
+                handleAIMessage(message, ctx);
+                return;
+            }
+
             // 处理普通消息
             handleMessage(message);
 
+        } catch (JSONException e) {
+            log.error("JSON解析失败,原始消息:{},错误:{}", text, e.getMessage());
+            sendErrorMessage(ctx, "消息格式错误:" + e.getMessage());
         } catch (Exception e) {
-            log.error("处理消息失败", e);
+            log.error("处理消息失败,原始消息:{},错误:{}", text, e.getMessage(), e);
             // 发送错误消息给客户端
             Message errorMessage = new Message();
             errorMessage.setType("error");
-            errorMessage.setContent("消息处理失败");
+            errorMessage.setContent("消息处理失败" + e.getMessage());
             ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(errorMessage)));
         }
     }
 
     /**
+     * 解析消息,增加错误处理
+     */
+    private Message parseMessage(String text) {
+        try {
+            // 首先验证JSON格式
+            if (!text.trim().startsWith("{")) {
+                log.error("消息不是有效的JSON格式,期望以{开头,实际:{}", text);
+                return null;
+            }
+            
+            Message message = JSON.parseObject(text, Message.class);
+            
+            // 验证必要字段
+            if (message.getType() == null) {
+                log.warn("消息缺少type字段:{}", text);
+            }
+            
+            return message;
+        } catch (JSONException e) {
+            log.error("JSON解析异常:{},原始消息:{}", e.getMessage(), text);
+            return null;
+        } catch (Exception e) {
+            log.error("消息解析异常:{},原始消息:{}", e.getMessage(), text);
+            return null;
+        }
+    }
+
+    /**
+     * 将information包的Message转换为MongoDB包的Message
+     */
+    private com.zhentao.groups.MongoDB.pojo.Message convertToMongoMessage(Message message) {
+        com.zhentao.groups.MongoDB.pojo.Message mongoMessage = new com.zhentao.groups.MongoDB.pojo.Message();
+        mongoMessage.setType(message.getType());
+        mongoMessage.setFromUserId(message.getFromUserId());
+        mongoMessage.setToUserId(message.getToUserId());
+        mongoMessage.setGroupId(message.getGroupId());
+        mongoMessage.setContent(message.getContent());
+        mongoMessage.setTimestamp(message.getTimestamp());
+        mongoMessage.setFileUrl(message.getFileUrl());
+        mongoMessage.setFileName(message.getFileName());
+        mongoMessage.setFileType(message.getFileType());
+        mongoMessage.setFileSize(message.getFileSize());
+        mongoMessage.setAvatar(message.getAvatar());
+        return mongoMessage;
+    }
+
+    /**
+     * 处理AI消息
+     */
+    private void handleAIMessage(Message message, ChannelHandlerContext ctx) {
+        String question = message.getContent();
+        if (question == null || question.trim().isEmpty()) {
+            sendErrorMessage(ctx, "问题不能为空");
+            return;
+        }
+
+        // 发送AI开始处理的消息
+        Message aiStartMessage = new Message();
+        aiStartMessage.setType("ai_start");
+        aiStartMessage.setContent("AI正在思考中...");
+        aiStartMessage.setFromUserId(AI_USER_ID);
+        aiStartMessage.setToUserId(message.getFromUserId());
+        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(aiStartMessage)));
+
+        // 使用线程池处理AI请求
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.submit(() -> processAIStreamingResponse(message, ctx));
+        executor.shutdown();
+    }
+
+    /**
+     * 处理AI流式响应
+     */
+    private void processAIStreamingResponse(Message originalMessage, ChannelHandlerContext ctx) {
+        HttpURLConnection connection = null;
+        try {
+            URL url = new URL(DEEPSEEK_API_URL);
+            connection = (HttpURLConnection) url.openConnection();
+            connection.setRequestMethod("POST");
+            connection.setRequestProperty("Content-Type", "application/json");
+            connection.setRequestProperty("Authorization", "Bearer " + API_KEY);
+            connection.setRequestProperty("Accept", "text/event-stream");
+            connection.setDoOutput(true);
+            connection.setDoInput(true);
+            connection.setUseCaches(false);
+            connection.setConnectTimeout(5000);
+            connection.setReadTimeout(30000);
+
+            // 发送请求体
+            try (OutputStream os = connection.getOutputStream()) {
+                os.write(buildAIRequestBody(originalMessage.getContent()).getBytes(StandardCharsets.UTF_8));
+                os.flush();
+            }
+
+            // 处理流式响应
+            try (BufferedReader reader = new BufferedReader(
+                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
+
+                String line;
+                while ((line = reader.readLine()) != null && ctx.channel().isActive()) {
+                    if (line.startsWith("data: ") && !line.equals("data: [DONE]")) {
+                        String content = parseAIContent(line.substring(6));
+                        if (content != null) {
+                            // 发送AI流式响应
+                            Message aiResponse = new Message();
+                            aiResponse.setType("ai_stream");
+                            aiResponse.setContent(content);
+                            aiResponse.setFromUserId(AI_USER_ID);
+                            aiResponse.setToUserId(originalMessage.getFromUserId());
+                            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(aiResponse)));
+                        }
+                    }
+                }
+            }
+
+            // 发送AI完成消息
+            Message aiCompleteMessage = new Message();
+            aiCompleteMessage.setType("ai_complete");
+            aiCompleteMessage.setContent("AI回答完成");
+            aiCompleteMessage.setFromUserId(AI_USER_ID);
+            aiCompleteMessage.setToUserId(originalMessage.getFromUserId());
+            ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(aiCompleteMessage)));
+
+        } catch (SocketTimeoutException e) {
+            sendErrorMessage(ctx, "AI响应超时,请重试");
+        } catch (IOException e) {
+            sendErrorMessage(ctx, "AI网络错误: " + e.getMessage());
+        } finally {
+            if (connection != null) connection.disconnect();
+        }
+    }
+
+    /**
+     * 构建AI请求体
+     */
+    private String buildAIRequestBody(String question) {
+        List<DeeseekRequest.Message> messages = new ArrayList<>();
+        messages.add(DeeseekRequest.Message.builder()
+                .role("system")
+                .content("你是一个佳佳聊天小助手,请用中文回答")
+                .build());
+        messages.add(DeeseekRequest.Message.builder()
+                .role("user")
+                .content(question)
+                .build());
+
+        return gson.toJson(DeeseekRequest.builder()
+                .model("deepseek-chat")
+                .messages(messages)
+                .stream(true)
+                .build());
+    }
+
+    /**
+     * 解析AI响应内容
+     */
+    private String parseAIContent(String json) {
+        try {
+            JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
+            if (obj.has("choices")) {
+                JsonObject delta = obj.getAsJsonArray("choices")
+                        .get(0).getAsJsonObject()
+                        .getAsJsonObject("delta");
+                if (delta.has("content")) {
+                    return delta.get("content").getAsString();
+                }
+            }
+            return null;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    /**
+     * 发送错误消息
+     */
+    private void sendErrorMessage(ChannelHandlerContext ctx, String message) {
+        try {
+            if (ctx.channel().isActive()) {
+                Message errorMessage = new Message();
+                errorMessage.setType("ai_error");
+                errorMessage.setContent(message);
+                ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(errorMessage)));
+            }
+        } catch (Exception e) {
+            log.error("发送错误消息失败", e);
+        }
+    }
+
+    /**
      * 处理群聊消息
      */
     private void handleGroupMessage(Message message) {
         // 设置群聊消息类型
         message.setType(message.getType() != null ? message.getType() : "group_chat");
+        // 转换为MongoDB Message类型
+        com.zhentao.groups.MongoDB.pojo.Message mongoMessage = convertToMongoMessage(message);
         // 广播消息给群内所有成员
-        boolean sent = webSocketService.handleGroupMessage(message);
+        boolean sent = webSocketService.handleGroupMessage(mongoMessage);
         if (sent) {
             // 发送消息确认
             Message ackMessage = new Message();
             ackMessage.setType("message_ack");
             ackMessage.setContent("群消息已发送");
-            webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
+            webSocketService.sendMessageToUser(message.getFromUserId(), convertToMongoMessage(ackMessage));
         }
     }
 
@@ -131,22 +369,23 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
         // 保存消息到MongoDB
         chatMessageRepository.save(chatMessage);
 
-        // 发送消息给接收者
-        boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), message);
+        // 转换为MongoDB Message类型并发送消息给接收者
+        com.zhentao.groups.MongoDB.pojo.Message mongoMessage = convertToMongoMessage(message);
+        boolean sent = webSocketService.sendMessageToUser(message.getToUserId(), mongoMessage);
         if (sent) {
             log.info("消息已发送给用户: {}, 内容: {}", message.getToUserId(), message.getContent());
             // 发送消息确认给发送者
             Message ackMessage = new Message();
             ackMessage.setType("message_ack");
             ackMessage.setContent("消息已发送");
-            webSocketService.sendMessageToUser(message.getFromUserId(), ackMessage);
+            webSocketService.sendMessageToUser(message.getFromUserId(), convertToMongoMessage(ackMessage));
         } else {
             log.info("用户 {} 不在线,消息已保存到MongoDB", message.getToUserId());
             // 发送消息未送达通知给发送者
             Message offlineMessage = new Message();
             offlineMessage.setType("message_offline");
             offlineMessage.setContent("对方不在线,消息已保存");
-            webSocketService.sendMessageToUser(message.getFromUserId(), offlineMessage);
+            webSocketService.sendMessageToUser(message.getFromUserId(), convertToMongoMessage(offlineMessage));
         }
     }
 

+ 0 - 55
src/main/java/com/zhentao/information/netty/NettyServer.java

@@ -1,55 +0,0 @@
-package com.zhentao.information.netty;
-
-import com.zhentao.information.handler.WebSocketHandler;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
-import io.netty.handler.stream.ChunkedWriteHandler;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-
-/**
- * Netty服务器启动类
- * 启动动一个基于 Netty 的 WebSocket 服务器
- * 启动 WebSocket 服务器:
- * 它会配置一个 Netty 的 ServerBootstrap,并绑定到指定的端口(默认是 8888)。
- * 它会初始化一个 Netty 的通道管道(ChannelPipeline),并添加各种处理器来处理 WebSocket 连接和消息。
- */
-@Slf4j
-@Component
-public class NettyServer {
-
-    @Value("${netty.port:8888}")
-    private int port;
-
-    @Resource
-    private ServerBootstrap serverBootstrap;
-
-    @Resource
-    private WebSocketHandler webSocketHandler;
-
-    @PostConstruct
-    public void start() throws Exception {
-        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            protected void initChannel(SocketChannel ch) {
-                ch.pipeline()
-                        .addLast(new HttpServerCodec())
-                        .addLast(new ChunkedWriteHandler())
-                        .addLast(new HttpObjectAggregator(65536))
-                        .addLast(new WebSocketServerProtocolHandler("/ws"))
-                        .addLast(webSocketHandler);
-            }
-        });
-
-        serverBootstrap.bind(port).sync();
-        log.info("Netty服务器启动成功,端口:{}", port);
-    }
-}

+ 1 - 0
src/main/java/com/zhentao/user/service/impl/UserLoginServiceImpl.java

@@ -215,6 +215,7 @@ public class UserLoginServiceImpl extends ServiceImpl<UserLoginMapper, UserLogin
                 Map<String,Object> map = new HashMap<>();
                 map.put("token",jwtToken);
                 map.put("userId",one.getId()+"");
+                map.put("image",one.getUserName()+"");
                 //用户上线
                 onlineStatusService.userGoOnline(one.getId());