网络编程:使用UDP实现数据帧的接收

发布于:2024-07-02 ⋅ 阅读:(11) ⋅ 点赞:(0)

目录

1、需求

2、逻辑处理

3、代码实现

4、总结


1、需求

        使用java代码实现数据帧的接收需求,完成数据到数据库的存储。

2、逻辑处理

        由于udp传输不保证数据的有序性、可靠性,所以在做业务开发的时候,要程序员自己考虑需求完善udp的缺陷。因此我们定义几个字段保证数据的有序,对于可靠性未进行考虑。

3、代码实现

        监听端口55000,等待数据的发送:

@Service
public class UDPReceiverSuper {
    private static final int BUFFER_SIZE = 1044;
    private static final int HEAD_SIZE = 20;
    private static final int DATA_SIZE = 1024;
    private static final int MAX_BUFFER_SIZE = 1 * 1024 * 1024; // 缓冲器大小设置为1MB
    private static final double MAX_BUFFER_THRESHOLD = 0.8; // 缓冲区阈值
    private static final int MAX_BUFFER_INDEX = (int) (MAX_BUFFER_SIZE * MAX_BUFFER_THRESHOLD / DATA_SIZE); //缓冲区元素数量阈值

    //timestampToBufferMap存储的是:时间戳,TreeMap,TreeMap里面存储的是:当前包序号,接受数据的对象
    private Map<Long, ConcurrentHashMap<Long, DatagramPacket>> timestampToBufferMap = new HashMap();
    private long timeStamp;
    private boolean isClosed = false;// 使用阻塞队列作为缓冲区
    private long errorPackageSum = 0;
    private int frameNum;        //用于帧计数

    Thread udpReceiverThread;

    @Value("${GK.GKOriginalDataFilePath}")
    private String GKOriginalDataFilePath; // 管控原始数据文件存储路径

    @Value("${HP.storagePath}")
    private String storagePath;    //高性能数据接收路径
    @Autowired
    private INetworkConfigService networkConfigService;
    @Autowired
    private DealGkDataServiceSuperWithNewThread dealGkDataServiceSuperWithNewThread;
    @Autowired
    private DealGkDataServiceSuperWithThreadPoolAndBuffer dealGkDataServiceSuperWithThreadPoolAndBuffer;
    @Autowired
    private DealGkDataServiceSuperWithThreadPool dealGkDataServiceSuperWithThreadPool;
    @Autowired
    private SaveGKOriginalDataService saveGKOriginalDataService;
    @Autowired
    private SaveGKOriginalDataServiceWithBuffer saveGKOriginalDataServiceWithBuffer;



    public UDPReceiverSuper() {
    }

    public void start() {
        //创建父文件夹

        Path path = Paths.get(storagePath);
        if (Files.notExists(path)) {
            try {
                Files.createDirectories(path);
                System.out.println("Directories created successfully: " + storagePath);
            } catch (IOException e) {
                System.err.println("Failed to create directories: " + e.getMessage());
            }
        } else {
            System.out.println("Directories already exist: " + storagePath);
        }

        // 启动接收数据的线程
        if (udpReceiverThread == null) {
            udpReceiverThread = new Thread(new Receiver());
            udpReceiverThread.start();
        }
    }

    //数据帧头定义
    private class PackageHeader {
        public long id = 0;
        public long timestamp = 0;
        public long totalPackageNum = 0;
        public long currentPackageNum = 0;
        public long dataLength = 0;
    }


    // 接收数据的线程
    private class Receiver implements Runnable {
        @Override
        public void run() {
            NetworkConfig networkConfig = networkConfigService.selectNetworkConfigById(1L);
            String port = networkConfig.getPort();
            String ip = networkConfig.getIp();
            System.out.println("实际未绑定ip");
            System.out.println("ip: " + ip + "  port: " + port);
            try {
                DatagramSocket ds = new DatagramSocket(Integer.parseInt(port));
                if (ds != null) {
                    isClosed = false;
                }
                System.out.println("udpReceiver_ds: " + ds + "   等待接收数据......");


                while (true) {
                    if (isClosed) {
                        break;
                    }
                    byte[] receiveData = new byte[BUFFER_SIZE];   //接收数据缓存区,大小为1044
                    DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
                    ds.receive(receivePacket);     //接收数据
                    byte[] data1 = receivePacket.getData();
                    frameNum++;
//                    System.out.println("当前帧数为: " + frameNum);   //todo 用于打印输出当前接收到的帧数

                    ByteBuffer byteBuffer1 = ByteBuffer.allocate(data1.length);
                    byteBuffer1.put(data1);
                    byteBuffer1.flip();   //flip操作是将:写模式切换到读模式,将‘limit’设置为当前的‘position’,将‘position’重置为0
//                    ByteBuffer byteBuffer1 = ByteBuffer.allocate(receiveData.length);
//                    byteBuffer1.put(receiveData);
//                    byteBuffer1.flip();   //flip操作是将:写模式切换到读模式,将‘limit’设置为当前的‘position’,将‘position’重置为0
                    /*两种情况:1、接收管控  2、接收高性能*/
                    byteBuffer1.order(ByteOrder.LITTLE_ENDIAN);  //转化为小端
                    int headerType = byteBuffer1.getInt();       //得到设备标识符
                    if (headerType == 1) {
                        /*解决方法一: 这个是采用多线程的方式进行写入数据到txt文件*/
                        saveGKOriginalDataService.saveGKOriginalData(receivePacket, GKOriginalDataFilePath);
                        /*解决方法二:直接处理管控的函数*/
//                        dealGkDataServiceSuperWithNewThread.dealGKApi(byteBuffer1);
                        /*解决方法三:在UDPReceiverSuperSuper类里面,并且要在NetworkConfigController中进行函数*/
                        /*解决方法四:使用线程池的方式解决,每接收一帧,就开始处理*/
//                        dealGkDataServiceSuperWithThreadPoolAndBuffer.dealGKApi(byteBuffer1);
                        /*解决方法五:直接开启线程进行处理数据,这个方法是对的*/
                        dealGkDataServiceSuperWithThreadPool.dealGKApi(byteBuffer1);
                        /*解决方法六:将接收到的数据存储到缓冲区中,然后使用多线程从缓冲区中取出,方法实现写在method3中*/
                    } 

        业务处理逻辑:

package com.ruoyi.system.service.customService.dealGKService_ThreadPool;

import com.ruoyi.system.domain.*;
import com.ruoyi.system.mapper.*;
import com.ruoyi.system.utlis.CSVFileUtil;
import com.ruoyi.system.utlis.ConvertUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.LongAdder;

/**
 * @Author 不要有情绪的  ljy
 * @Date 2024/3/22 15:54
 * @Description:
 */
@Service
public class DealGkDataServiceSuperWithThreadPool {


    @Value("${GK.statusLogPath}")
    private String stateLogFilePath;  //状态日志存储路径


    @Autowired
    private OperateLogInfoMapper operateLogInfoMapper;
    @Autowired
    private InstructLogInfoMapper instructLogInfoMapper;
    @Autowired
    private Instruct1553bLogInfoMapper instruct1553bLogInfoMapper;
    @Autowired
    private InstructRs422LogInfoMapper instructRs422LogInfoMapper;
    @Autowired
    private StateLogInfoMapper stateLogInfoMapper;
    @Autowired
    private ErrorLogInfoMapper errorLogInfoMapper;


    int frontTimeFlag = -1;

    private int currentReceivedFrameNum = 0;  //用于计算管控接收帧数

    private Map<Integer, BlockingQueue<byte[]>> currTimeFlagToQueue = new HashMap<>();
    int threadNum = 1;
    private ExecutorService threadPool;

    private Counter counter = new Counter();




    public DealGkDataServiceSuperWithThreadPool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        System.out.println("核心线程数:   " + corePoolSize);
        int maximumPoolSize = corePoolSize * 2;
        long keepAliveTime = 60L;
        threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()
        );
    }

    public void dealGKApi(ByteBuffer byteBuffer) {
        currentReceivedFrameNum++;
        if (currentReceivedFrameNum % 1000 == 0) {
            System.out.println("管控当前接收的数据帧数(每隔1000打印一次): " + currentReceivedFrameNum);
        }


        int currTimeFlag = byteBuffer.getInt();       //当前时间标识,用于区分是否丢包
        int packagesTotalNum = byteBuffer.getInt();   //表示当前发送的包总数
        int currPackageNum = byteBuffer.getInt();     //表示当前包序号
        int messageLength = byteBuffer.getInt();     //消息长度
        int remainingBytes = byteBuffer.remaining();
        byte[] remainingData = new byte[messageLength];   //用于获取日志长度 1024
        if (remainingBytes > 0) {
            byteBuffer.get(remainingData); // 获取剩余的字节
            threadPool.submit(new GKRunnable(remainingData));
        }

    }


    class GKRunnable implements Runnable {
        private byte[] bytes;

        public GKRunnable(byte[] remainingData) {
            this.bytes = remainingData;
        }

        @Override
        public void run() {
            System.out.println("新启动一个线程用于处理管控日志,当前线程名:  " + Thread.currentThread().getName());
            ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);   //将不用byte数组放到ByteBuffer中
            byteBuffer.put(bytes);
            dealWithAssembledFrame(byteBuffer);
        }

        private void dealWithAssembledFrame(ByteBuffer byteBuffer) {
            byteBuffer.flip();
            byteBuffer.order(ByteOrder.LITTLE_ENDIAN);  //转化为小端
            dealLogPart(byteBuffer);
        }

        /*
           todo 要写的内容为:出现错误就要将异常抛出,并且找到帧头的位置
         */
        private void dealLogPart(ByteBuffer byteBuffer) {
//            /*找到帧头的位置*/
//            searchFrameHeaderAndSetPosition(byteBuffer);

            while (byteBuffer.position() != byteBuffer.capacity() && (byteBuffer.position() + 4) < byteBuffer.capacity()) {
                try {
                    /*找到帧头的位置*/
                    searchFrameHeaderAndSetPosition(byteBuffer);
                    int startPosition = byteBuffer.position();//获取开始的长度
                    //每个日志都包含的部分
                    byte[] bytes2 = new byte[2];
                    byteBuffer.get(bytes2);
                    String frameHeaderInfo = ConvertUtil.byteArrayToHexString(bytes2);   //日志帧头字段
                    short logLength = byteBuffer.getShort();                             //日志长度
                    int logNumber = byteBuffer.getShort();                             //日志编号
//                    System.out.println(logNumber + logLength);
                    byte logType = byteBuffer.get();//byte转化为字符串                     //日志类型
                    String logTypeStr = String.format("%02X", logType);
                    int time = byteBuffer.getInt();                //日志时间
                    //根据日志类型,选择处理日志剩余方式
                    if ("01".equals(logTypeStr) || "02".equals(logTypeStr) || "03".equals(logTypeStr)) {
                        byte sendingAndReceivingBit = byteBuffer.get();
                        byte sourceDeviceId = byteBuffer.get();
                        byte targetDeviceId = byteBuffer.get();
                        //得到日志内容长度    先将日志长度转化为十进制,然后减掉帧头信息,减掉日志长度,减掉日志编号,减掉日志类型、减掉时间,减掉校验码,减掉发送接收位,减掉源原设备ID,减掉目标设备ID
                        int logContentLength = logLength - 2 - 2 - 2 - 1 - 4 - 1 - 1 - 1 - 1;
                        String instructDataContent = null;
                        if ("01".equals(logTypeStr)) {  //子地址+数据内容,子地址占1个字节
                            byte[] bytes = new byte[1];
                            byteBuffer.get(bytes);
                            String subAddress = ConvertUtil.byteArrayToHexString(bytes);
                            int dataContentLength = logContentLength - 1;
                            byte[] bytes1 = new byte[dataContentLength];// 输出剩余字节的十六进制表示,即指令数据内容
                            byteBuffer.get(bytes1);
                            instructDataContent = ConvertUtil.byteArrayToHexString(bytes1);
                            dealInstruct1553bLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time, sendingAndReceivingBit, sourceDeviceId, targetDeviceId, subAddress, instructDataContent);
                        } else if ("02".equals(logTypeStr)) { //can指令类型(1个字节)  + ID(子地址)+数据内容,ID占4个字节
                            byte[] bytes = new byte[1];
                            byteBuffer.get(bytes);
                            String canInstructType = ConvertUtil.byteArrayToHexString(bytes);
                            byte[] bytes1 = new byte[4];
                            byteBuffer.get(bytes1);
                            String subAddress = ConvertUtil.byteArrayToHexString(bytes1);
                            int dataContentLength = logContentLength - 1 - 4;
                            byte[] ID = new byte[dataContentLength];// 输出剩余字节的十六进制表示,即指令数据内容
                            byteBuffer.get(ID);
                            instructDataContent = ConvertUtil.byteArrayToHexString(ID);
                            dealInstructCANLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time, sendingAndReceivingBit, sourceDeviceId, targetDeviceId, canInstructType, subAddress, instructDataContent);
                        } else if ("03".equals(logTypeStr)) { //数据内容
                            byte[] bytes1 = new byte[logContentLength];
                            byteBuffer.get(bytes1);
                            instructDataContent = ConvertUtil.byteArrayToHexString(bytes1);
                            dealInstructRs422Log(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time, sendingAndReceivingBit, sourceDeviceId, targetDeviceId, instructDataContent);
                        }
                    } else if ("04".equals(logTypeStr)) {
                        //存储到excel表中
//                        dealStateLog(byteBuffer, startPosition, time);
                        //存储到数据库中
                        dealStateLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time);
                    } else if ("05".equals(logTypeStr)) {
                        dealOperateLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time);
                    } else if ("06".equals(logTypeStr)) {
                        dealErrorLog(byteBuffer, startPosition, logLength, logNumber & 0xFFFF, time);
                    } else {
                        System.out.println("as;dasd");
                    }
                } catch (Exception e) {
                    if (e.getCause().toString().contains("SQLIntegrityConstraintViolationException")) {
                        counter.increment();
                    }else {
                        e.printStackTrace();

//                        System.err.println(e.getMessage());
                    }
                    //在处理过程报错了,那么就认为日志内容错误,就要重新寻找帧头的位置
                    searchFrameHeaderAndSetPosition(byteBuffer);
                }
            }

            System.out.println("线程名称:" + Thread.currentThread().getName() + "  线程id:" + Thread.currentThread().getId() + "  处理完成!");

            System.out.println("========"+counter.getCount());
        }

        private void dealStateLog(ByteBuffer byteBuffer, int startPosition, short logLength, int logNumber, int time) {
            byte[] bytes92 = new byte[92];  //将92个长度传递到byte数组中
            byteBuffer.get(bytes92);
            int endPosition = byteBuffer.position();
            byte checkCodeByte = calculateCheckCode(byteBuffer, startPosition, endPosition);
            byte checkCode = byteBuffer.get();  //用于校验日志的正确和完整性
            boolean logIsIntegrity = logIsIntegrity(checkCode, checkCodeByte);
            if (logIsIntegrity) {
                StateLogInfo stateLogInfo = new StateLogInfo();
                stateLogInfo.setLogLength(Short.toString(logLength));
                stateLogInfo.setLogNumber(Long.valueOf(logNumber));
                stateLogInfo.setTime(Long.valueOf(time));
//                Integer size = stateLogInfoMapper.searchDataIsDuplicate(stateLogInfo);
//                if (size > 0) { //判断数据库中是否已经存在
//                    return;
//                }
                //将参数设置到stateLogInfo实例中
                setParameter(bytes92, stateLogInfo);
                stateLogInfoMapper.insertStateLogInfo(stateLogInfo);
            } else {
                logIsNotIntegrity(byteBuffer, startPosition, endPosition);
            }
        }


        private void searchFrameHeaderAndSetPosition(ByteBuffer byteBuffer) {
            /*找到帧头的位置*/
            int frameHeaderPosition = findFrameHeaderPosition(byteBuffer, hexStringToByteArray("eafc"));
            if (frameHeaderPosition != -1) {
                byteBuffer.position(frameHeaderPosition);
            } else {
                System.out.println("未找到帧头为 eafc 的位置");
                return;   //说明从头查到尾都没有查到,就直接退出
            }
        }


        /**
         * 判断日志是否完整的函数,如果不完整,那么就要找下一个eafc帧头的位置
         *
         * @param byteBuffer
         */
        private void logIsNotIntegrity(ByteBuffer byteBuffer, int startPosition, int endPosition) {
            System.out.println("日志不完整!丢掉");
            //如果日志不完整,那么就要找到下一帧的头位置
            /*找到帧头的位置*/
            searchFrameHeaderAndSetPosition(byteBuffer);
        }

      


        /**
         * 判断是否为完整日志的函数isIntegrity(日志长度 + 帧头 - 校验码)
         *
         * @param a
         * @param b
         * @return
         */
        private boolean logIsIntegrity(byte a, byte b) {
            return a == b;
        }


        /**
         * 计算校验码的函数
         *
         * @param byteBuffer
         * @param startPosition
         * @param endPosition
         * @return
         */
        private byte calculateCheckCode(ByteBuffer byteBuffer, int startPosition, int endPosition) {
            int length = endPosition - startPosition;
            byteBuffer.position(startPosition);
            byte res = 0;
            for (int i = 0; i < length; i++) {
                byte b = byteBuffer.get();
                res += b;
            }
            return res;
        }

        /**
         * 通过遍历的方式得到帧头的位置
         *
         * @param byteBuffer
         * @param frameHeader
         * @return
         */
        public int findFrameHeaderPosition(ByteBuffer byteBuffer, byte[] frameHeader) {
            // 记录当前位置
            int startPosition = byteBuffer.position();
            // 遍历 ByteBuffer 从当前位置开始搜索帧头
            for (int i = startPosition; i < byteBuffer.limit() - frameHeader.length + 1; i++) {
                // 标记当前位置
                byteBuffer.position(i);
                boolean found = true;
                for (int j = 0; j < frameHeader.length; j++) {
                    if (byteBuffer.get() != frameHeader[j]) {
                        found = false;
                        break;
                    }
                }
                if (found) {
                    // 恢复 ByteBuffer 的当前位置
//                byteBuffer.position(startPosition);
                    return i; // 返回帧头 'e' 的位置
                }
            }
            // 恢复 ByteBuffer 的当前位置
//        byteBuffer.position(startPosition);
            // 如果没有找到,返回 -1 表示未找到
            return -1;
        }


        /**
         * 将十六进制字符串转换为字节数组的方法
         *
         * @param hexString
         * @return
         */
        public byte[] hexStringToByteArray(String hexString) {
            int len = hexString.length();
            byte[] byteArray = new byte[len / 2]; // 每两个十六进制字符表示一个字节
            for (int i = 0; i < len; i += 2) {
                byteArray[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
                        + Character.digit(hexString.charAt(i + 1), 16));
            }
            return byteArray;
        }

    }

}

class Counter {
    private LongAdder count = new LongAdder();

    public void increment() {
        count.increment();
    }

    public int getCount() {
        return count.intValue();
    }
}

解释:以上代码是采用多线程,将接收到的数据帧解析为日志,并将日志存储到数据库中,根据日志类型不同,存储到不同的数据库表中。

4、总结

        采用DatagramPacket实现数据帧接收准备,将接收到的每一帧数据解析为日志,每一帧都交给一个线程去处理,为节省线程频繁创建和销毁的资源,采用多线程。

学习之所以会想睡觉,是因为那是梦开始的地方。
ଘ(੭ˊᵕˋ)੭ (开心) ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)ଘ(੭ˊᵕˋ)੭ (开心)
                                                                                                        ------不写代码不会凸的小刘