【Java工具】Java-sftp线程池上传

发布于:2025-08-13 ⋅ 阅读:(14) ⋅ 点赞:(0)

通过定时任务,使用线程池配合 SFTP 连接池,对指定目录下的文件进行多线程上传。
1.SftpConnectionPool 工具类

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * A simple fixed-capacity SFTP connection pool. Every pooled connection bundles a JSch
 * {@link Session} together with the {@link ChannelSftp} opened on it.
 *
 * <p>Connections are handed out with {@link #borrow()} and must be given back with
 * {@link #release(SftpConnection)} (or {@link #invalidate(SftpConnection)} when the caller knows
 * the connection is broken). The backing queue is a {@link BlockingQueue}, so borrowing and
 * releasing may be done from multiple threads.
 */
public class SftpConnectionPool {
    private static final Logger logger = LoggerFactory.getLogger(SftpConnectionPool.class);

    /** Timeout, in milliseconds, applied to both the session connect and the channel connect. */
    private static final int CONNECT_TIMEOUT_MS = 30000;

    /**
     * One pooled connection: a JSch session plus the SFTP channel opened on that session.
     */
    public static class SftpConnection {
        private final Session session;
        private final ChannelSftp channel;

        /**
         * @param session the underlying SSH session (may be {@code null}; treated as disconnected)
         * @param channel the SFTP channel opened on {@code session} (may be {@code null})
         */
        public SftpConnection(Session session, ChannelSftp channel) {
            this.session = session;
            this.channel = channel;
        }

        /** @return the SFTP channel of this connection */
        public ChannelSftp getChannel() {
            return channel;
        }

        /** @return the SSH session of this connection */
        public Session getSession() {
            return session;
        }

        /**
         * @return {@code true} only when both the session and the channel are non-null and
         *         report themselves as connected
         */
        public boolean isConnected() {
            return session != null && session.isConnected() && channel != null && channel.isConnected();
        }

        /**
         * Disconnects the channel and then the session. Failures are deliberately ignored:
         * this is best-effort cleanup and the session must still be closed when the channel
         * disconnect throws.
         */
        public void closeQuietly() {
            try {
                if (channel != null && channel.isConnected()) {
                    channel.disconnect();
                }
            } catch (Exception ignored) {
                // best-effort cleanup; nothing useful can be done here
            }
            try {
                if (session != null && session.isConnected()) {
                    session.disconnect();
                }
            } catch (Exception ignored) {
                // best-effort cleanup; nothing useful can be done here
            }
        }
    }

    private final BlockingQueue<SftpConnection> pool;
    private final String host;
    private final int port;
    private final String user;
    private final String password;

    /** Set by {@link #closeAll()}; afterwards returned connections are closed instead of pooled. */
    private volatile boolean closed;

    /**
     * Creates the pool and eagerly opens {@code size} connections. Host, port, user name and
     * password are read from the {@code sftp.*} keys via {@code YmlUtil}.
     *
     * @param size number of connections to open; also the capacity of the pool
     * @throws JSchException if any connection cannot be established; connections that were
     *                       already opened are closed before the exception propagates
     */
    public SftpConnectionPool(int size) throws JSchException {
        this.host = YmlUtil.getYmlValueNonNull("sftp.host");
        this.port = Integer.parseInt(YmlUtil.getYmlValueNonNull("sftp.port"));
        this.user = YmlUtil.getYmlValueNonNull("sftp.username");
        this.password = YmlUtil.getYmlValueNonNull("sftp.password");
        this.pool = new ArrayBlockingQueue<>(size);
        try {
            for (int i = 0; i < size; i++) {
                pool.offer(createConnection());
            }
        } catch (JSchException | RuntimeException e) {
            // Do not leak the sessions that were opened before the failure.
            drainAndClose();
            throw e;
        }
    }

    /**
     * Opens a new session and SFTP channel using the configured credentials.
     *
     * @return a connected session/channel pair
     * @throws JSchException if the session or channel cannot be connected; a partially
     *                       established session is disconnected before rethrowing
     */
    private SftpConnection createConnection() throws JSchException {
        JSch jsch = new JSch();
        Session session = jsch.getSession(user, host, port);
        session.setPassword(password);
        // NOTE(security): host key verification is disabled, which leaves the connection open to
        // man-in-the-middle attacks. Kept as-is for compatibility; consider a known_hosts file.
        session.setConfig("StrictHostKeyChecking", "no");
        ChannelSftp channel = null;
        try {
            session.connect(CONNECT_TIMEOUT_MS);
            channel = (ChannelSftp) session.openChannel("sftp");
            channel.connect(CONNECT_TIMEOUT_MS);
            return new SftpConnection(session, channel);
        } catch (JSchException | RuntimeException e) {
            // The session may already be connected when the channel step fails.
            new SftpConnection(session, channel).closeQuietly();
            throw e;
        }
    }

    /**
     * Borrows a connection, blocking until one is available.
     *
     * <p>There is no timeout: if the pool has shrunk to zero because replacement connections
     * could not be created, this call blocks until a connection is released or the thread is
     * interrupted.
     *
     * @return a pooled connection
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public SftpConnection borrow() throws InterruptedException {
        return pool.take();
    }

    /**
     * Returns a connection to the pool. A connection that is no longer connected is closed and
     * replaced by a fresh one (see {@link #invalidate(SftpConnection)}). A connection that cannot
     * be put back (pool already full, or pool closed) is closed rather than leaked.
     *
     * @param conn the connection to return; {@code null} is ignored
     */
    public void release(SftpConnection conn) {
        if (conn == null) {
            return;
        }
        if (closed) {
            conn.closeQuietly();
            return;
        }
        if (!conn.isConnected()) {
            // Unusable: destroy it and try to put a replacement into the pool.
            invalidate(conn);
            return;
        }
        putBackOrClose(conn);
    }

    /**
     * Marks a connection as broken: closes it and tries to create a replacement that is put
     * into the pool. If the replacement cannot be created the pool simply shrinks by one.
     *
     * @param conn the broken connection; {@code null} only skips the close step
     */
    public void invalidate(SftpConnection conn) {
        if (conn != null) {
            conn.closeQuietly();
        }
        if (closed) {
            return;
        }
        SftpConnection replacement;
        try {
            replacement = createConnection();
        } catch (Exception e) {
            // Not fatal: the pool gets smaller and later borrow() calls wait for the remaining
            // connections.
            logger.error("重建 SFTP 连接失败: {}", e.getMessage(), e);
            return;
        }
        putBackOrClose(replacement);
    }

    /**
     * Closes every connection currently in the pool and marks the pool as closed, so that
     * connections released afterwards are closed instead of being queued again.
     */
    public void closeAll() {
        closed = true;
        drainAndClose();
    }

    /**
     * Puts a live connection into the pool without blocking; closes it if the pool is full.
     * Capacity equals the number of connections created up front, so a full pool means more
     * connections exist than the pool was sized for and waiting would not help.
     */
    private void putBackOrClose(SftpConnection conn) {
        if (!pool.offer(conn)) {
            logger.warn("SFTP 连接池已满,关闭多余连接");
            conn.closeQuietly();
            return;
        }
        if (closed) {
            // closeAll() ran between the closed check and the offer; drain again.
            drainAndClose();
        }
    }

    /** Removes every queued connection and closes it. */
    private void drainAndClose() {
        SftpConnection conn;
        while ((conn = pool.poll()) != null) {
            conn.closeQuietly();
        }
    }
}

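2.代码调用(以下 start()/close() 为调用方类中的方法片段;executor、sftpPool、timer、task、runningFlag、dstPath 等字段以及 upload() 的定义未在文中给出)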

/**
 * Starts the periodic upload job. Does nothing when {@code getEnable()} returns false.
 *
 * <p>The executor and the SFTP connection pool are created only if they are still
 * {@code null}. A failure to create the connection pool is logged and the timer task is
 * scheduled regardless, as before.
 */
public void start() {
    if (!getEnable()) {
        return;
    }
    // Initialise the thread pool and the connection pool (only once).
    if (executor == null) {
        executor = Executors.newFixedThreadPool(poolSize);
    }
    if (sftpPool == null) {
        try {
            sftpPool = new SftpConnectionPool(poolSize);
        } catch (Exception e) {
            logger.error("初始化 SFTP 连接池失败: {}", e.getMessage(), e);
        }
    }
    task = new TimerTask() {
        @Override
        public void run() {
            // Skip this tick when a previous upload is still marked as running.
            if (runningFlag) {
                logger.info("上传任务正在执行,本次任务不执行");
                return;
            }
            runningFlag = true;
            try {
                logger.info("上传任务开始执行");
                dstPath = null;
                upload();
            } catch (Exception e) {
                // Pass the exception as the last argument so the stack trace is logged too.
                logger.error("打包上传任务失败, error: {}", e.getMessage(), e);
            } finally {
                runningFlag = false;
            }
        }
    };
    // TODO: update the upload interval
    // Multiply as long so a large interval (in seconds) cannot overflow int arithmetic.
    timer.scheduleAtFixedRate(task, 0, uploadInterval * 1000L);
}

 /**
  * Stops the periodic upload job and releases the thread pool and the SFTP connection pool.
  * Does nothing when {@code getEnable()} returns false.
  *
  * <p>Order of operations: cancel the timer task, make a best-effort attempt to delete the
  * remote {@code dstPath + ".w"} file, shut the executor down (waiting up to one minute), then
  * close all pooled connections.
  */
 public void close() {
     if (!getEnable()) {
         return;
     }
     logger.info("上传资源释放开始");
     if (task != null) {
         task.cancel();
         timer.cancel();
     }
     logger.info("上传资源定时任务关闭");
     // If dstPath + ".w" has to be removed, do it through a pooled connection
     // (keeps the behaviour of the previous close() implementation).
     if (!StringUtils.isEmpty(dstPath) && sftpPool != null) {
         SftpConnectionPool.SftpConnection conn = null;
         try {
             conn = sftpPool.borrow();
             try {
                 conn.getChannel().rm(dstPath + ".w");
             } catch (Exception e) {
                 // Best effort: a failed delete must not abort shutdown, but leave a trace.
                 logger.debug("close 时删除远端 .w 文件未成功: {}", e.getMessage());
             }
         } catch (InterruptedException e) {
             // Restore the interrupt flag instead of swallowing it, so the shutdown steps
             // below (and the caller) can see that this thread was interrupted.
             Thread.currentThread().interrupt();
             logger.warn("close 时尝试删除远端 .w 文件失败: {}", e.getMessage());
         } catch (Exception e) {
             logger.warn("close 时尝试删除远端 .w 文件失败: {}", e.getMessage());
         } finally {
             if (conn != null) {
                 sftpPool.release(conn);
             }
         }
     }

     // Shut down the thread pool, then the connection pool.
     if (executor != null) {
         executor.shutdown();
         try {
             if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                 executor.shutdownNow();
             }
         } catch (InterruptedException e) {
             executor.shutdownNow();
             Thread.currentThread().interrupt();
         }
     }
     if (sftpPool != null) {
         sftpPool.closeAll();
     }
     logger.info("上传资源释放完成");
 }

网站公告

今日签到

点亮在社区的每一天
去签到