多线程向设备发送数据

发布于:2025-07-30 ⋅ 阅读:(29) ⋅ 点赞:(0)

需求:做一个部门授权,可以把所选择部门下面的所有人的人脸信息传到设备组里(多个设备),问题在于图片是通过Base64处理之后的,会导致文件名非常长,如果一次性传很多数据就会超过设备的最长请求长度,如果不用Base64处理的话让设备自己去minio下载就会导致特别慢,设备容易掉线,所以就用多线程发送。先看一下全部的代码,再看一下多线程的方法。

全部代码:

@Override
    public List<Long> createDeptAuthorize(DeptAuthorizeSaveReqVO createReqVO) {

        List<Long> ids = new ArrayList<>();
        List<Long> userIds = new ArrayList<>();
        //遍历部门表,查看该部门是否已有授权
        List<Long> deptAuthorizeExits = new ArrayList<>();
        List<Long> deptAuthorizeNotExits = new ArrayList<>();
        //设备数组
        List<BaseDeviceDo> baseDeviceDos = new ArrayList<>();
        //把待更新授权的人员id存在这个数组里
        List<Long> userUpdateIds = new ArrayList<>();
        //把待更新授权的人员存在这个数组里
        List<AdminUserRespDTO> userUpdateList = new ArrayList<>();
        // 设置用户信息
        List<User> userList = new ArrayList<>();
        for (Long l : createReqVO.getDeptId()) {
            QueryWrapper<DeptAuthorizeDO> queryWrapperDept = new QueryWrapper<>();
            queryWrapperDept.eq("dept_id", l);
            queryWrapperDept.eq("deleted", 0);
            DeptAuthorizeDO deptAuthorizeDO1 = deptAuthorizeMapper.selectOne(queryWrapperDept);
            if (deptAuthorizeDO1 != null) {
                //表里存在这条数据就代表已有部门授权,所以要修改部门授权
                deptAuthorizeExits.add(deptAuthorizeDO1.getDeptId());
            }else{
                deptAuthorizeNotExits.add(l);
            }
        }
        //先遍历不存在的部门授权,即新增授权并发到设备上
        for (Long l : deptAuthorizeNotExits) {
                // 插入
                DeptAuthorizeDO deptAuthorize = new DeptAuthorizeDO();
                deptAuthorize.setDeptId(l);
                deptAuthorize.setAuthorizeWay(createReqVO.getAuthorizeWay());
                deptAuthorize.setDoorId(createReqVO.getDoorId());
                deptAuthorize.setDoorGroupId(createReqVO.getDoorGroupId());
                deptAuthorize.setEffectMethod(createReqVO.getEffectMethod());
                deptAuthorize.setInPeriodId(createReqVO.getInPeriodId());
                deptAuthorize.setOutPeriodId(createReqVO.getOutPeriodId());
                deptAuthorizeMapper.insert(deptAuthorize);
                // 假设deptAuthorizeMapper.insert()返回插入记录的ID
                ids.add(deptAuthorize.getId());


                //查找这个部门下面有多少人
                List<AdminUserRespDTO> adminUserDOList = adminUserApi.getUserListByDeptId(l);
                for (AdminUserRespDTO adminUserDO : adminUserDOList) {
                    //遍历人员数组看看是否在授权生效表里
                    QueryWrapper queryWrapper = new QueryWrapper();
                    queryWrapper.eq("user_id", adminUserDO.getId());
                    queryWrapper.eq("deleted", 0);
                    AuthorizeEffectDO authorizeEffectDO = authorizeEffectMapper.selectOne(queryWrapper);
                    if (authorizeEffectDO == null) {
                        //如果不存在就插入这条数据
                        AuthorizeEffectDO authorizeEffectDO1 = new AuthorizeEffectDO();
                        authorizeEffectDO1.setUserId(adminUserDO.getId());
                        //0代表部门授权
                        authorizeEffectDO1.setAuthorizeEffectType(0);
                        //0代表非临时授权
                        authorizeEffectDO1.setIsTemporary(0);
                        authorizeEffectMapper.insert(authorizeEffectDO1);
                        userIds.add(adminUserDO.getId());

                        //插入授权日志(生效表里没有这条数据则代表这人没有授权所以肯定会授权成功)
                        AuthorizeLogDO authorizeLogDO = new AuthorizeLogDO();
                        //授权id
                        authorizeLogDO.setAuthorizeId(deptAuthorize.getId());
                        //授权类型(0:部门 1:特殊 2:个人 3:临时)
                        authorizeLogDO.setAuthorizeType(0);
                        //用户id
                        authorizeLogDO.setUserId(adminUserDO.getId());
                        //备注
                        authorizeLogDO.setRemark("创建部门授权成功");
                        //是否成功(0:成功 1:失败)
                        authorizeLogDO.setIsSuccess(0);
                        //插入
                        authorizeLogMapper.insert(authorizeLogDO);
                    } else {
                        //如果不存在这条数据则把生效设置为1
                        //临时>人>特殊>部门
                        if (authorizeEffectDO.getAuthorizeEffectType() != 1 && authorizeEffectDO.getAuthorizeEffectType() != 2) {
                            //0代表部门授权
                            authorizeEffectDO.setAuthorizeEffectType(0);
                            authorizeEffectMapper.updateById(authorizeEffectDO);
                            userIds.add(adminUserDO.getId());

                            //插入授权日志(生效表里没有这条数据则代表这人没有授权所以肯定会授权成功)
                            AuthorizeLogDO authorizeLogDO = new AuthorizeLogDO();
                            //授权id
                            authorizeLogDO.setAuthorizeId(deptAuthorize.getId());
                            //授权类型(0:部门 1:特殊 2:个人 3:临时)
                            authorizeLogDO.setAuthorizeType(0);
                            //用户id
                            authorizeLogDO.setUserId(adminUserDO.getId());
                            //备注
                            authorizeLogDO.setRemark("创建部门授权成功");
                            //是否成功(0:成功 1:失败)
                            authorizeLogDO.setIsSuccess(0);
                            //插入
                            authorizeLogMapper.insert(authorizeLogDO);
                        } else {
                            //记录在授权日志里
                            //插入授权日志(失败)
                            AuthorizeLogDO authorizeLogDO = new AuthorizeLogDO();
                            //授权id
                            authorizeLogDO.setAuthorizeId(deptAuthorize.getId());
                            //授权类型(0:部门 1:特殊 2:个人 3:临时)
                            authorizeLogDO.setAuthorizeType(0);
                            //用户id
                            authorizeLogDO.setUserId(adminUserDO.getId());
                            //备注
                            authorizeLogDO.setRemark("创建部门授权失败,已有更高授权");
                            //是否成功(0:成功 1:失败)
                            authorizeLogDO.setIsSuccess(1);
                            //插入
                            authorizeLogMapper.insert(authorizeLogDO);
                        }
                    }

                }
                List<AdminUserRespDTO> adminUserDOS =adminUserApi.getUserListByDeptId(l);


                for (AdminUserRespDTO adminUserDO : adminUserDOS) {
                    //更新这个部门下面人的权限
                    //用户列表
                    User user = new User();
                    AdminUserRespDTO user1 = adminUserApi.getUser(adminUserDO.getId());
                    user.setI(user1.getUserSn());
                    user.setN(user1.getNickname());
                    user.setU(user1.getUserSn());
                    user.setC("");
                    user.setB("");
                    user.setW(user1.getPassword());
                    user.setD(user1.getDeptName());
                    //出门规则
                    if (createReqVO.getAuthorizeWay()==0) {
                        //门授权,判断这个门下的设备是进门还是出门
                        List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(createReqVO.getDoorId());
                        for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                            if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                                //存在并且为进门(0进门 1出门)
//                                Long inPeriodId = accessPeriodMapper.selectById(createReqVO.getInPeriodId()).getParentRuleId();
                                user.setM(createReqVO.getInPeriodId().toString());
                            }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                                //存在并且为出门(0进门 1出门)
//                                AccessPeriodDO accessPeriodDO = accessPeriodMapper.selectById(createReqVO.getOutPeriodId());
////                                Long outPeriodId = accessPeriodMapper.selectById(createReqVO.getOutPeriodId()).getParentRuleId();
                                user.setM(createReqVO.getOutPeriodId().toString());
                            }
                        }

                    }else if (createReqVO.getAuthorizeWay() == 1){
                        //获取这个门组下面的门
                        QueryWrapper queryWrapper = new QueryWrapper();
                        queryWrapper.eq("group_id",createReqVO.getDoorGroupId());
                        queryWrapper.eq("deleted",0);
                        List<GroupDoorRelateDO> doorRelateDOS = doorRelateMapper.selectList(queryWrapper);
                        for (GroupDoorRelateDO doorRelateDO : doorRelateDOS) {
                            List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(doorRelateDO.getDoorId());
                            for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                                if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                                    //存在并且为进门(0进门 1出门)
                                    user.setM(createReqVO.getOutPeriodId().toString());
                                }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                                    //存在并且为出门(0进门 1出门)
                                    user.setM(createReqVO.getOutPeriodId().toString());
                                }
                            }
                        }

                    }

                    //照片
                    FaceImageRespDTO faceImageDO = faceImageApi.getFaceImage(user1.getId());
                    if (faceImageDO != null) {
                        if (faceImageDO.getImage() != null && !faceImageDO.getImage().isEmpty()) {
                            // minio路径转换为文件路径
                            String objectName = faceImageDO.getImage().split("/")[faceImageDO.getImage().split("/").length - 1];
                            // 从 MinIO 下载文件并转换为字节数组
                            byte[] fileBytes = minioToBase64.downloadFileFromMinio(objectName);
                            // 转换为 Base64 字符串
                            String base6String4 = minioToBase64.convertToBase64(fileBytes);
                            user.setF(base6String4);

                        }
                    }
                    //放到user列表里传给设备
                    userList.add(user);

                }

        }
        //遍历已有部门授权列表,即修改现有的部门授权
        for (Long deptAuthorizeExit : deptAuthorizeExits) {
            QueryWrapper queryWrapperDeptAuthorize = new QueryWrapper();
            queryWrapperDeptAuthorize.eq("dept_id",deptAuthorizeExit);
            queryWrapperDeptAuthorize.eq("deleted",0);
            //修改部门授权
            DeptAuthorizeDO deptAuthorizeDO = deptAuthorizeMapper.selectOne(queryWrapperDeptAuthorize);
            //更新部门授权列表的基础性徐
            deptAuthorizeDO.setAuthorizeWay(createReqVO.getAuthorizeWay());
            deptAuthorizeDO.setDoorId(createReqVO.getDoorId());
            deptAuthorizeDO.setDoorGroupId(createReqVO.getDoorGroupId());
            deptAuthorizeDO.setEffectMethod(createReqVO.getEffectMethod());
            deptAuthorizeDO.setInPeriodId(createReqVO.getInPeriodId());
            deptAuthorizeDO.setOutPeriodId(createReqVO.getOutPeriodId());
            deptAuthorizeDO.setUpdateTime(LocalDateTime.now());
            deptAuthorizeMapper.updateById(deptAuthorizeDO);

            //查找这个部门下面有多少人
            List<AdminUserRespDTO> adminUserDOList = adminUserApi.getUserListByDeptId(deptAuthorizeExit);
            for (AdminUserRespDTO adminUserDO : adminUserDOList) {
                //遍历人员数组看看是否在授权生效表里有高于部门授权的授权,如果有则不授权
                QueryWrapper queryWrapper = new QueryWrapper();
                queryWrapper.eq("user_id", adminUserDO.getId());
                //目前生效类型为部门授权
                queryWrapper.eq("authorize_effect_type",0);
                queryWrapper.eq("deleted",0);
                AuthorizeEffectDO authorizeEffectDO = authorizeEffectMapper.selectOne(queryWrapper);
                if (authorizeEffectDO != null) {
                    //把符合规定的人插入到待更新的userId数组中
                    userUpdateIds.add(authorizeEffectDO.getUserId());
                    userUpdateList.add(adminUserApi.getUser(authorizeEffectDO.getUserId()));
                }

            }
            for (AdminUserRespDTO adminUserRespDTO : userUpdateList) {
                //更新这个部门下面人的权限
                //用户列表
                User user = new User();
                AdminUserRespDTO user1 = adminUserApi.getUser(adminUserRespDTO.getId());
                user.setI(user1.getUserSn());
                user.setN(user1.getNickname());
                user.setU(user1.getUserSn());
                user.setC("");
                user.setB("");
                user.setW(user1.getPassword());
                user.setD(user1.getDeptName());
                //出门规则
                if (createReqVO.getAuthorizeWay()==0) {
                    //门授权,判断这个门下的设备是进门还是出门
                    List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(createReqVO.getDoorId());
                    for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                        if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                            //存在并且为进门(0进门 1出门)
//                                Long inPeriodId = accessPeriodMapper.selectById(createReqVO.getInPeriodId()).getParentRuleId();
                            user.setM(createReqVO.getInPeriodId().toString());
                        }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                            //存在并且为出门(0进门 1出门)
//                                AccessPeriodDO accessPeriodDO = accessPeriodMapper.selectById(createReqVO.getOutPeriodId());
////                                Long outPeriodId = accessPeriodMapper.selectById(createReqVO.getOutPeriodId()).getParentRuleId();
                            user.setM(createReqVO.getOutPeriodId().toString());
                        }
                    }

                }else if (createReqVO.getAuthorizeWay() == 1){
                    //获取这个门组下面的门
                    QueryWrapper queryWrapper = new QueryWrapper();
                    queryWrapper.eq("group_id",createReqVO.getDoorGroupId());
                    queryWrapper.eq("deleted",0);
                    List<GroupDoorRelateDO> doorRelateDOS = doorRelateMapper.selectList(queryWrapper);
                    for (GroupDoorRelateDO doorRelateDO : doorRelateDOS) {
                        List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(doorRelateDO.getDoorId());
                        for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                            if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 0){
                                //存在并且为进门(0进门 1出门)
                                user.setM(createReqVO.getOutPeriodId().toString());
                            }else if (doorDeviceRelateRespDTO != null && doorDeviceRelateRespDTO.getInOutDirection() == 1){
                                //存在并且为出门(0进门 1出门)
                                user.setM(createReqVO.getOutPeriodId().toString());
                            }
                        }
                    }

                }

                //照片
                FaceImageRespDTO faceImageDO = faceImageApi.getFaceImage(user1.getId());
                if (faceImageDO != null) {
                    if (faceImageDO.getImage() != null && !faceImageDO.getImage().isEmpty()) {
                        // minio路径转换为文件路径
                        String objectName = faceImageDO.getImage().split("/")[faceImageDO.getImage().split("/").length - 1];
                        // 从 MinIO 下载文件并转换为字节数组
                        byte[] fileBytes = minioToBase64.downloadFileFromMinio(objectName);
                        // 转换为 Base64 字符串
                        String base6String4 = minioToBase64.convertToBase64(fileBytes);
                        user.setF(base6String4);

                    }
                }
                //放到user列表里传给设备
                userList.add(user);
            }


        }
        //判断相关联的门找到要传输的设备
        if (createReqVO.getAuthorizeWay() == 0){
            //门授权
            //获得这个门下面的所有设备
            List<DoorDeviceRelateRespDTO> doorDeviceRelateList = doorDeviceRelateApi.getDoorDeviceRelateList(createReqVO.getDoorId());
            for (DoorDeviceRelateRespDTO doorDeviceRelateRespDTO : doorDeviceRelateList) {
                OnsiteEquipDO onsiteEquipDO = onsiteEquipMapper.selectById(doorDeviceRelateRespDTO.getDeviceId());
                //插入设备列表
                BaseDeviceDo baseDeviceDo = new BaseDeviceDo();
                baseDeviceDo.setDeviceType(onsiteEquipDO.getDeviceModel());
                baseDeviceDo.setIpAddr(onsiteEquipDO.getDeviceIp());
                baseDeviceDo.setPassword(onsiteEquipDO.getDevicePassword());
                baseDeviceDos.add(baseDeviceDo);
            }
        }else if (createReqVO.getAuthorizeWay() == 1){
            //门组授权
            //获得门组下的所有设备
            List<OnsiteEquipDO> onsiteEquipDOS = groupDoorRelateService.listByAccessGroupTableId(createReqVO.getDoorGroupId());

            onsiteEquipDOS.stream().forEach(onsiteEquipDO -> {
                BaseDeviceDo baseDeviceDo = new BaseDeviceDo();
                baseDeviceDo.setDeviceType(onsiteEquipDO.getDeviceModel());
                baseDeviceDo.setIpAddr(onsiteEquipDO.getDeviceIp());
                baseDeviceDo.setPassword(onsiteEquipDO.getDevicePassword());
                baseDeviceDos.add(baseDeviceDo);
            });
        }

        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 分批处理用户列表
        int batchSize = 5;
        int totalTasks = (userList.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < totalTasks; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, userList.size());
            List<User> usersBatch = userList.subList(start, end);
            executorService.submit(() -> {
                control.setDeviceUser(baseDeviceDos, usersBatch);
            });
        }
        // 关闭线程池
        executorService.shutdown();
        try {
            // 等待所有任务完成
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return ids;
    }

多线程方法:

// 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 分批处理用户列表
        int batchSize = 5;
        int totalTasks = (userList.size() + batchSize - 1) / batchSize;
        for (int i = 0; i < totalTasks; i++) {
            int start = i * batchSize;
            int end = Math.min(start + batchSize, userList.size());
            List<User> usersBatch = userList.subList(start, end);
            executorService.submit(() -> {
                control.setDeviceUser(baseDeviceDos, usersBatch);
            });
        }
        // 关闭线程池
        executorService.shutdown();
        try {
            // 等待所有任务完成
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

1. 创建线程池

ExecutorService executorService = Executors.newFixedThreadPool(3);

  • 使用 Executors.newFixedThreadPool(3) 创建了一个固定大小为 3 的线程池。

  • 线程池的作用是管理线程的生命周期,避免频繁创建和销毁线程带来的性能开销。

  • 在这个线程池中,最多可以同时运行 3 个线程。

每次运行三个线程可以解决掉请求头过长的问题。

2. 计算分批处理的批次数量

int batchSize = 5;
int totalTasks = (userList.size() + batchSize - 1) / batchSize;

  • batchSize 定义了每一批次处理的用户数量,这里设置为 5。

  • totalTasks 计算总共需要处理的批次数量。通过公式 (userList.size() + batchSize - 1) / batchSize,确保即使用户数量不能被 batchSize 整除,也能正确计算出需要的批次数量。

3. 分批处理用户列表

for (int i = 0; i < totalTasks; i++) {
    int start = i * batchSize;
    int end = Math.min(start + batchSize, userList.size());
    List<User> usersBatch = userList.subList(start, end);
    executorService.submit(() -> {
        control.setDeviceUser(baseDeviceDos, usersBatch);
    });
}

  • 循环逻辑

    • 外层循环 for (int i = 0; i < totalTasks; i++) 遍历所有批次。

  • 计算每一批次的范围

    • int start = i * batchSize; 计算当前批次的起始索引。

    • int end = Math.min(start + batchSize, userList.size()); 计算当前批次的结束索引,确保不会超出 userList 的范围。

  • 提取当前批次的用户子列表

    • List<User> usersBatch = userList.subList(start, end); 使用 subList 方法从 userList 中提取当前批次的用户子列表。

  • 提交任务到线程池

    • executorService.submit(() -> { control.setDeviceUser(baseDeviceDos, usersBatch); }); 将任务提交到线程池中执行。

    • 每个任务调用 control.setDeviceUser(baseDeviceDos, usersBatch) 方法,处理当前批次的用户。

4. 关闭线程池并等待所有任务完成

executorService.shutdown();
try {
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

  • 关闭线程池

    • executorService.shutdown(); 调用 shutdown() 方法,表示不再接受新的任务,但会等待已经提交的任务完成。

  • 等待所有任务完成

    • executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 调用 awaitTermination 方法,等待线程池中的所有任务完成。

    • 这里使用 Long.MAX_VALUETimeUnit.NANOSECONDS,表示等待时间非常长,几乎等同于无限等待。

  • 异常处理

    • 如果线程被中断,会抛出 InterruptedException,捕获并打印堆栈信息。

这样就可以实现多线程向设备发送数据了。

另:附上设备发送数据的方法,也是多线程,可以参考。

/**
     * 设置设备用户(正式员工、常驻)
     */
    public List<ReturnMessage> setDeviceUser(List<BaseDeviceDo> baseDeviceDos, List<User> list) {
        List<ReturnMessage> failureMessages = new ArrayList<>();

        ExecutorService executor = Executors.newFixedThreadPool(baseDeviceDos.size());
        List<Runnable> tasks = new ArrayList<>();


        for (BaseDeviceDo baseDeviceDo : baseDeviceDos) {
            tasks.add(() -> {
                ReturnMessage returnMessage = null;
                try {
                    switch (baseDeviceDo.getDeviceType()) {
                        case M7:
                            returnMessage = m7Control.setDeviceUser(baseDeviceDo, list);
                    }
                    if (returnMessage != null && !returnMessage.getCode().equals("0")) {
                        failureMessages.add(returnMessage);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // 将异常信息封装到 ReturnMessage 中,或者创建一个新的 ReturnMessage 来表示失败
                    returnMessage.setCode("50100");
                    returnMessage.setMsg("处理设备类型 " + baseDeviceDo.getDeviceType() + "设备IP为:" + baseDeviceDo.getIpAddr() + " 时发生异常: " + e.getMessage());
                    failureMessages.add(returnMessage);
                }
            });

        }

        tasks.forEach(executor::submit);
        executor.shutdown();
        while (!executor.isTerminated()) {
            // 等待所有线程完成
        }


        return failureMessages;
    }


网站公告

今日签到

点亮在社区的每一天
去签到