这个需求其实就是一个 带优先级调度、可取消的线程池任务管理器。封装一个 Java Demo,支持:
任务优先级:数值越大优先级越高,可以“插队”。
任务取消:可以根据任务 ID 取消还未执行的任务。
多线程并发执行:用线程池来跑任务。
✨ 基础功能
实现思路
使用
PriorityBlockingQueue
管理任务,保证高优先级任务先执行。封装一个
Task
类,实现Runnable
和Comparable<Task>
,用于排序。每个任务有 唯一 ID,方便取消。
使用
ThreadPoolExecutor
配合优先级队列执行任务。
代码示例
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 优先级任务
*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final String taskId;
private final int priority;
private final Runnable action;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
public PriorityTask(String taskId, int priority, Runnable action) {
this.taskId = taskId;
this.priority = priority;
this.action = action;
}
public String getTaskId() {
return taskId;
}
public void cancel() {
cancelled.set(true);
}
@Override
public void run() {
if (!cancelled.get()) {
action.run();
} else {
System.out.println("任务 " + taskId + " 已被取消,未执行");
}
}
@Override
public int compareTo(PriorityTask other) {
// 优先级高的排在前面
return Integer.compare(other.priority, this.priority);
}
}
/**
* 任务管理器
*/
class TaskManager {
private final ThreadPoolExecutor executor;
private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();
public TaskManager(int poolSize) {
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
this.executor = new ThreadPoolExecutor(
poolSize,
poolSize,
60L,
TimeUnit.SECONDS,
queue
);
}
/** 提交任务 */
public String submitTask(int priority, Runnable action) {
String taskId = UUID.randomUUID().toString();
PriorityTask task = new PriorityTask(taskId, priority, action);
taskMap.put(taskId, task);
executor.execute(task);
return taskId;
}
/** 取消任务 */
public boolean cancelTask(String taskId) {
PriorityTask task = taskMap.remove(taskId);
if (task != null) {
task.cancel();
return true;
}
return false;
}
/** 关闭线程池 */
public void shutdown() {
executor.shutdown();
}
}
/**
* 测试
*/
public class PriorityTaskDemo {
public static void main(String[] args) throws InterruptedException {
TaskManager manager = new TaskManager(3);
// 提交一些任务
String id1 = manager.submitTask(1, () -> {
System.out.println("执行任务1,优先级1");
});
String id2 = manager.submitTask(10, () -> {
System.out.println("执行任务2,优先级10");
});
String id3 = manager.submitTask(5, () -> {
System.out.println("执行任务3,优先级5");
});
// 取消一个任务
boolean cancelled = manager.cancelTask(id3);
System.out.println("取消任务3结果: " + cancelled);
Thread.sleep(2000);
manager.shutdown();
}
}
执行结果示例
可能输出类似:
执行任务1,优先级1
执行任务2,优先级10
任务 c264a955-23e2-4192-8914-bd5e83d9f89d 已被取消,未执行
取消任务3结果: true
说明:
任务 优先级10 插队最先执行。
任务3 被取消,不会执行。
任务1 继续执行。
进一步扩展一下,让任务支持 回调监听(成功/失败/取消),方便在业务里统一埋点
✨ 功能扩展
每个任务可以带一个
TaskListener
回调,监听任务状态:onSuccess(String taskId)
onFailure(String taskId, Throwable error)
onCancelled(String taskId)
PriorityTask
内部捕获异常,调用对应的回调方法。TaskManager
提交任务时可以选择性传入监听器。
完整代码
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 任务监听器
*/
interface TaskListener {
void onSuccess(String taskId);
void onFailure(String taskId, Throwable error);
void onCancelled(String taskId);
}
/**
* 优先级任务
*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final String taskId;
private final int priority;
private final Runnable action;
private final TaskListener listener;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
public PriorityTask(String taskId, int priority, Runnable action, TaskListener listener) {
this.taskId = taskId;
this.priority = priority;
this.action = action;
this.listener = listener;
}
public String getTaskId() {
return taskId;
}
public void cancel() {
cancelled.set(true);
}
@Override
public void run() {
if (cancelled.get()) {
if (listener != null) listener.onCancelled(taskId);
System.out.println("任务 " + taskId + " 已被取消,未执行");
return;
}
try {
action.run();
if (listener != null) listener.onSuccess(taskId);
} catch (Throwable t) {
if (listener != null) listener.onFailure(taskId, t);
}
}
@Override
public int compareTo(PriorityTask other) {
// 优先级高的排在前面
return Integer.compare(other.priority, this.priority);
}
}
/**
* 任务管理器
*/
class TaskManager {
private final ThreadPoolExecutor executor;
private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();
public TaskManager(int poolSize) {
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
this.executor = new ThreadPoolExecutor(
poolSize,
poolSize,
60L,
TimeUnit.SECONDS,
queue
);
}
/** 提交任务(带监听器) */
public String submitTask(int priority, Runnable action, TaskListener listener) {
String taskId = UUID.randomUUID().toString();
PriorityTask task = new PriorityTask(taskId, priority, action, listener);
taskMap.put(taskId, task);
executor.execute(task);
return taskId;
}
/** 简化方法(无监听器) */
public String submitTask(int priority, Runnable action) {
return submitTask(priority, action, null);
}
/** 取消任务 */
public boolean cancelTask(String taskId) {
PriorityTask task = taskMap.remove(taskId);
if (task != null) {
task.cancel();
return true;
}
return false;
}
/** 关闭线程池 */
public void shutdown() {
executor.shutdown();
}
}
/**
* 测试
*/
public class PriorityTaskDemo {
public static void main(String[] args) throws InterruptedException {
TaskManager manager = new TaskManager(3);
// 定义一个统一的监听器
TaskListener listener = new TaskListener() {
@Override
public void onSuccess(String taskId) {
System.out.println("任务 " + taskId + " 执行成功 ✅");
}
@Override
public void onFailure(String taskId, Throwable error) {
System.out.println("任务 " + taskId + " 执行失败 ❌: " + error.getMessage());
}
@Override
public void onCancelled(String taskId) {
System.out.println("任务 " + taskId + " 被取消 ⏹️");
}
};
// 提交任务
String id1 = manager.submitTask(1, () -> {
System.out.println("执行任务1(优先级1)");
}, listener);
String id2 = manager.submitTask(10, () -> {
System.out.println("执行任务2(优先级10)");
throw new RuntimeException("模拟异常");
}, listener);
String id3 = manager.submitTask(5, () -> {
System.out.println("执行任务3(优先级5)");
}, listener);
// 取消一个任务
boolean cancelled = manager.cancelTask(id3);
System.out.println("取消任务3结果: " + cancelled);
Thread.sleep(2000);
manager.shutdown();
}
}
运行结果示例
执行任务1(优先级1)
执行任务2(优先级10)
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 被取消 ⏹️
任务 cc143e3d-3808-4b98-93a8-09ba90e281f2 已被取消,未执行
任务 ede52aab-e4a3-4896-88a3-e664b1c1a667 执行成功 ✅
取消任务3结果: true
任务 36230882-e049-44a9-8406-a90d34acad47 执行失败 ❌: 模拟异常
这样就实现了:
优先级插队(10 → 5 → 1)
取消任务(取消后不会执行,并回调
onCancelled
)失败捕获(异常不会导致线程池崩溃,回调
onFailure
)成功回调(回调
onSuccess
)
再加一个 全局监听器(所有任务统一回调,方便在一个地方打点统计),希望在 一个地方统一打点统计,包括:
任务ID
开始时间
结束时间
耗时
执行状态(成功 / 失败 / 取消)
这可以通过在 TaskManager
里引入一个 全局监听器 GlobalTaskListener 来实现。
✨ 改进方案
新增
GlobalTaskListener
接口,统一接收所有任务的生命周期事件。PriorityTask
内部在执行时打点(记录开始、结束时间),并回调到GlobalTaskListener
。TaskManager
可以配置一个全局监听器(所有任务都会走这里)。
🔧 完整代码
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 全局任务监听器(统一打点)
*/
interface GlobalTaskListener {
void onTaskStart(String taskId, int priority, Instant startTime);
void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime, Duration duration);
void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime, Duration duration,
Throwable error);
void onTaskCancelled(String taskId, int priority, Instant startTime);
}
/**
* 优先级任务
*/
class PriorityTask implements Runnable, Comparable<PriorityTask> {
private final String taskId;
private final int priority;
private final Runnable action;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final GlobalTaskListener globalListener;
public PriorityTask(String taskId, int priority, Runnable action, GlobalTaskListener globalListener) {
this.taskId = taskId;
this.priority = priority;
this.action = action;
this.globalListener = globalListener;
}
public String getTaskId() {
return taskId;
}
public void cancel() {
cancelled.set(true);
}
@Override
public void run() {
Instant startTime = Instant.now();
if (cancelled.get()) {
if (globalListener != null) {
globalListener.onTaskCancelled(taskId, priority, startTime);
}
System.out.println("任务 " + taskId + " 已被取消,未执行");
return;
}
if (globalListener != null) {
globalListener.onTaskStart(taskId, priority, startTime);
}
try {
action.run();
Instant endTime = Instant.now();
if (globalListener != null) {
globalListener.onTaskSuccess(taskId, priority, startTime, endTime,
Duration.between(startTime, endTime));
}
} catch (Throwable t) {
Instant endTime = Instant.now();
if (globalListener != null) {
globalListener.onTaskFailure(taskId, priority, startTime, endTime, Duration.between(startTime, endTime),
t);
}
}
}
@Override
public int compareTo(PriorityTask other) {
// 优先级高的排在前面
return Integer.compare(other.priority, this.priority);
}
}
/**
* 任务管理器
*/
class TaskManager {
private final ThreadPoolExecutor executor;
private final ConcurrentHashMap<String, PriorityTask> taskMap = new ConcurrentHashMap<>();
private final GlobalTaskListener globalListener;
public TaskManager(int poolSize, GlobalTaskListener globalListener) {
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
this.executor = new ThreadPoolExecutor(
poolSize,
poolSize,
60L,
TimeUnit.SECONDS,
queue);
this.globalListener = globalListener;
}
/** 提交任务 */
public String submitTask(int priority, Runnable action) {
String taskId = UUID.randomUUID().toString();
PriorityTask task = new PriorityTask(taskId, priority, action, globalListener);
taskMap.put(taskId, task);
executor.execute(task);
return taskId;
}
/** 取消任务 */
public boolean cancelTask(String taskId) {
PriorityTask task = taskMap.remove(taskId);
if (task != null) {
task.cancel();
return true;
}
return false;
}
/** 关闭线程池 */
public void shutdown() {
executor.shutdown();
}
}
/**
* 测试
*/
public class PriorityTaskDemo {
public static void main(String[] args) throws InterruptedException {
// 定义全局监听器
GlobalTaskListener listener = new GlobalTaskListener() {
@Override
public void onTaskStart(String taskId, int priority, Instant startTime) {
System.out.println("任务 " + taskId + "(优先级 " + priority + ")开始执行,时间:" + startTime);
}
@Override
public void onTaskSuccess(String taskId, int priority, Instant startTime, Instant endTime,
Duration duration) {
System.out.println("任务 " + taskId + " 成功 ✅,耗时:" + duration.toMillis() + " ms");
}
@Override
public void onTaskFailure(String taskId, int priority, Instant startTime, Instant endTime,
Duration duration, Throwable error) {
System.out.println("任务 " + taskId + " 失败 ❌,耗时:" + duration.toMillis() + " ms,错误:" + error.getMessage());
}
@Override
public void onTaskCancelled(String taskId, int priority, Instant startTime) {
System.out.println("任务 " + taskId + " 被取消 ⏹️,取消时间:" + startTime);
}
};
TaskManager manager = new TaskManager(3, listener);
// 提交几个任务
String id1 = manager.submitTask(1, () -> {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
System.out.println("执行任务1");
});
String id2 = manager.submitTask(10, () -> {
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
}
System.out.println("执行任务2");
throw new RuntimeException("模拟异常");
});
String id3 = manager.submitTask(5, () -> {
try {
Thread.sleep(300);
} catch (InterruptedException ignored) {
}
System.out.println("执行任务3");
});
// 取消任务3
boolean cancelled = manager.cancelTask(id3);
System.out.println("取消任务3结果: " + cancelled);
Thread.sleep(2000);
manager.shutdown();
}
}
🖥️ 输出示例
取消任务3结果: true
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 被取消 ⏹️,取消时间:2025-09-12T15:49:57.720205Z
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a(优先级 1)开始执行,时间:2025-09-12T15:49:57.719738Z
任务 d489c530-70d7-4ee7-87b8-627eba04c797(优先级 10)开始执行,时间:2025-09-12T15:49:57.719892Z
任务 4504f655-738c-4cdc-b7ba-85db8ccd8c35 已被取消,未执行
执行任务2
任务 d489c530-70d7-4ee7-87b8-627eba04c797 失败 ❌,耗时:218 ms,错误:模拟异常
执行任务1
任务 6a9d761e-97e8-4e47-a461-cc68217bf19a 成功 ✅,耗时:518 ms
这样就可以在一个地方(GlobalTaskListener
)统一打点,清楚知道:
任务什么时候开始
是否执行成功 / 失败 / 被取消
每个任务的耗时