Browse Source

Merge branch 'jc' of http://git.workervip.com/JC2407A/neko into yv

p裴秀宇 1 month ago
parent
commit
e4949a5747

+ 63 - 1
pom.xml

@@ -12,6 +12,8 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
         <spring-boot.version>2.6.13</spring-boot.version>
+        <kafka.version>2.6.6</kafka.version>
+        <kafka.client.version>2.5.1</kafka.client.version>
     </properties>
     <dependencies>
         <dependency>
@@ -39,7 +41,15 @@
             <artifactId>mybatis-plus-boot-starter</artifactId>
             <version>3.5.9</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.14.4</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
@@ -99,6 +109,58 @@
             <artifactId>spring-boot-starter-websocket</artifactId>
         </dependency>
 
+<!--        新增-->
+        <dependency>
+            <groupId>io.swagger.core.v3</groupId>
+            <artifactId>swagger-annotations-jakarta</artifactId>
+            <version>2.2.19</version>
+        </dependency>
+        <dependency>
+            <groupId>io.swagger.core.v3</groupId>
+            <artifactId>swagger-core-jakarta</artifactId>
+            <version>2.2.19</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+            <version>3.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>io.minio</groupId>
+            <artifactId>minio</artifactId>
+            <version>7.1.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hibernate.validator</groupId>
+            <artifactId>hibernate-validator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.coobird</groupId>
+            <artifactId>thumbnailator</artifactId>
+            <version>0.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+
+
     </dependencies>
     <dependencyManagement>
         <dependencies>

+ 132 - 0
src/main/java/com/neko/chat/WebSocketServer.java

@@ -0,0 +1,132 @@
+package com.neko.chat;
+
+import com.alibaba.fastjson2.JSON;
+import com.neko.config.SpringConfigurator;
+import com.neko.domain.dto.messageDto.SendMessageContentDto;
+import com.neko.service.MessageService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+@Component
+@ServerEndpoint(value = "/ws/{receiverId}/{userId}",configurator = SpringConfigurator.class)
+public class WebSocketServer {
+    private String receiverId;
+    private String token;
+    @Autowired
+    MessageService chatService;
+    //存放会话对象
+    public static ConcurrentMap<String, Session> sessionMap = new ConcurrentHashMap<>();
+
+
+
+    /**
+     * 连接建立成功调用的方法
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("userId") String userId) {
+        System.out.println("客户端:" + userId + "建立连接");
+        sessionMap.put(userId, session);
+        System.out.println("当前在线人数:" + sessionMap.size());
+    }
+
+    /**
+     * 收到客户端消息后调用的方法
+     *
+     * @param message 客户端发送过来的消息
+     */
+    @OnMessage
+    public void onMessage(String message, @PathParam("userId") String userId,@PathParam("receiverId") String receiverId) {
+//        sendToClient(userId,friendId,message);
+        System.out.println("客服端接受到用户:" + userId + "向好友:"+receiverId+"发送的消息:" + message);
+        Map<String, String> map = JSON.parseObject(message, Map.class);
+        SendMessageContentDto sendMessageContent =  new SendMessageContentDto(userId, receiverId, map.get("content"));
+
+        boolean success = chatService.sendMessage(sendMessageContent);
+        if (success){
+            System.out.println("发送消息成功");
+        }else {
+            System.out.println("发送消息失败");
+        }
+    }
+
+    /**
+     * 连接关闭调用的方法
+     *
+     * @param userId
+     */
+    @OnClose
+    public void onClose(@PathParam("userId") Integer userId) {
+        System.out.println("客户端:" + userId + "关闭连接");
+        sessionMap.remove(userId);
+    }
+
+    /**
+     * 群发
+     *
+     * @param message
+     */
+    public void sendToAllClient(String message) {
+        Collection<Session> sessions = sessionMap.values();
+        for (Session session : sessions) {
+            try {
+                System.out.println("服务端向所有客户端发送消息:" + message);
+                session.getBasicRemote().sendText(message);
+            } catch (Exception e) {}
+        }
+    }
+
+    /**
+     * 私发
+     *
+     * @param message
+     */
+    public static void sendToClient(Integer userId,Integer friendId,String message) {
+        Session session = sessionMap.get(friendId);
+        if (session != null){
+            try {
+                System.out.println("用户:" + userId + "向好友:"+friendId+"发送了消息:" + message);
+                Map<String, Object> map = new HashMap<>();
+                map.put("message",message);
+                SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+                map.put("time",sdf.format(new Date()));
+                String jsonString = JSON.toJSONString(map);
+                session.getBasicRemote().sendText(jsonString);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * AI发送的信息
+     */
+    public static void aiSendToClient(Integer userId,String message) {
+        Session session = sessionMap.get(userId);
+        if (session != null){
+            try {
+                System.out.println("AI向:" + userId +"发送了消息:" + message);
+                Map<String, Object> map = new HashMap<>();
+                map.put("message",message);
+                map.put("type","waring");
+                String jsonString = JSON.toJSONString(map);
+                session.getBasicRemote().sendText(jsonString);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 24 - 0
src/main/java/com/neko/config/SpringConfigurator.java

@@ -0,0 +1,24 @@
+package com.neko.config;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+import javax.websocket.server.ServerEndpointConfig;
+
+@Component
+public class SpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware {
+
+    private static ApplicationContext applicationContext;
+
+    @Override
+    public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
+        return applicationContext.getBean(endpointClass);
+    }
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        SpringConfigurator.applicationContext = applicationContext;
+    }
+}

+ 46 - 0
src/main/java/com/neko/config/WebSocketConfig.java

@@ -0,0 +1,46 @@
+package com.neko.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;
+
+/**
+ * @Title: WebSocketConfig
+ * @Package: com.jiayuan.common.config
+ * @Description: websocket配置
+ * @Author: xmc
+ * @Date: 创建时间 2024-04-24
+ */
+@Configuration
+public class WebSocketConfig {
+
+    /**
+     * 自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
+     *
+     * @return
+     */
+
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter() {
+        return new ServerEndpointExporter();
+    }
+
+    /**
+     * 通信文本消息和二进制缓存区大小
+     * 避免对接 第三方 报文过大时,Websocket 1009 错误
+     *
+     * @return
+     */
+
+    @Bean
+    public ServletServerContainerFactoryBean createWebSocketContainer() {
+        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
+        // 在此处设置bufferSize
+        container.setMaxTextMessageBufferSize(10240000);
+        container.setMaxBinaryMessageBufferSize(10240000);
+        container.setMaxSessionIdleTimeout(15 * 60000L);
+        return container;
+    }
+}
+

+ 16 - 0
src/main/java/com/neko/consumer/Consumer.java

@@ -0,0 +1,16 @@
+package com.neko.consumer;
+
+import com.alibaba.fastjson2.JSON;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+@Component
+public class Consumer {
+    @KafkaListener(topics = "offending_user_topic")
+    public void job1(String msg){
+        System.out.println("消费了:"+msg);
+        Map map = JSON.parseObject(msg, Map.class);
+    }
+}

+ 7 - 2
src/main/java/com/neko/controller/MessageController.java

@@ -6,6 +6,7 @@ import com.neko.utils.ResultVo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
@@ -24,8 +25,12 @@ public class MessageController {
      * @return
      */
     @RequestMapping("sendMessage")
-    public ResultVo sendMessage(@RequestBody SendMessageContentDto dto){
+    public boolean sendMessage(@RequestBody SendMessageContentDto dto){
         return messageService.sendMessage(dto);
     }
-
+    @RequestMapping("chatHistory")
+    public ResultVo chatHistory(@RequestParam("userId") String userId,@RequestParam("receiverId") String receiverId){
+        System.err.println("userId:"+userId+"receiverId:"+receiverId);
+        return messageService.findAll(userId,  receiverId);
+    }
 }

+ 4 - 0
src/main/java/com/neko/domain/dto/messageDto/SendMessageContentDto.java

@@ -1,6 +1,8 @@
 package com.neko.domain.dto.messageDto;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.springframework.web.multipart.MultipartFile;
 
 /**
@@ -8,6 +10,8 @@ import org.springframework.web.multipart.MultipartFile;
  * @Author neko
  **/
 @Data
+@AllArgsConstructor
+@NoArgsConstructor
 public class SendMessageContentDto {
     private String senderId;//发送者
     private String receiverId;//接受者

+ 3 - 0
src/main/java/com/neko/domain/pojo/Message.java

@@ -73,4 +73,7 @@ public class Message implements Serializable {
 
     @TableField(exist = false)
     private static final long serialVersionUID = 1L;
+
+    @TableField(exist = false)
+    private MessageContentText messageContentText;
 }

+ 5 - 1
src/main/java/com/neko/service/MessageService.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.extension.service.IService;
 import com.neko.domain.dto.messageDto.SendMessageContentDto;
 import com.neko.domain.pojo.Message;
 import com.neko.utils.ResultVo;
+import org.apache.ibatis.annotations.Param;
 
 /**
 * @author 金聪
@@ -12,5 +13,8 @@ import com.neko.utils.ResultVo;
 */
 public interface MessageService extends IService<Message> {
 
-    ResultVo sendMessage(SendMessageContentDto dto);
+    boolean sendMessage(SendMessageContentDto dto);
+
+
+    ResultVo findAll(@Param("userId") String userId, @Param("receiverId") String receiverId);
 }

+ 23 - 2
src/main/java/com/neko/service/impl/MessageServiceImpl.java

@@ -1,5 +1,6 @@
 package com.neko.service.impl;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
@@ -17,6 +18,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -35,7 +37,7 @@ public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message>
     MessageContentTextMapper messageContentTextMapper;
 
     @Override
-    public ResultVo sendMessage(SendMessageContentDto dto) {
+    public boolean sendMessage(SendMessageContentDto dto) {
         Message message = new Message();
         message.setId(String.valueOf(SnowflakeUtil.nextId()));
         String uuid = UUID.randomUUID().toString().replaceAll("-","");
@@ -64,8 +66,27 @@ public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message>
         messageMapper.insert(message);
         messageContentTextMapper.insert(messageContentText);
 
-        return ResultVo.success("发送成功");
+        return true;
     }
+
+    @Override
+    public ResultVo findAll(String userId, String receiverId) {
+        List<Message> messages = messageMapper.selectList(new LambdaQueryWrapper<Message>()
+                .eq(Message::getSenderId, userId)
+                .eq(Message::getReceiverId, receiverId));
+        List<Message> messages1 = messageMapper.selectList(new LambdaQueryWrapper<Message>()
+                .eq(Message::getSenderId, receiverId)
+                .eq(Message::getReceiverId, userId));
+        for (Message msg : messages1){
+            messages.add(msg);
+        }
+        for (Message message : messages) {
+            message.setMessageContentText(messageContentTextMapper.selectById(message.getContentId()));
+        }
+        System.err.println("messages"+messages);
+        return ResultVo.success(messages);
+    }
+
 }
 
 

+ 9 - 0
src/main/resources/application.yml

@@ -16,6 +16,15 @@ spring:
   redis:
     host: 127.0.0.1
     port: 6379
+  kafka:
+    bootstrap-servers: 47.95.170.81:9092
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+    consumer:
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      group-id: nekomimi
 mybatis-plus:
   mapper-locations: classpath:/mapper/*.xml
   type-aliases-package: com.neko.domain.pojo