基于Netty的UDPServer端和Client端解决正向隔离网闸数据透传问题

发布于:2025-06-14 ⋅ 阅读:(21) ⋅ 点赞:(0)

背景

由于部署了正向隔离网闸,数据传输仅支持TCP协议和UDP协议,因此需要开发UDP Client和Server服务来实现数据透传。当前场景是:将获取到的数据转发到Kafka。

PS:TCP协议同样可以解决该问题,但TCP是面向字节流的协议,可能会出现粘包或半包问题,需要额外处理消息边界;UDP按数据报收发,每个数据报自带边界。

 1.UDP Server端

 server端启动类

package com.huanyu.forward.udp.server;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;

@Slf4j
@Service("udpServer")
@ConditionalOnExpression("#{'${spring.udp-server.port:}'.length()>0}")
public class UdpNettyServer {
    @Value("${spring.udp-server.port:33333}")
    private Integer port = 33333;

    public static void main(String[] args) throws Exception {
        new UdpNettyServer().udpServer(33333);
    }

    @PostConstruct()
    public void initUdpServer() {
        try {
            log.info("start udp server......");
            udpServer(port);
        } catch (Exception e) {
            log.error("tcp server start failed");
        }
    }

    public void udpServer(int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .localAddress(new InetSocketAddress(port)).handler(new ChannelInitializer<DatagramChannel>() {
                    @Override
                    protected void initChannel(DatagramChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
//                            pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
                        pipeline.addLast(new UdpDataHandler());
                    }
                });

        //绑定监听端口,调用sync同步阻塞方法等待绑定操作完
        ChannelFuture future = b.bind().sync();
        if (future.isSuccess()) {
            log.info("udp server is listening on  :{}", port);
        } else {
            log.error("udp server is failed ", future.cause());
            //关闭线程组
            group.shutdownGracefully();
        }
        //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
//            future.channel().closeFuture().sync();
    }
}

 Server数据处理类

package com.huanyu.forward.udp.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;


@Slf4j
@Service
public class UdpDataHandler extends ChannelInboundHandlerAdapter {

    public static final String BIZ_FLAG = "bizFlag";
    public static final String FLAG_PRE = "@@{";
    public static final String FLAG_SUF = "}##";
    public static final byte[] FLAG_PREFIX = FLAG_PRE.getBytes(StandardCharsets.UTF_8);
    public static final byte[] FLAG_SUFFIX = FLAG_SUF.getBytes(StandardCharsets.UTF_8);

//    @Resource
//    private KafkaTemplate<String, Object> template;


    //接受client发送的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            DatagramPacket p = (DatagramPacket) msg;
            ByteBuf in = p.content();
            byte[] flagBytes = getBytes(in);
            if (flagBytes == null) {
                return;
            }
            in.readBytes(flagBytes); // 读取标志位
            // 保留标志位的对象结构-以@@{开头以}##结尾,形如@@{"k":"v"}##{"k":"v"}$,@@和##之间的数据为补充的对象参数JSON,$为换行符号
            String topicData = new String(flagBytes, FLAG_PRE.length() - 1, flagBytes.length - FLAG_PREFIX.length - FLAG_SUFFIX.length + 2, StandardCharsets.UTF_8);

            byte[] msgByte = new byte[in.readableBytes()];
            in.readBytes(msgByte);
//        template.send("haha.haha.ha", gbk.getBytes());
            log.info("bizFag:{},data: {}", topicData, new String(msgByte));
        } catch (Exception e) {
            log.error("udp handler 异常: ", e);
        }
    }

    private byte[] getBytes(ByteBuf in) {
        if (in.readableBytes() < FLAG_PREFIX.length + FLAG_SUFFIX.length) {
            log.warn("数据长度不够");
            text(in);
            return null;
        }
        int prefixIndex = in.readerIndex();
        if (!startsWith(in)) {
            text(in);
            // 忽略非标志位开头的数据
            in.skipBytes(in.readableBytes());
            log.warn("数据不包含指定的前缀");
            return null;
        }

        int suffixIndex = indexOf(in);
        if (suffixIndex == -1) {
            log.warn("数据不包含指定的某字符");
            text(in);
            return null;
        }
        int flagLength = suffixIndex - prefixIndex + FLAG_SUFFIX.length;
        return new byte[flagLength];
    }

    //通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    //读操作时捕获到异常时调用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    //客户端去和服务端连接成功时触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello udp client [你好,客户端]".getBytes()));
        log.info("udp client 连接成功: {}", ctx.channel());
    }

    private boolean startsWith(ByteBuf buf) {
        for (int i = 0; i < FLAG_PREFIX.length; i++) {
            if (buf.getByte(buf.readerIndex() + i) != FLAG_PREFIX[i]) {
                return false;
            }
        }
        return true;
    }

    private int indexOf(ByteBuf buf) {
        int readerIndex = buf.readerIndex();
        int readableBytes = buf.readableBytes();
        for (int i = 0; i <= readableBytes - FLAG_SUFFIX.length; i++) {
            boolean match = true;
            for (int j = 0; j < FLAG_SUFFIX.length; j++) {
                if (buf.getByte(readerIndex + i + j) != FLAG_SUFFIX[j]) {
                    match = false;
                    break;
                }
            }
            if (match) {
                return readerIndex + i;
            }
        }
        return -1;
    }

    private void text(ByteBuf in) {
        byte[] msgByte = new byte[in.readableBytes()];
        in.readBytes(msgByte);
        log.warn("数据:{}", new String(msgByte, StandardCharsets.UTF_8));
    }
}

 2.UDP Client端

 Client端启动类

package com.aimsphm.forward.udp.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.stream.IntStream;

/**
 * Minimal UDP sender built on Netty's {@link NioDatagramChannel}.
 *
 * <p>The constructor binds a datagram channel to an ephemeral local port; callers then send
 * {@link DatagramPacket}s addressed to {@link #getNet()} through {@link #getChannel()} and
 * call {@link #shutdown()} when finished.
 *
 * <p>NOTE(review): this listing declares package {@code com.aimsphm.forward.udp.client} while
 * {@code UdpClientHandler} is listed under {@code com.huanyu.forward.udp.client}; the two need
 * to share a package (or the handler must be imported) — confirm against the real project.
 */
@Getter
@Slf4j
public class UdpNettyClient {

    /** Remote server address that outgoing packets are addressed to. */
    private final InetSocketAddress net;
    /** Event loop group backing {@link #channel}; {@code null} until a bind has succeeded. */
    private EventLoopGroup group;
    /** Locally bound datagram channel; {@code null} if the bind was interrupted. */
    private Channel channel;

    /**
     * Creates the client and immediately binds its local channel.
     *
     * @param host server host name or address
     * @param port server UDP port
     */
    public UdpNettyClient(String host, int port) {
        net = new InetSocketAddress(host, port);
        udpClient();
    }

    /**
     * Binds a datagram channel to a random local port and stores it in {@link #channel}.
     *
     * <p>If the bind fails or the thread is interrupted, the event loop group created for this
     * attempt is shut down so its threads do not leak. A bind failure still propagates to the
     * caller as before; an interrupt is logged and the interrupt flag is restored.
     */
    public void udpClient() {
        EventLoopGroup workers = new NioEventLoopGroup();
        boolean bound = false;
        try {
            Bootstrap b = new Bootstrap();
            b.group(workers).channel(NioDatagramChannel.class).handler(new ChannelInitializer<DatagramChannel>() {
                @Override
                protected void initChannel(DatagramChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new UdpClientHandler());
                }
            });
            // sync() rethrows a failed bind, so the lines below only run on success.
            this.channel = b.bind(0).sync().channel(); // bind a random local port for sending
            this.group = workers;
            bound = true;
            log.info("连接UDP服务器成功:");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("UDP客户端启动异常:", e);
        } finally {
            if (!bound) {
                log.warn("连接UDP服务器失败:");
                workers.shutdownGracefully(); // release the event loop threads
            }
        }
    }

    /**
     * Closes the channel and shuts down the event loop group, if they were created.
     *
     * <p>Without this call the group's threads stay alive after the last packet is sent.
     */
    public void shutdown() {
        if (channel != null) {
            channel.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * Demo: sends two sample packets to {@code localhost:33333} and then shuts the client down.
     *
     * @param args unused
     * @throws Exception if the client cannot be started
     */
    public static void main(String[] args) throws Exception {
        // Change host and port to match your server.
        UdpNettyClient client = new UdpNettyClient("localhost", 33333);
        try {
            Channel ch = client.getChannel();
            if (ch == null) {
                return; // bind was interrupted; nothing to send with
            }
            // Send the datagrams.
            IntStream.range(0, 2).parallel().forEach(i -> {
                try {
                    ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("@@{\"k\":\"v" + i + "\"}##{\"k\":\"v\"}", CharsetUtil.UTF_8), client.getNet())).sync();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
        } finally {
            client.shutdown();
        }
    }
}

 Client数据处理类

package com.huanyu.forward.udp.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

import java.nio.charset.StandardCharsets;
import java.util.Map;


/**
 * Client-side inbound handler that prints datagrams sent back by the server.
 *
 * <p>The handler is typed on {@code DatagramPacket}, the message type a datagram channel
 * delivers to its pipeline. {@code SimpleChannelInboundHandler} only invokes
 * {@code channelRead0} for messages matching its type parameter, and it releases the message
 * after the call returns.
 */
public class UdpClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**
     * Prints the content of a datagram received from the server, decoded as UTF-8.
     *
     * @param ctx    handler context
     * @param packet datagram received from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        ByteBuf content = packet.content();
        System.out.println("接受到server响应数据: " + content.toString(StandardCharsets.UTF_8));
    }

    /**
     * Passes the activation event on without sending a greeting.
     *
     * <p>The channel this handler is installed on is bound but not connected to a peer, so a
     * bare buffer written here has no destination address and the write could only fail.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /** Reports the failure cause on standard error, then closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println("udp client handler 异常: " + cause);
        ctx.close();
    }
}

分别启动服务端和客户端即可进行数据调试;也可以下载TCP/UDP数据调试工具进行调试。

 


网站公告

今日签到

点亮在社区的每一天
去签到