lzy 2 weeks ago
parent
commit
c4ce450196

+ 19 - 11
pom.xml

@@ -14,6 +14,21 @@
         <spring-boot.version>2.6.13</spring-boot.version>
     </properties>
     <dependencies>
+
+
+            <dependency>
+                <groupId>io.netty</groupId>
+                <artifactId>netty-all</artifactId>
+                <version>4.1.85.Final</version>
+            </dependency>
+            <dependency>
+                <groupId>com.alibaba.fastjson2</groupId>
+                <artifactId>fastjson2</artifactId>
+                <version>2.0.18</version>
+            </dependency>
+
+
+
         <dependency>
             <groupId>com.aliyun.oss</groupId>
             <artifactId>aliyun-sdk-oss</artifactId>
@@ -69,23 +84,16 @@
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
-            <version>4.1.86.Final</version>
-        </dependency>
+
+
 
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
-            <optional>true</optional>
+            <version>1.18.20</version>
+            <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-            <version>1.2.83</version>
-        </dependency>
 
         <!-- MySQL依赖 -->
         <dependency>

+ 9 - 0
src/main/java/com/zhentao/groups/ImApplication.java

@@ -0,0 +1,9 @@
+package com.zhentao.groups;
+
+import com.zhentao.groups.im.ImServer;
+
+public class ImApplication {
+    public static void main(String[] args) {
+        ImServer.start();
+    }
+}

+ 0 - 56
src/main/java/com/zhentao/groups/MongoDB/controller/GroupMessageController.java

@@ -1,56 +0,0 @@
-package com.zhentao.groups.MongoDB.controller;
-
-import com.zhentao.config.NullLogin;
-import com.zhentao.groups.MongoDB.pojo.GroupMessage;
-import com.zhentao.groups.MongoDB.pojo.Message;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.mongodb.core.MongoTemplate;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.StringRedisTemplate;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.Date;
-
-@RequestMapping("/groupMessage")
-@RestController
-public class GroupMessageController {
-
-    @Autowired
-    RedisTemplate<String,String> redisTemplate;
-
-    @Autowired
-    private MongoTemplate mongoTemplate;
-
-
-    @PostMapping("/cunchugroupMessage")
-    public String groupMessage(@RequestBody Message message)
-    {
-        GroupMessage groupMessage = new GroupMessage();
-        groupMessage.setGroup_id(message.getGroup_id());
-        groupMessage.setSender_id(message.getSender_id());
-        groupMessage.setContent(message.getContent());
-        groupMessage.setContent_type(message.getContent_type());
-        groupMessage.setCreated_at(new Date());
-        mongoTemplate.save(groupMessage);
-        return "success";
-    }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-}

+ 3 - 0
src/main/java/com/zhentao/groups/controller/groupsController.java

@@ -63,4 +63,7 @@ public class groupsController {
 
 
 
+
+
+
 }

+ 24 - 0
src/main/java/com/zhentao/groups/im/Command.java

@@ -0,0 +1,24 @@
+package com.zhentao.groups.im;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class Command {
+
+//    建立信息编码
+    private Integer code;
+//    private Long id;
+
+//    发送名字
+    private Long id;
+
+//    群id
+    private String groupId;
+
+
+
+}

+ 26 - 0
src/main/java/com/zhentao/groups/im/CommandType.java

@@ -0,0 +1,26 @@
+package com.zhentao.groups.im;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@AllArgsConstructor
+@Getter
+public enum CommandType {
+//  建立连接
+    CONNECTION(10001),
+//    私聊
+    CHAT(10002),
+//  加入群聊
+    JSON_GROUP(10003),
+    ERROR(-1);
+    private Integer code;
+
+    public static CommandType match(Integer code) {
+        for (CommandType value : CommandType.values()) {
+            if (value.getCode().equals(code)) {
+                return value;
+            }
+        }
+        return ERROR;
+    }
+}

+ 56 - 0
src/main/java/com/zhentao/groups/im/ImServer.java

@@ -0,0 +1,56 @@
+package com.zhentao.groups.im;
+
+
+
+import com.zhentao.groups.im.handler.WebSocketHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ImServer {
+
+    public static final Map<Long,Channel> USERS=new ConcurrentHashMap<>();
+
+//    public static final ChannelGroup GROUP=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+    public static final Map<String,ChannelGroup> GROUP=new ConcurrentHashMap<>();
+    public static void start(){
+        EventLoopGroup boss=new NioEventLoopGroup();
+
+        EventLoopGroup worker=new NioEventLoopGroup();
+
+//        监听端口
+        ServerBootstrap bootstrap=new ServerBootstrap();
+
+        bootstrap.group(boss,worker)
+                .channel(NioServerSocketChannel.class)
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel socketChannel) throws Exception {
+                        ChannelPipeline pipeline = socketChannel.pipeline();
+                        //  添加http编码解码器
+                        pipeline.addLast(new HttpServerCodec())
+                                // 支持大数据
+                                .addLast(new ChunkedWriteHandler())
+                                //  对http消息做聚合操作
+                                .addLast(new HttpObjectAggregator(1024*64))
+                                .addLast(new WebSocketServerProtocolHandler("/"))
+                                // 自定义
+                                .addLast(new WebSocketHandler());
+                    }
+                });
+
+        ChannelFuture future=bootstrap.bind(8888);
+    }
+
+}

+ 27 - 0
src/main/java/com/zhentao/groups/im/MessageType.java

@@ -0,0 +1,27 @@
+package com.zhentao.groups.im;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public enum MessageType {
+
+//    私聊
+    PRIVATE(1),
+//    群聊
+    GROUP(2),
+//    不支持类型
+    ERROR(3);
+
+    public  Integer type;
+
+    public static MessageType match(Integer type) {
+        for (MessageType value : MessageType.values()) {
+            if (value.getType().equals(type)) {
+                return value;
+            }
+        }
+        return ERROR;
+    }
+}

+ 33 - 0
src/main/java/com/zhentao/groups/im/Results.java

@@ -0,0 +1,33 @@
+package com.zhentao.groups.im;
+
+import com.alibaba.fastjson2.JSON;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class Results {
+
+    private String name;
+    private LocalDateTime time;
+    private String message;
+
+    public static TextWebSocketFrame fail(String message){
+        return new TextWebSocketFrame(JSON.toJSONString((new Results("系统消息", LocalDateTime.now(), message))));
+    }
+
+    public static TextWebSocketFrame success(String message){
+        return new TextWebSocketFrame(JSON.toJSONString((new Results("系统消息", LocalDateTime.now(), message))));
+    }
+    public static TextWebSocketFrame success(String user,String message){
+        return new TextWebSocketFrame(JSON.toJSONString((new Results(user, LocalDateTime.now(), message))));
+    }
+
+
+
+}

+ 23 - 0
src/main/java/com/zhentao/groups/im/command/ChatMessage.java

@@ -0,0 +1,23 @@
+package com.zhentao.groups.im.command;
+
+
+
+import com.zhentao.groups.im.Command;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ChatMessage extends Command {
+
+//    消息类型
+    private Integer type;
+
+//  接受人
+    private Long target;
+
+//  内容
+    private String content;
+}

+ 62 - 0
src/main/java/com/zhentao/groups/im/handler/ChatHandler.java

@@ -0,0 +1,62 @@
+package com.zhentao.groups.im.handler;
+
+import com.alibaba.fastjson2.JSON;
+import com.zhentao.groups.im.ImServer;
+import com.zhentao.groups.im.MessageType;
+import com.zhentao.groups.im.Results;
+import com.zhentao.groups.im.command.ChatMessage;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.internal.StringUtil;
+
+public class ChatHandler {
+
+
+    public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame){
+        try{
+            ChatMessage chat = JSON.parseObject(frame.text(),ChatMessage.class);
+            System.err.println(chat);
+            switch (MessageType.match(chat.getType())){
+                case PRIVATE:
+                        if (chat.getTarget()==null){
+                            ctx.channel().writeAndFlush(Results.fail("发送消息失败,发送消息请选择发送的人"));
+                            return;
+                        }
+
+                        Channel channel= ImServer.USERS.get(chat.getTarget());
+                    System.err.println(channel);
+                        if (channel == null || !channel.isActive()){
+                            ctx.channel().writeAndFlush(Results.fail("消息发送失败,对方")+chat.getTarget().toString()+"不在线");
+                        }else {
+
+                            channel.writeAndFlush(Results.success("私聊消息("+chat.getId()+")", chat.getContent()));
+
+                        }
+                    break;
+                case GROUP:
+                    if (StringUtil.isNullOrEmpty(chat.getGroupId())) {
+                        ctx.channel().writeAndFlush(Results.fail("发送群聊消息失败,请指定群名称"));
+                        return;
+                    }
+                    ChannelGroup group = ImServer.GROUP.get(chat.getGroupId());
+                    System.err.println(group);
+                    if (group == null) {
+                        ctx.channel().writeAndFlush(Results.fail("消息发送失败,群 " + chat.getId() + " 不存在"));
+                    } else {
+                        group.writeAndFlush(Results.success("群聊消息 发送者(" + chat.getId()+ ")", chat.getContent()));
+                    }
+                    break;
+                default:
+                    ctx.channel().writeAndFlush(Results.fail("不支持的消息类型"));
+                    break;
+            }
+
+
+        }catch (Exception e){
+            ctx.channel().writeAndFlush(Results.fail("发送消息格式错误"));
+        }
+
+    }
+}

+ 27 - 0
src/main/java/com/zhentao/groups/im/handler/ConnectionHandler.java

@@ -0,0 +1,27 @@
+package com.zhentao.groups.im.handler;
+
+import com.alibaba.fastjson2.JSON;
+import com.zhentao.groups.im.Command;
+import com.zhentao.groups.im.ImServer;
+import com.zhentao.groups.im.Results;
+import io.netty.channel.ChannelHandlerContext;
+
+public class ConnectionHandler {
+
+    public static void execute(ChannelHandlerContext ctx, Command command){
+        if (ImServer.USERS.containsKey(command.getId())){
+            ctx.channel().writeAndFlush(Results.fail("改用已经在线,请换个用户"));
+            ctx.channel().disconnect();
+            return;
+        }
+
+        ImServer.USERS.put(command.getId(),ctx.channel());
+
+        ctx.channel().writeAndFlush(Results.success("与服务端链接成功"));
+        ctx.channel().writeAndFlush(Results.success(JSON.toJSONString(ImServer.USERS.size())));
+
+
+
+    }
+
+}

+ 20 - 0
src/main/java/com/zhentao/groups/im/handler/JoinGroupHandler.java

@@ -0,0 +1,20 @@
+package com.zhentao.groups.im.handler;
+
+
+import com.zhentao.groups.im.ImServer;
+import com.zhentao.groups.im.Results;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+public class JoinGroupHandler {
+    public static void execute(ChannelHandlerContext ctx,String groupId) {
+
+        ImServer.GROUP.computeIfAbsent(groupId, k -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
+
+        ImServer.GROUP.get(groupId).add(ctx.channel());
+
+        ctx.channel().writeAndFlush(Results.success("加入群聊成功"));
+    }
+
+}

+ 41 - 0
src/main/java/com/zhentao/groups/im/handler/WebSocketHandler.java

@@ -0,0 +1,41 @@
+package com.zhentao.groups.im.handler;
+
+import com.alibaba.fastjson2.JSON;
+import com.zhentao.groups.im.Command;
+import com.zhentao.groups.im.CommandType;
+import com.zhentao.groups.im.Results;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+//    接受信息
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
+        try{
+            //解析
+            Command command= JSON.parseObject(frame.text(),Command.class);
+            System.err.println("command="+command);
+            switch (CommandType.match(command.getCode())) {
+//                建立连接
+                case CONNECTION:
+                    ConnectionHandler.execute(ctx,command);
+                    break;
+//                    私聊
+                case  CHAT:
+                    ChatHandler.execute(ctx,frame);
+//                    群聊
+                case JSON_GROUP:
+                    JoinGroupHandler.execute(ctx,command.getGroupId());
+                default:
+                    ctx.channel().writeAndFlush(Results.fail("不支持CODE"));
+                    break;
+            }
+        }catch (Exception e){
+            ctx.channel().writeAndFlush(Results.fail(e.getMessage()));
+        }
+
+
+
+    }
+}

+ 2 - 2
src/main/java/com/zhentao/groups/pojo/GroupMembers.java

@@ -1,7 +1,7 @@
 package com.zhentao.groups.pojo;
 
-import com.alibaba.fastjson.annotation.JSONField;
-import com.baomidou.mybatisplus.annotation.IdType;
+
+
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;

+ 2 - 1
src/main/java/com/zhentao/information/controller/MessageController.java

@@ -1,6 +1,7 @@
 package com.zhentao.information.controller;
 
-import com.alibaba.fastjson.JSON;
+
+import com.alibaba.fastjson2.JSON;
 import com.zhentao.config.NullLogin;
 import com.zhentao.information.entity.ChatMessage;
 import com.zhentao.information.entity.Message;

+ 2 - 1
src/main/java/com/zhentao/information/handler/WebSocketHandler.java

@@ -1,6 +1,7 @@
 package com.zhentao.information.handler;
 
-import com.alibaba.fastjson.JSON;
+
+import com.alibaba.fastjson2.JSON;
 import com.zhentao.information.entity.Message;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;

+ 51 - 51
src/main/java/com/zhentao/information/netty/NettyServer.java

@@ -1,51 +1,51 @@
-package com.zhentao.information.netty;
-
-import com.zhentao.information.handler.WebSocketHandler;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpServerCodec;
-import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
-import io.netty.handler.stream.ChunkedWriteHandler;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.Resource;
-
-/**
- * Netty服务器启动类
- */
-@Slf4j
-@Component
-public class NettyServer {
-
-    @Value("${netty.port:8888}")
-    private int port;
-
-    @Resource
-    private ServerBootstrap serverBootstrap;
-
-    @Resource
-    private WebSocketHandler webSocketHandler;
-
-    @PostConstruct
-    public void start() throws Exception {
-        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-            @Override
-            protected void initChannel(SocketChannel ch) {
-                ch.pipeline()
-                        .addLast(new HttpServerCodec())
-                        .addLast(new ChunkedWriteHandler())
-                        .addLast(new HttpObjectAggregator(65536))
-                        .addLast(new WebSocketServerProtocolHandler("/ws"))
-                        .addLast(webSocketHandler);
-            }
-        });
-
-        serverBootstrap.bind(port).sync();
-        log.info("Netty服务器启动成功,端口:{}", port);
-    }
-}
+//package com.zhentao.information.netty;
+//
+//import com.zhentao.information.handler.WebSocketHandler;
+//import io.netty.bootstrap.ServerBootstrap;
+//import io.netty.channel.ChannelInitializer;
+//import io.netty.channel.socket.SocketChannel;
+//import io.netty.handler.codec.http.HttpObjectAggregator;
+//import io.netty.handler.codec.http.HttpServerCodec;
+//import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+//import io.netty.handler.stream.ChunkedWriteHandler;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.PostConstruct;
+//import javax.annotation.Resource;
+//
+///**
+// * Netty服务器启动类
+// */
+//@Slf4j
+//@Component
+//public class NettyServer {
+//
+//    @Value("${netty.port:8888}")
+//    private int port;
+//
+//    @Resource
+//    private ServerBootstrap serverBootstrap;
+//
+//    @Resource
+//    private WebSocketHandler webSocketHandler;
+//
+//    @PostConstruct
+//    public void start() throws Exception {
+//        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+//            @Override
+//            protected void initChannel(SocketChannel ch) {
+//                ch.pipeline()
+//                        .addLast(new HttpServerCodec())
+//                        .addLast(new ChunkedWriteHandler())
+//                        .addLast(new HttpObjectAggregator(65536))
+//                        .addLast(new WebSocketServerProtocolHandler("/ws"))
+//                        .addLast(webSocketHandler);
+//            }
+//        });
+//
+//        serverBootstrap.bind(port).sync();
+//        log.info("Netty服务器启动成功,端口:{}", port);
+//    }
+//}

+ 1 - 1
src/main/resources/application.yml

@@ -38,4 +38,4 @@ aliyun:
     endpoint: https://oss-cn-beijing.aliyuncs.com
     accessKeyId: LTAI5tH3XWv25v5LyeapQq1K
     accessKeySecret: pEP2P1ezDkPZJwuMFkdrqVNRTlATok
-    bucketName: wangyongchun
+    bucketName: wangyongchun