Preface

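On a recent project I ran into a strange problem: under certain conditions, a scheduled task simply stopped running. jstack showed that the pool's threads were still alive, just sitting in the WAITING state. After going through the code I found a spot where I had been careless. The problem was avoidable, and I already knew about this pitfall; I just wasn't careful enough when writing the code. So I'm writing it down here.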

Investigation

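ScheduledThreadPoolExecutor is a thread pool for running periodic tasks, and it makes simple scheduled jobs easy to implement. Reading its API documentation carefully, I found that the problem I hit is described there explicitly; I just hadn't read it closely and had made assumptions. Take the Javadoc of scheduleAtFixedRate as an example: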

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

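The documentation is clear: if any execution of the submitted task throws an exception, all subsequent executions of that task are suppressed.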
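A small experiment makes this behavior concrete. This is only a sketch: the class name StopAfterExceptionDemo, the 10 ms period and the choice to fail on the third run are all arbitrary. The task throws on its third run and never runs again, while the pool stays alive and nothing is printed, which matches the silent failure described above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shows that one exception ends a periodic task for good.
class StopAfterExceptionDemo {

    // Schedules a task every 10 ms that throws on its 3rd run,
    // then returns how many times the task actually ran.
    static int countRuns() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch failed = new CountDownLatch(1);
        pool.scheduleAtFixedRate(() -> {
            if (runs.incrementAndGet() == 3) {
                failed.countDown();
                throw new IllegalStateException("boom on run 3");
            }
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            failed.await(5, TimeUnit.SECONDS); // wait for the failing run
            Thread.sleep(200);                 // ~20 more periods: would add runs if still scheduled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRuns()); // prints "runs = 3"
    }
}
```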

Now let's look at the code of scheduleAtFixedRate (the listings below are from the JDK 8 sources):

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) {
        // validate arguments
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        // wrap the Runnable in a ScheduledFutureTask
        ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit),unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // schedule for delayed execution
        delayedExecute(t);
        return t;
    }

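This method submits the periodic task: it wraps the Runnable in a ScheduledFutureTask, lets decorateTask optionally decorate it, and hands the result to delayedExecute for scheduling.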
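Because ScheduledFutureTask is a FutureTask, an exception thrown by the task is stored inside it rather than printed. One way to surface it is to keep the ScheduledFuture returned by scheduleAtFixedRate and call get() on it. The sketch below assumes a hypothetical class FutureRevealsFailure; the timeout only keeps the demo from hanging.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: recovers the swallowed exception from the future.
class FutureRevealsFailure {

    // Returns the message of the exception that ended the periodic task.
    static String failureMessage() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                throw new IllegalStateException("task failed");
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                // For a periodic task, get() only completes (by throwing) once
                // the task has failed or been cancelled.
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                return e.getCause().getMessage(); // the original exception
            } catch (TimeoutException e) {
                // the task did not fail within the timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "no failure observed";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage()); // prints "task failed"
    }
}
```

Note that get() blocks the calling thread for as long as the task keeps running successfully, so in real code this check would usually live on a dedicated monitoring thread.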

Next, the logic of delayedExecute:

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // add the task to the work queue
            super.getQueue().add(task);
            // if the pool was shut down in the meantime, withdraw and cancel the task
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // make sure a worker thread has been started
                ensurePrestart();
        }
    }

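The logic is straightforward: the task is added to the delay-ordered work queue, and ensurePrestart() starts a worker thread if the pool has fewer than corePoolSize threads (or none at all). A worker then takes the task from the queue and runs it once it is due.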
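To see this state directly, the sketch below (hypothetical class DelayedExecuteState) schedules one task with a one-hour initial delay and inspects the pool right away. Even with corePoolSize set to 2, it assumes ensurePrestart() starts just one worker per submission, and the task is still waiting in the queue because it is not yet due.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes the effect of delayedExecute.
class DelayedExecuteState {

    // Returns {queue size, pool size} right after scheduleAtFixedRate returns.
    static int[] stateAfterScheduling() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(2);
        try {
            pool.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
            // the task sits in the work queue; one worker was prestarted to wait for it
            return new int[] { pool.getQueue().size(), pool.getPoolSize() };
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] state = stateAfterScheduling();
        System.out.println("queued = " + state[0] + ", workers = " + state[1]); // prints "queued = 1, workers = 1"
    }
}
```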

Now let's look at runWorker, the method that contains a worker's execution loop:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
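runWorker passes any exception that escapes task.run() to the afterExecute hook. For tasks submitted to a ScheduledThreadPoolExecutor, though, nothing escapes: ScheduledFutureTask catches the exception itself (see runAndReset further down), so thrown is null. The ThreadPoolExecutor.afterExecute Javadoc shows a pattern for this case, which the sketch below adapts. ReportingScheduler, lastFailure and observedFailure are illustrative names, not JDK API.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical executor subclass that reports failures of scheduled tasks.
class ReportingScheduler extends ScheduledThreadPoolExecutor {
    final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
    final CountDownLatch failureSeen = new CountDownLatch(1);

    ReportingScheduler(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For scheduled tasks t is normally null: the exception was captured inside
        // the ScheduledFutureTask. The isDone() check matters, because a healthy
        // periodic task is never done and get() would block forever.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException e) {
                t = e;
            } catch (ExecutionException e) {
                t = e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            lastFailure.set(t); // a real system would log it here
            failureSeen.countDown();
        }
    }

    // Runs a periodic task that fails on its first run and returns the
    // message afterExecute observed, or null if none was seen.
    static String observedFailure() {
        ReportingScheduler pool = new ReportingScheduler(1);
        try {
            pool.scheduleAtFixedRate(() -> {
                throw new IllegalArgumentException("bad input");
            }, 0, 10, TimeUnit.MILLISECONDS);
            pool.failureSeen.await(5, TimeUnit.SECONDS);
            Throwable t = pool.lastFailure.get();
            return t == null ? null : t.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(observedFailure()); // prints "bad input"
    }
}
```

This hook only reports the failure; the task is still not rescheduled.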

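In short, runWorker repeatedly takes a task and runs it by calling its run method. For a scheduled task, that task is a ScheduledFutureTask, so all we need to look at is ScheduledFutureTask.run():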

        public void run() {
            // is this a periodic task?
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // periodic tasks take this branch
            else if (ScheduledFutureTask.super.runAndReset()) {
                // compute the next execution time
                setNextRunTime();
                // put the task back into the queue for the next run
                reExecutePeriodic(outerTask);
            }
        }

As you can see, the task is rescheduled only when runAndReset returns true. Let's look at runAndReset:

protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    // if c.call() above throws, ran never becomes true, so the method returns false
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

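So if the submitted task throws, the exception is caught and recorded by setException, runAndReset returns false, and the task is never rescheduled, exactly as the Javadoc says. This also explains what jstack showed: the exception never reaches runWorker, so the worker threads do not die. With the failed task gone and no other task queued, they simply block in the work queue's take(), waiting for a task that will never arrive, which is why they show up in the WAITING state.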
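The control flow of run() and runAndReset() can be boiled down to a small single-threaded model. This is a simplification, not JDK code: RunAndResetModel and simulate are made-up names, and the model ignores task state, cancellation and timing. It only keeps the rule "reschedule only if the body completed normally".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the periodic branch of ScheduledFutureTask.run().
class RunAndResetModel {

    // Like FutureTask.runAndReset: true on normal completion, false if the body threw.
    // (The real method also records the exception via setException.)
    static boolean runAndReset(Runnable body) {
        try {
            body.run();
            return true;
        } catch (Throwable ex) {
            return false;
        }
    }

    // Runs up to `ticks` periods; the task is "rescheduled" only while runAndReset
    // returns true. failOnRun selects the run that throws (0 = never).
    static List<Integer> simulate(int ticks, int failOnRun) {
        List<Integer> executedRuns = new ArrayList<>();
        boolean scheduled = true;
        for (int run = 1; run <= ticks && scheduled; run++) {
            final int current = run;
            scheduled = runAndReset(() -> {
                executedRuns.add(current);
                if (current == failOnRun) {
                    throw new RuntimeException("failure on run " + current);
                }
            });
        }
        return executedRuns;
    }

    public static void main(String[] args) {
        System.out.println(simulate(10, 4)); // prints [1, 2, 3, 4]
    }
}
```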

Summary

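When writing code, read the API documentation carefully instead of making assumptions.
When periodic tasks run on a ScheduledThreadPoolExecutor, each task should handle its own exceptions.
For example: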

public void run() {
    try {
        // business logic
        code();
    } catch (Throwable t) {
        // log and swallow, so the executor keeps scheduling this task
        log.error(t.getMessage(), t);
    }
}
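To avoid repeating this try/catch in every task, it can be factored into a wrapper applied at scheduling time. The sketch below is one possible shape, not the author's code: SafeRunnable, catching and survivesFailures are hypothetical names, and it uses java.util.logging because the post does not show what the `log` object above is.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: makes periodic tasks survive their own exceptions.
class SafeRunnable {
    private static final Logger LOG = Logger.getLogger(SafeRunnable.class.getName());

    // Wraps a task so nothing it throws reaches ScheduledFutureTask,
    // which keeps the periodic schedule alive.
    static Runnable catching(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                LOG.log(Level.SEVERE, "periodic task failed, will run again next period", t);
            }
        };
    }

    // Runs a task that throws on every second run and reports whether it still reached 5 runs.
    static boolean survivesFailures() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch fiveRuns = new CountDownLatch(5);
        pool.scheduleAtFixedRate(catching(() -> {
            fiveRuns.countDown();
            int n = runs.incrementAndGet();
            if (n % 2 == 0) {
                throw new IllegalStateException("failure on run " + n);
            }
        }), 0, 10, TimeUnit.MILLISECONDS);
        try {
            return fiveRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("survived: " + survivesFailures());
    }
}
```

Catching Throwable, as the snippet above does, keeps the task alive even after an Error; catching only Exception would let Errors still end the schedule, which some may prefer.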