定时使用线程池对指定目录下的文件进行多线程上传。
1.SftpConnectionPool 工具类
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * A small fixed-size SFTP connection pool. Every pooled entry pairs a JSch
 * {@link Session} with the {@link ChannelSftp} opened on that session.
 *
 * <p>Connections are handed out through a {@link BlockingQueue}; callers are
 * expected to pair every {@link #borrow()} with exactly one {@link #release}
 * or {@link #invalidate}.
 */
public class SftpConnectionPool {
    private static final Logger logger = LoggerFactory.getLogger(SftpConnectionPool.class);

    /** Timeout, in milliseconds, applied to both the session and the channel connect. */
    private static final int CONNECT_TIMEOUT_MS = 30000;

    /** Maximum time, in seconds, to wait when putting a connection back into the queue. */
    private static final long OFFER_TIMEOUT_SECONDS = 5;

    /**
     * One pooled connection: a session together with the SFTP channel opened on it.
     */
    public static class SftpConnection {
        private final Session session;
        private final ChannelSftp channel;

        public SftpConnection(Session session, ChannelSftp channel) {
            this.session = session;
            this.channel = channel;
        }

        public ChannelSftp getChannel() {
            return channel;
        }

        public Session getSession() {
            return session;
        }

        /**
         * @return {@code true} only when both the session and the channel are
         *         non-null and report themselves as connected
         */
        public boolean isConnected() {
            return session != null && session.isConnected() && channel != null && channel.isConnected();
        }

        /**
         * Disconnects the channel and then the session. Failures are deliberately
         * ignored: this is best-effort cleanup and must never throw.
         */
        public void closeQuietly() {
            try {
                if (channel != null && channel.isConnected()) {
                    channel.disconnect();
                }
            } catch (Exception ignored) {
                // best-effort cleanup
            }
            try {
                if (session != null && session.isConnected()) {
                    session.disconnect();
                }
            } catch (Exception ignored) {
                // best-effort cleanup
            }
        }
    }

    private final BlockingQueue<SftpConnection> pool;
    private final String host;
    private final int port;
    private final String user;
    private final String password;
    /** Capacity requested at construction time. */
    private final int size;

    /**
     * Reads the {@code sftp.host}, {@code sftp.port}, {@code sftp.username} and
     * {@code sftp.password} settings through {@code YmlUtil} and eagerly opens
     * {@code size} connections.
     *
     * @param size number of connections to open; also the queue capacity
     * @throws JSchException if any connection cannot be established; connections
     *                       that were already opened are closed before rethrowing
     */
    public SftpConnectionPool(int size) throws JSchException {
        this.host = YmlUtil.getYmlValueNonNull("sftp.host");
        this.port = Integer.parseInt(YmlUtil.getYmlValueNonNull("sftp.port"));
        this.user = YmlUtil.getYmlValueNonNull("sftp.username");
        this.password = YmlUtil.getYmlValueNonNull("sftp.password");
        this.size = size;
        this.pool = new ArrayBlockingQueue<>(size);
        try {
            for (int i = 0; i < size; i++) {
                pool.offer(createConnection());
            }
        } catch (JSchException | RuntimeException e) {
            // Do not leak the connections opened before the failure.
            drainAndClose();
            throw e;
        }
    }

    /**
     * Opens a new session and an SFTP channel on it.
     *
     * @throws JSchException if the session or the channel cannot be connected;
     *                       the session is disconnected before rethrowing
     */
    private SftpConnection createConnection() throws JSchException {
        JSch jsch = new JSch();
        Session session = jsch.getSession(user, host, port);
        try {
            session.setPassword(password);
            // NOTE(review): this disables host key verification, which leaves the
            // connection open to man-in-the-middle attacks; consider a known_hosts file.
            session.setConfig("StrictHostKeyChecking", "no");
            session.connect(CONNECT_TIMEOUT_MS);
            ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");
            channel.connect(CONNECT_TIMEOUT_MS);
            return new SftpConnection(session, channel);
        } catch (JSchException | RuntimeException e) {
            // The session may already be connected when the channel step fails.
            try {
                if (session.isConnected()) {
                    session.disconnect();
                }
            } catch (Exception ignored) {
                // best-effort cleanup; the original failure is what matters
            }
            throw e;
        }
    }

    /**
     * Borrows a connection, blocking until one is available.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public SftpConnection borrow() throws InterruptedException {
        return pool.take();
    }

    /**
     * Returns a connection to the pool. A connection that is no longer connected
     * is closed and replaced by a freshly created one (see {@link #invalidate}).
     * A healthy connection that cannot be queued is closed instead of leaked.
     *
     * @param conn the connection to return; {@code null} is ignored
     */
    public void release(SftpConnection conn) {
        if (conn == null) {
            return;
        }
        if (conn.isConnected()) {
            offerOrClose(conn);
        } else {
            // Unusable: destroy it and try to put a replacement into the pool.
            invalidate(conn);
        }
    }

    /**
     * Closes the given connection and tries to create a replacement and put it
     * into the pool. If the replacement cannot be created the failure is logged
     * and the pool simply holds one connection fewer; later {@link #borrow()}
     * calls may then block longer.
     *
     * @param conn the connection to discard; may be {@code null}
     */
    public void invalidate(SftpConnection conn) {
        if (conn != null) {
            conn.closeQuietly();
        }
        final SftpConnection replacement;
        try {
            replacement = createConnection();
        } catch (Exception e) {
            logger.error("重建 SFTP 连接失败: {}", e.getMessage(), e);
            return;
        }
        offerOrClose(replacement);
    }

    /**
     * Closes every connection currently sitting in the pool. Connections that
     * are borrowed at the time of the call are not affected.
     */
    public void closeAll() {
        drainAndClose();
    }

    /**
     * Tries to queue the connection, waiting up to {@link #OFFER_TIMEOUT_SECONDS};
     * if the queue does not accept it the connection is closed so it cannot leak.
     * When interrupted, the interrupt flag is restored and one non-blocking
     * attempt is made before giving up.
     *
     * @return {@code true} if the connection is now in the pool
     */
    private boolean offerOrClose(SftpConnection conn) {
        boolean accepted;
        try {
            accepted = pool.offer(conn, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            accepted = pool.offer(conn);
        }
        if (!accepted) {
            logger.warn("SFTP 连接无法放回连接池,已关闭该连接");
            conn.closeQuietly();
        }
        return accepted;
    }

    /** Removes and closes all queued connections. */
    private void drainAndClose() {
        SftpConnection conn;
        while ((conn = pool.poll()) != null) {
            conn.closeQuietly();
        }
    }
}
2.代码调用
/**
 * Starts the periodic upload job. Does nothing when {@code getEnable()} is false.
 *
 * <p>Creates the worker executor and the SFTP connection pool if they do not
 * exist yet, then schedules a {@link TimerTask} at a fixed rate of
 * {@code uploadInterval} seconds with no initial delay. A run is skipped while
 * {@code runningFlag} is still set by a previous run.
 */
public void start() {
    if (!getEnable()) {
        return;
    }
    // Create the thread pool and the connection pool (only once).
    if (executor == null) {
        executor = Executors.newFixedThreadPool(poolSize);
    }
    if (sftpPool == null) {
        try {
            sftpPool = new SftpConnectionPool(poolSize);
        } catch (Exception e) {
            // NOTE(review): the task below is still scheduled with sftpPool == null;
            // confirm that upload() copes with a missing pool.
            logger.error("初始化 SFTP 连接池失败: {}", e.getMessage(), e);
        }
    }
    task = new TimerTask() {
        @Override
        public void run() {
            if (runningFlag) {
                logger.info("上传任务正在执行,本次任务不执行");
                return;
            }
            runningFlag = true;
            try {
                logger.info("上传任务开始执行");
                dstPath = null;
                upload();
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is not lost.
                logger.error("打包上传任务失败, error: {}", e.getMessage(), e);
            } finally {
                runningFlag = false;
            }
        }
    };
    // TODO: update the upload interval
    // Multiply as long so a large interval cannot overflow int arithmetic.
    timer.scheduleAtFixedRate(task, 0, uploadInterval * 1000L);
}
/**
 * Releases the upload resources. Does nothing when {@code getEnable()} is false.
 *
 * <p>Steps, in order: cancel the timer task and the timer; if {@code dstPath}
 * is set, try to remove the remote {@code dstPath + ".w"} file using a pooled
 * connection; shut down the executor (waiting up to one minute before forcing
 * it); close all pooled SFTP connections.
 */
public void close() {
    if (!getEnable()) {
        return;
    }
    logger.info("上传资源释放开始");
    if (task != null) {
        task.cancel();
        timer.cancel();
    }
    logger.info("上传资源定时任务关闭");
    // Remove the remote dstPath + ".w" file, if any, through a pooled connection.
    // NOTE(review): borrow() blocks until a connection is free, and this runs
    // before the executor is stopped; confirm workers cannot hold every connection.
    if (!StringUtils.isEmpty(dstPath) && sftpPool != null) {
        SftpConnectionPool.SftpConnection conn = null;
        try {
            conn = sftpPool.borrow();
            try {
                conn.getChannel().rm(dstPath + ".w");
            } catch (Exception e) {
                // Best-effort removal: a failure must not abort shutdown, but leave a trace.
                logger.debug("close 时删除远端 .w 文件未成功(已忽略): {}", e.getMessage());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the rest of the shutdown reacts to it.
            Thread.currentThread().interrupt();
            logger.warn("close 时尝试删除远端 .w 文件失败: {}", e.getMessage());
        } catch (Exception e) {
            logger.warn("close 时尝试删除远端 .w 文件失败: {}", e.getMessage(), e);
        } finally {
            if (conn != null) {
                sftpPool.release(conn);
            }
        }
    }
    // Shut down the thread pool, then the connection pool.
    if (executor != null) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    if (sftpPool != null) {
        sftpPool.closeAll();
    }
    logger.info("上传资源释放完成");
}