Kaynağa Gözat

Merge branch 'master' of http://116.63.33.55/git/dt2

xiaoyue5430 2 yıl önce
ebeveyn
işleme
1b80df1491

+ 16 - 0
cms/pom.xml

@@ -175,6 +175,22 @@
 			<artifactId>javassist</artifactId>
 			<version>3.28.0-GA</version>
 		</dependency>
+
+		<!-- netty相关依赖 -->
+		<dependency>
+			<groupId>io.netty</groupId>
+			<artifactId>netty-all</artifactId>
+			<version>4.1.6.Final</version>
+		</dependency>
+
+		<!-- lombok -->
+		<dependency>
+			<groupId>org.projectlombok</groupId>
+			<artifactId>lombok</artifactId>
+			<version>1.18.20</version>
+			<scope>provided</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>

+ 7 - 2
cms/src/main/java/com/jd/cms/Application.java

@@ -1,13 +1,14 @@
 package com.jd.cms;
 
+import com.jd.cms.nettySocket.NioWebSocketServer;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.context.ApplicationContext;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 
-
 @SpringBootApplication
 @EnableScheduling
 @ServletComponentScan
@@ -15,7 +16,11 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 public class Application {
 
 	public static void main(String[] args) {
-		SpringApplication.run(Application.class, args);
+
+		ApplicationContext applicationContext = SpringApplication.run(Application.class, args);
+		//从ioc容器获取NioWebSocketServer并启动websocket服务器
+		NioWebSocketServer nioWebSocketServer = applicationContext.getBean("nioWebSocketServer",NioWebSocketServer.class);
+		nioWebSocketServer.run();
 	}
 
 }

+ 55 - 0
cms/src/main/java/com/jd/cms/nettySocket/ChannelManage.java

@@ -0,0 +1,55 @@
+package com.jd.cms.nettySocket;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class ChannelManage {
+    private static ChannelGroup GlobalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+    private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();
+
+    /**
+     * 添加通道
+     *
+     * @param channel
+     */
+    public static void addChannel(Channel channel) {
+        GlobalGroup.add(channel);
+        ChannelMap.put(channel.id().asShortText(), channel.id());
+    }
+
+    /**
+     * 移除通道
+     *
+     * @param channel
+     */
+    public static void removeChannel(Channel channel) {
+        GlobalGroup.remove(channel);
+        ChannelMap.remove(channel.id().asShortText());
+    }
+
+    /**
+     * 查找通道
+     *
+     * @param id
+     * @return
+     */
+    public static Channel findChannel(String id) {
+        return GlobalGroup.find(ChannelMap.get(id));
+    }
+
+    /**
+     * 群发消息
+     * @param tws  消息内容
+     * @param id   发送消息的通道id。需要排除掉这个id,不向这个通道发送消息
+     */
+    public static void send2All(TextWebSocketFrame tws, ChannelId id) {
+        GlobalGroup.writeAndFlush(tws, channel -> !channel.id().equals(id));
+    }
+}

+ 113 - 0
cms/src/main/java/com/jd/cms/nettySocket/NioWebSocketHandler.java

@@ -0,0 +1,113 @@
+package com.jd.cms.nettySocket;
+
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.websocketx.*;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class NioWebSocketHandler extends SimpleChannelInboundHandler<Object> {
+
+    private WebSocketServerHandshaker handshaker;
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        //log.info("收到消息" + msg);
+        if (msg instanceof FullHttpRequest) {
+            //以http请求形式接入,但是走的是websocket
+            handleHttpRequest(ctx, (FullHttpRequest) msg);
+        } else if (msg instanceof WebSocketFrame) {
+            //处理websocket客户端的消息
+            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        //添加连接
+        log.info("客户端加入连接:" + ctx.channel());
+        ChannelManage.addChannel(ctx.channel());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        //断开连接
+        log.info("客户端断开连接:" + ctx.channel());
+        ChannelManage.removeChannel(ctx.channel());
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        ctx.flush();
+    }
+
+    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+        // 判断是否关闭链路的指令
+        if (frame instanceof CloseWebSocketFrame) {
+            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+            return;
+        }
+        // 判断是否ping消息
+        if (frame instanceof PingWebSocketFrame) {
+            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
+            return;
+        }
+        // 判断是不是文本消息,不是文本抛异常
+        if (!(frame instanceof TextWebSocketFrame)) {
+            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
+        }
+        // 把服务器收到的消息发送到每个通道
+        String request = ((TextWebSocketFrame) frame).text();
+        log.info("服务端收到:" + request);
+        //封装消息主体
+        JSONObject msg = JSONUtil.createObj()
+                .set("date", DateUtil.now())
+                .set("msg", request);
+        TextWebSocketFrame tws = new TextWebSocketFrame(msg.toString());
+        // 群发,把当前通道传过去,那边排除掉
+        ChannelManage.send2All(tws, ctx.channel().id());
+    }
+
+    /**
+     * 唯一的一次http请求,用于创建websocket
+     */
+    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
+        //要求Upgrade为websocket,过滤掉get/Post
+        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
+            //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
+            return;
+        }
+        //WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://192.168.0.114:8088/websocket", null, true);
+        handshaker = new WebSocketServerHandshaker13("ws://23.37.100.81:8088/websocket", null, true, 1024 * 1024 * 6);
+        if (handshaker == null) {
+            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
+        } else {
+            handshaker.handshake(ctx.channel(), req);
+        }
+    }
+
+    /**
+     * 拒绝不合法的请求,并返回错误信息
+     */
+    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
+        // 返回应答给客户端
+        if (res.status().code() != 200) {
+            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
+            res.content().writeBytes(buf);
+            buf.release();
+        }
+        ChannelFuture f = ctx.channel().writeAndFlush(res);
+    }
+}

+ 52 - 0
cms/src/main/java/com/jd/cms/nettySocket/NioWebSocketServer.java

@@ -0,0 +1,52 @@
+package com.jd.cms.nettySocket;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+
+@Slf4j
+@Component
+public class NioWebSocketServer implements Runnable {
+
+    private Integer port = 8088;
+
+    @Override
+    public void run() {
+        log.info("webSocket服务器正在启动");
+        NioEventLoopGroup boss = new NioEventLoopGroup();
+        NioEventLoopGroup work = new NioEventLoopGroup();
+        try {
+            ServerBootstrap bootstrap = new ServerBootstrap();
+            bootstrap.group(boss, work);
+            bootstrap.channel(NioServerSocketChannel.class);
+            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel socketChannel) throws Exception {
+                    socketChannel.pipeline()
+                            .addLast("http-codec", new HttpServerCodec())
+                            .addLast("aggregator", new HttpObjectAggregator(65536))
+                            //自定义消息处理器
+                            .addLast("handler", new NioWebSocketHandler());
+                }
+            });
+            Channel channel = bootstrap.bind(this.port).sync().channel();
+            log.info("webSocket服务器启动成功 port = " + port);
+            channel.closeFuture().sync();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            log.info("运行出错:" + e);
+        } finally {
+            boss.shutdownGracefully();
+            work.shutdownGracefully();
+            log.info("websocket服务器已关闭");
+        }
+    }
+}