ScheduledThreadPoolExecutor和时间轮算法比较

发布于:2024-06-28 ⋅ 阅读:(46) ⋅ 点赞:(0)

最近项目中需要用到超时操作,对于不是特别优秀的timer和DelayQueue没有看。

  • Timer 是单线程模式。如果某个 TimerTask 执行时间很久,会影响其他任务的调度。
  • Timer 的任务调度是基于系统绝对时间的,如果系统时间不正确,可能会出现问题。
  • TimerTask 如果执行出现异常,Timer 并不会捕获,会导致线程终止,其他任务永远不会执行。
  • 相比于 Timer,DelayQueue 只实现了任务管理的功能,需要与异步线程配合使用。

着重看了一下ScheduledThreadPoolExecutor和时间轮算法,以下一些大致讲解和我的理解。

ScheduledThreadPoolExecutor

前置知识

继承于ThreadPoolExecutor,也就是说调用线程执行任务的流程是一样的,详见ensurePrestart();方法。

在ScheduledThreadPoolExecutor中使用的阻塞队列是DelayedWorkQueue,一个自定义的类。内部使用以下queue存放定时任务。

private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

上边提到的这个queue,是一个模拟优先级队列的堆(定时任务类实现了compareTo的方法),

    public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }

查询时间复杂度log(n),可以从siftUp方法看出来是以何种规则把任务存放到数组。

大致流程

创建一个定时任务

把任务放在优先级阻塞队列

新增worker(ThreadPoolExecutor的流程)从阻塞队列拿出任务

判断任务时间(任务中会有预期执行时间,时间不到则调用available.awaitNanos(delay);阻塞)

由于是优先级队列,所以都从queue[0]去取任务,一个线程阻塞,其他线程available.await();也阻塞

具体代码:

    public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0L)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

    private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

        public RunnableScheduledFuture<?> poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                RunnableScheduledFuture<?> first = queue[0];
                return (first == null || first.getDelay(NANOSECONDS) > 0)
                    ? null
                    : finishPoll(first);
            } finally {
                lock.unlock();
            }
        }

 个人看法

面对海量任务插入和删除的场景,会遇到比较严重的性能瓶颈

原因有以下两方面:

1. 自定义的阻塞队列queue不能自定义初始容量,只有16,需要考虑是否会频繁扩容

2. 就是上方提到的以下两个方法会导致 DelayedWorkQueue 数据结构频繁发生变化

    private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

        /**
         * Sifts element added at top down to its heap-ordered spot.
         * Call only when holding lock.
         */
        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            setIndex(key, k);
        }

时间轮算法

原理

原理很简单,一张图直接概括(偷来的图,下图是netty的实现),只看原理不看类名

时间的维度是无限的,但是钟表却能表示无限的时间。所以这个时间轮算法和钟表相似都可以表示无限的时间,只要把圈数和槽位记录好即可。

需要注意的是,时间轮算法虽然看起来是物理上的⚪,但其实只是逻辑上的回环,走到末尾时重新回到头。

类比钟表,时间轮上也有一个个的插槽(slot),一个插槽代表的是时间间隔(interval),时间间隔越小,时间表示越精确。

大致流程

放入定时任务

封装任务记录剩余圈数,槽位等等

扫描槽位以及链表,扫描到对应槽位找到剩余圈数为0的执行

个人看法,因为这只是算法思想,所以以下不是缺陷而是需要注意的点

  • 如果长时间没有到期任务,那么会存在时间轮空推进的现象。
  • 只适用于处理耗时较短的任务,由于 Worker 是单线程的,如果一个任务执行的时间过长,会造成 Worker 线程阻塞。
  • 相比传统定时器的实现方式,内存占用较大。

网站公告

今日签到

点亮在社区的每一天
去签到