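Preface

Recently a project of ours needed to move data between an internal network and the outside world, which required a proxy. We had been using LittleProxy, a third-party proxy library, but after running for a while it hit out-of-memory errors. Analysis of the heap dump showed that the cause was too many connections, and a search on GitHub confirmed that someone had already filed an issue for it. The project is no longer maintained, however, so I decided to implement a proxy service myself on top of netty. An HTTP proxy has to parse every request, so I planned to implement a SOCKS proxy instead, and to get a measure of security I settled on SOCKS5, because SOCKS5 provides username/password authentication.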

Process

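Writing a SOCKS5 proxy service with netty is not very hard, so the development work was finished quickly. Next came adapting the client. The client uses httpclient 4.5, which does not support SOCKS proxies itself, but the JDK does provide SOCKS proxy support, so the SOCKS proxy is transparent to httpclient and can be used without any special handling. A joint test on my local machine succeeded, and everything looked normal.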
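As a rough illustration of the JDK-level SOCKS support mentioned above, the sketch below creates a socket that is bound to a SOCKS proxy through java.net.Proxy. The class name JdkSocksExample and the proxy address 127.0.0.1:1080 are placeholders made up for this example, not values from the project. The JDK can also be pointed at a SOCKS proxy globally through the socksProxyHost and socksProxyPort system properties.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;

class JdkSocksExample {
    // Hypothetical proxy location; replace with the address of the real SOCKS5 service.
    static final String PROXY_HOST = "127.0.0.1";
    static final int PROXY_PORT = 1080;

    /** Describes the SOCKS proxy to the JDK. */
    static Proxy socksProxy() {
        return new Proxy(Proxy.Type.SOCKS, new InetSocketAddress(PROXY_HOST, PROXY_PORT));
    }

    /**
     * Creates an unconnected socket bound to the proxy. A later connect(...)
     * on this socket performs the SOCKS handshake inside the JDK, so the
     * caller only ever sees an ordinary java.net.Socket.
     */
    static Socket newProxiedSocket() {
        return new Socket(socksProxy());
    }
}
```

Because the handshake happens inside the JDK socket implementation, a library that only works with plain sockets does not need to know that a proxy is involved.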

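The next day I deployed the program to a real internal-network environment for testing and found a problem. Network requests failed with the following exception: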

java.net.UnknownHostException: www.baidu.com
 at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
 at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:850)
 at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1201)
 at java.net.InetAddress.getAllByName0(InetAddress.java:1154)
 at java.net.InetAddress.getAllByName(InetAddress.java:1084)
 at java.net.InetAddress.getAllByName(InetAddress.java:1020)
 at org.apache.http.impl.conn.DefaultClientConnectionOperator.resolveHostname(DefaultClientConnectionOperator.java:242)
 at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:130)
 at org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:150)
 at org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:121)
 at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:575)
 at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:425)
 at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:820)
 at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:754)
 at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:732)

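This is an internal network where domain names cannot be resolved, yet httpclient insists on resolving the domain name before it sends a request. I found a way around this pitfall online. With everything in place there was still a problem, but the error had changed: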
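The post does not say which workaround was used. One possible approach, shown here only as an assumption, is to hand the socket an unresolved address: the SocksSocketImpl source quoted later in this post takes its DOMAIN_NAME branch when the endpoint is unresolved, so the host name is sent to the proxy instead of being looked up locally. The class name UnresolvedAddressExample is made up for this sketch, and how such an address is plugged into httpclient is not shown.

```java
import java.net.InetSocketAddress;

class UnresolvedAddressExample {
    /**
     * Wraps host and port without triggering a DNS lookup on the client.
     * The resulting address reports isUnresolved() == true and carries
     * only the host name, never an InetAddress.
     */
    static InetSocketAddress target(String host, int port) {
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

A socket that connects to such an address through a SOCKS proxy leaves name resolution to the proxy, which is what an internal network without DNS needs.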

Caused by: javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
    at sun.security.ssl.InputRecord.handleUnknownRecord(InputRecord.java:541)
    at sun.security.ssl.InputRecord.read(InputRecord.java:374)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:893)
    at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1294)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:848)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)

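This error looked like a failed SSL handshake. I did not know SSL well, and a quick read of the protocol got me nowhere. At a colleague's suggestion I tried a plain http request instead, and that failed too: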

org.apache.http.client.ClientProtocolException
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:187)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at HttpTest.http(HttpTest.java:163)
    at HttpTest.main(HttpTest.java:116)
Caused by: org.apache.http.ProtocolException: The server failed to respond with a valid HTTP response
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:149)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
    at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
    at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    ... 4 more

It looked as though the data coming back was wrong. Turning on httpclient's debug log revealed the problem:

DEBUG [org.apache.http.client.protocol.RequestAddCookies] CookieSpec selected: default
DEBUG [org.apache.http.client.protocol.RequestAuthCache] Auth cache not set in the context
DEBUG [org.apache.http.impl.conn.PoolingHttpClientConnectionManager] Connection request: [route: {}->http://hc.apache.org:80][total available: 0; route allocated: 0 of 30; total allocated: 0 of 30]
DEBUG [org.apache.http.impl.conn.PoolingHttpClientConnectionManager] Connection leased: [id: 0][route: {}->http://hc.apache.org:80][total available: 0; route allocated: 1 of 30; total allocated: 1 of 30]
DEBUG [org.apache.http.impl.execchain.MainClientExec] Opening connection {}->http://hc.apache.org:80
DEBUG [org.apache.http.impl.conn.DefaultHttpClientConnectionOperator] Connecting to hc.apache.org/95.216.24.32:80
DEBUG [org.apache.http.impl.conn.DefaultHttpClientConnectionOperator] Connection established 10.6.252.194:53999<->0.0.0.0:80
DEBUG [org.apache.http.impl.conn.DefaultManagedHttpClientConnection] http-outgoing-0: set socket timeout to 30000
DEBUG [org.apache.http.impl.execchain.MainClientExec] Executing request GET / HTTP/1.1
DEBUG [org.apache.http.impl.execchain.MainClientExec] Target auth state: UNCHALLENGED
DEBUG [org.apache.http.impl.execchain.MainClientExec] Proxy auth state: UNCHALLENGED
DEBUG [org.apache.http.headers] http-outgoing-0 >> GET / HTTP/1.1
DEBUG [org.apache.http.headers] http-outgoing-0 >> Host: hc.apache.org
DEBUG [org.apache.http.headers] http-outgoing-0 >> Connection: Keep-Alive
DEBUG [org.apache.http.headers] http-outgoing-0 >> User-Agent: Apache-HttpClient/4.5.12 (Java/1.8.0_191)
DEBUG [org.apache.http.headers] http-outgoing-0 >> Accept-Encoding: gzip,deflate
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: c.apache.orgPHTTP/1.1 200 OK
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Date: Thu, 14 May 2020 03:54:56 GMT
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Server: Apache/2.4.18 (Ubuntu)
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Last-Modified: Sat, 22 Feb 2020 12:48:20 GMT
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: ETag: "3239-59f298d8029ef-gzip"
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Accept-Ranges: bytes
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Vary: Accept-Encoding
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Content-Encoding: gzip
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Access-Control-Allow-Origin: *
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Content-Length: 3050
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Keep-Alive: timeout=5, max=2000
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Connection: Keep-Alive
DEBUG [org.apache.http.impl.conn.DefaultHttpResponseParser] Garbage in response: Content-Type: text/html

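The status line received by the client unexpectedly had a few extra characters in front of it: c.apache.orgPHTTP/1.1 200 OK, so the response could not be parsed. But where did these characters come from? My first suspicion was the SOCKS5 service I had written, but tests with a browser and with curl both worked, which indicated that the service was fine. Further investigation showed that if the machine running httpclient can resolve the domain name, the request goes through normally; if it cannot, this problem appears.

Puzzled, I went back over the flow of the SOCKS5 protocol: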

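  • Step 1: after the client establishes a connection to the proxy, it sends a request telling the proxy which authentication methods it supports, in the following format:
    +----+----------+----------+
    |VER | NMETHODS | METHODS  |
    +----+----------+----------+
    | 1  |    1     | 1 to 255 |
    +----+----------+----------+
    The first byte is the protocol version, which for SOCKS5 is X’05’. The second byte is the number of authentication methods the client supports. METHODS lists the methods themselves, NMETHODS entries in total.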

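  • Step 2: the proxy sends the client a response announcing the authentication method it has selected:
    +----+--------+
    |VER | METHOD |
    +----+--------+
    | 1  |   1    |
    +----+--------+
    VER is the version number, as in step 1. METHOD is one of the methods offered in the step 1 request.
    The methods currently defined are:

    • X’00’ NO AUTHENTICATION REQUIRED
    • X’01’ GSSAPI
    • X’02’ USERNAME/PASSWORD
    • X’03’ to X’7F’ IANA ASSIGNED
    • X’80’ to X’FE’ RESERVED FOR PRIVATE METHODS
    • X’FF’ NO ACCEPTABLE METHODS
  • Step 3: this step is optional. It carries out the security authentication; if the proxy does not require authentication, it is skipped.
  • Step 4: the response to step 3, indicating whether authentication succeeded.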
  • Step 5: the client sends a request telling the proxy which command to execute:
    +----+-----+-------+------+----------+----------+
    |VER | CMD |  RSV  | ATYP | DST.ADDR | DST.PORT |
    +----+-----+-------+------+----------+----------+
    | 1  |  1  | X'00' |  1   | Variable |    2     |
    +----+-----+-------+------+----------+----------+
    
    • VER protocol version: X’05’
    • CMD
      • CONNECT X’01’
      • BIND X’02’
      • UDP ASSOCIATE X’03’
    • RSV RESERVED
    • ATYP address type of following address
      • IP V4 address: X’01’
      • DOMAINNAME: X’03’
      • IP V6 address: X’04’
    • DST.ADDR desired destination address
    • DST.PORT desired destination port in network octet order

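    Because the client cannot resolve the domain name, ATYP in my scenario is X’03’, i.e. DOMAINNAME. For this type DST.ADDR is variable-length, and its first byte gives the length of the name.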

  • Step 6: the proxy connects to the destination address given in the request.

  • Step 7: the proxy sends a response to the client reporting the result.

    +----+-----+-------+------+----------+----------+
    |VER | REP |  RSV  | ATYP | BND.ADDR | BND.PORT |
    +----+-----+-------+------+----------+----------+
    | 1  |  1  | X'00' |  1   | Variable |    2     |
    +----+-----+-------+------+----------+----------+
    
    • VER protocol version: X’05’
    • REP Reply field:
      • X’00’ succeeded
      • X’01’ general SOCKS server failure
      • X’02’ connection not allowed by ruleset
      • X’03’ Network unreachable
      • X’04’ Host unreachable
      • X’05’ Connection refused
      • X’06’ TTL expired
      • X’07’ Command not supported
      • X’08’ Address type not supported
      • X’09’ to X’FF’ unassigned
    • RSV RESERVED
    • ATYP address type of following address
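      Because ATYP was DOMAINNAME in step 5, my proxy used DOMAINNAME here as well and returned the domain name and port to the client.
  • After that, the client and the internet host exchange data, and the proxy only forwards it.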
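The client-side messages of steps 1 and 5 can be sketched as plain byte arrays. This is a minimal illustration of the wire format in the tables above, not code from the project or the JDK; the class and method names (Socks5Requests, greeting, connectByName) are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class Socks5Requests {
    static final int VER = 0x05;          // protocol version
    static final int NO_AUTH = 0x00;      // NO AUTHENTICATION REQUIRED
    static final int USER_PASS = 0x02;    // USERNAME/PASSWORD
    static final int CMD_CONNECT = 0x01;  // CONNECT command
    static final int ATYP_DOMAIN = 0x03;  // DOMAINNAME address type

    /** Step 1: VER, NMETHODS, then one byte per offered method. */
    static byte[] greeting(int... methods) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(VER);
        out.write(methods.length);
        for (int m : methods) {
            out.write(m);
        }
        return out.toByteArray();
    }

    /** Step 5: a CONNECT request whose DST.ADDR is a domain name. */
    static byte[] connectByName(String host, int port) {
        byte[] name = host.getBytes(StandardCharsets.ISO_8859_1);
        if (name.length > 255) {
            throw new IllegalArgumentException("host name does not fit in one length byte");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(VER);
        out.write(CMD_CONNECT);
        out.write(0x00);                 // RSV
        out.write(ATYP_DOMAIN);
        out.write(name.length);          // one length byte precedes the name
        out.write(name, 0, name.length);
        out.write((port >> 8) & 0xff);   // port, high byte first
        out.write(port & 0xff);
        return out.toByteArray();
    }
}
```

For hc.apache.org on port 80 the request is 20 bytes long: 4 header bytes, 1 length byte, 13 name bytes and 2 port bytes. A reply with ATYP DOMAINNAME has the same layout, which matters for the bug described next.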

While going over the SOCKS5 protocol I read the JDK code alongside it and found where the problem lies: java.net.SocksSocketImpl.connect(SocketAddress endpoint, int timeout). Following the SOCKS5 steps:

        // Step 1: method negotiation
        out.write(PROTO_VERS);
        out.write(2);
        out.write(NO_AUTH);// no authentication required
        out.write(USER_PASSW);// username/password authentication
        out.flush();
        byte[] data = new byte[2];
        int i = readSocksReply(in, data, deadlineMillis);
        if (i != 2 || ((int)data[0]) != PROTO_VERS) {
            // Maybe it's not a V5 sever after all
            // Let's try V4 before we give up
            // SOCKS Protocol version 4 doesn't know how to deal with
            // DOMAIN type of addresses (unresolved addresses here)
            if (epoint.isUnresolved())
                throw new UnknownHostException(epoint.toString());
            connectV4(in, out, epoint, deadlineMillis);
            return;
        }
        if (((int)data[1]) == NO_METHODS)
            throw new SocketException("SOCKS : No acceptable methods");
        // authentication logic
        if (!authenticate(data[1], in, out, deadlineMillis)) {
            throw new SocketException("SOCKS : authentication failed");
        }
        // send the CONNECT request
        out.write(PROTO_VERS);
        out.write(CONNECT);
        out.write(0);
        /* Test for IPV4/IPV6/Unresolved */
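        // domain names cannot be resolved on the internal network, so this branch (DOMAIN_NAME) is taken
        if (epoint.isUnresolved()) {
            out.write(DOMAIN_NAME);
            out.write(epoint.getHostName().length());// write the length of the address first
            try {
                out.write(epoint.getHostName().getBytes("ISO-8859-1"));// then the domain name itself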
            } catch (java.io.UnsupportedEncodingException uee) {
                assert false;
            }
            out.write((epoint.getPort() >> 8) & 0xff);
            out.write((epoint.getPort() >> 0) & 0xff);
        } else if (epoint.getAddress() instanceof Inet6Address) {
            out.write(IPV6);
            out.write(epoint.getAddress().getAddress());
            out.write((epoint.getPort() >> 8) & 0xff);
            out.write((epoint.getPort() >> 0) & 0xff);
        } else {
            out.write(IPV4);
            out.write(epoint.getAddress().getAddress());
            out.write((epoint.getPort() >> 8) & 0xff);
            out.write((epoint.getPort() >> 0) & 0xff);
        }
        out.flush();
        data = new byte[4];
        // read the first 4 bytes of the reply
        i = readSocksReply(in, data, deadlineMillis);
        if (i != 4)
            throw new SocketException("Reply from SOCKS server has bad length");
        SocketException ex = null;
        int len;
        byte[] addr;
        // data[1] holds the reply status, which tells whether the request succeeded
        switch (data[1]) {
        case REQUEST_OK:
            // success!
            // data[3] is the address type
            switch(data[3]) {
            case IPV4:
                addr = new byte[4];// IPv4: read four bytes straight from the network
                i = readSocksReply(in, addr, deadlineMillis);
                if (i != 4)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                data = new byte[2];// port number
                i = readSocksReply(in, data, deadlineMillis);
                if (i != 2)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                break;
            case DOMAIN_NAME:
                len = data[1];// wait, the problem is here: len should be one byte read from the network, so why is data[1] used directly?
                byte[] host = new byte[len];
                i = readSocksReply(in, host, deadlineMillis);
                if (i != len)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                data = new byte[2];
                i = readSocksReply(in, data, deadlineMillis);
                if (i != 2)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                break;
            case IPV6:
                len = data[1];// this is wrong too: len should be a fixed 16 bytes, so why is data[1] used directly?
                addr = new byte[len];
                i = readSocksReply(in, addr, deadlineMillis);
                if (i != len)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                data = new byte[2];
                i = readSocksReply(in, data, deadlineMillis);
                if (i != 2)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                break;
            default:
                ex = new SocketException("Reply from SOCKS server contains wrong code");
                break;
            }
            break;
        case GENERAL_FAILURE:
            ex = new SocketException("SOCKS server general failure");
            break;
        case NOT_ALLOWED:
            ex = new SocketException("SOCKS: Connection not allowed by ruleset");
            break;
        case NET_UNREACHABLE:
            ex = new SocketException("SOCKS: Network unreachable");
            break;
        case HOST_UNREACHABLE:
            ex = new SocketException("SOCKS: Host unreachable");
            break;
        case CONN_REFUSED:
            ex = new SocketException("SOCKS: Connection refused");
            break;
        case TTL_EXPIRED:
            ex =  new SocketException("SOCKS: TTL expired");
            break;
        case CMD_NOT_SUPPORTED:
            ex = new SocketException("SOCKS: Command not supported");
            break;
        case ADDR_TYPE_NOT_SUP:
            ex = new SocketException("SOCKS: address type not supported");
            break;
        }
        if (ex != null) {
            in.close();
            out.close();
            throw ex;
        }

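The comments in the code above already point out the problem: the JDK's SOCKS5 implementation handles the reply incorrectly. data[1] is the REP field, which is 0 on success, so zero bytes are read for the domain name, and the next two bytes (the length byte and the first character of the name) are consumed as the port. The rest of the reply is never read, so it ends up stuck to the front of the HTTP response. That is exactly the stray c.apache.org followed by P (the low byte of port 80) seen in the debug log, and it is why the HTTP request ultimately fails.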
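The effect can be reproduced without a network. The sketch below is a simplified simulation, not the JDK code itself: it builds a success reply with ATYP DOMAINNAME followed by the start of an HTTP response, then consumes the reply either the buggy way (length taken from data[1]) or the correct way (length read from the stream). The class and method names are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class Socks5ReplyDemo {
    /** A success reply with ATYP = DOMAINNAME, immediately followed by HTTP data. */
    static byte[] wire(String host, int port, String httpHead) {
        byte[] name = host.getBytes(StandardCharsets.ISO_8859_1);
        byte[] http = httpHead.getBytes(StandardCharsets.ISO_8859_1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x05);                 // VER
        out.write(0x00);                 // REP: succeeded
        out.write(0x00);                 // RSV
        out.write(0x03);                 // ATYP: DOMAINNAME
        out.write(name.length);          // length byte
        out.write(name, 0, name.length); // BND.ADDR
        out.write((port >> 8) & 0xff);   // BND.PORT
        out.write(port & 0xff);
        out.write(http, 0, http.length);
        return out.toByteArray();
    }

    /** Consumes the SOCKS reply and returns what is left over for the HTTP parser. */
    static String leftover(byte[] wire, boolean buggy) {
        ByteArrayInputStream in = new ByteArrayInputStream(wire);
        byte[] head = new byte[4];
        in.read(head, 0, 4);
        // Buggy variant: reuse the REP byte (0 on success) as the name length.
        // Fixed variant: read the real length byte from the stream.
        int len = buggy ? head[1] : in.read();
        in.skip(len);                    // host name
        in.skip(2);                      // port
        byte[] rest = new byte[in.available()];
        in.read(rest, 0, rest.length);
        return new String(rest, StandardCharsets.ISO_8859_1);
    }
}
```

With wire("hc.apache.org", 80, "HTTP/1.1 200 OK"), the buggy variant leaves c.apache.org, a NUL byte and P in front of the status line, while the fixed variant leaves only the status line.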

A search of the JDK bug database turned up the issue "SOCKS proxying does not work with IPv6 connections". The bug was fixed in jdk9 b02 and openjdk8u222, and the openjdk8u222 source confirms that the problem is indeed fixed:

        data = new byte[4];
        i = readSocksReply(in, data, deadlineMillis);
        if (i != 4)
            throw new SocketException("Reply from SOCKS server has bad length");
        SocketException ex = null;
        int len;
        byte[] addr;
        switch (data[1]) {
        case REQUEST_OK:
            // success!
            switch(data[3]) {
            case IPV4:
                addr = new byte[4];
                i = readSocksReply(in, addr, deadlineMillis);
                if (i != 4)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                data = new byte[2];
                i = readSocksReply(in, data, deadlineMillis);
                if (i != 2)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                break;
            case DOMAIN_NAME:
                byte[] lenByte = new byte[1];// read the one-byte length first
                i = readSocksReply(in, lenByte, deadlineMillis);
                if (i != 1)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                len = lenByte[0] & 0xFF;
                byte[] host = new byte[len];// then read the name itself
                i = readSocksReply(in, host, deadlineMillis);
                if (i != len)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                data = new byte[2];
                i = readSocksReply(in, data, deadlineMillis);
                if (i != 2)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                break;
            case IPV6:
                len = 16;// fixed 16 bytes
                addr = new byte[len];
                i = readSocksReply(in, addr, deadlineMillis);
                if (i != len)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                data = new byte[2];
                i = readSocksReply(in, data, deadlineMillis);
                if (i != 2)
                    throw new SocketException("Reply from SOCKS server badly formatted");
                break;
            default:
                ex = new SocketException("Reply from SOCKS server contains wrong code");
                break;
            }
            break;
        case GENERAL_FAILURE:
            ex = new SocketException("SOCKS server general failure");
            break;
        case NOT_ALLOWED:
            ex = new SocketException("SOCKS: Connection not allowed by ruleset");
            break;
        case NET_UNREACHABLE:
            ex = new SocketException("SOCKS: Network unreachable");
            break;
        case HOST_UNREACHABLE:
            ex = new SocketException("SOCKS: Host unreachable");
            break;
        case CONN_REFUSED:
            ex = new SocketException("SOCKS: Connection refused");
            break;
        case TTL_EXPIRED:
            ex =  new SocketException("SOCKS: TTL expired");
            break;
        case CMD_NOT_SUPPORTED:
            ex = new SocketException("SOCKS: Command not supported");
            break;
        case ADDR_TYPE_NOT_SUP:
            ex = new SocketException("SOCKS: address type not supported");
            break;
        }

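The problem was found, but upgrading is difficult: some deployments run IBM's JDK, so an upgrade would be a lot of trouble. In the end I came up with a workaround. When the SOCKS5 service answers the client's CONNECT request, it does not follow the ATYP of the request but always writes an IPv4-type reply (the system does not currently support IPv6). Judging from the source, SocksSocketImpl only reads the reply and does not validate it or do anything else with it. This sidesteps the bug, and no other problems have come up so far.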
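On the wire, the workaround amounts to a fixed 10-byte reply. The sketch below shows only the bytes, independent of netty; the class name Socks5Ipv4Reply is made up, and filling BND.ADDR and BND.PORT with zeros is an assumption of this example, since the post does not say which values the service actually writes.

```java
class Socks5Ipv4Reply {
    /**
     * A success reply that always advertises ATYP = IPv4.
     * An unpatched JDK client reads 4 header bytes, 4 address bytes
     * and 2 port bytes for this type, so nothing is left in the stream.
     */
    static byte[] success() {
        return new byte[] {
            0x05,        // VER
            0x00,        // REP: succeeded
            0x00,        // RSV
            0x01,        // ATYP: IPv4, regardless of the ATYP in the request
            0, 0, 0, 0,  // BND.ADDR (assumed 0.0.0.0)
            0, 0         // BND.PORT (assumed 0)
        };
    }
}
```

The IPv4 branch of the quoted SocksSocketImpl code uses fixed lengths and never touches data[1], which is why this reply is parsed correctly by both old and patched JDKs.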

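Preface

Recently I ran into a strange problem on a project: under certain circumstances a scheduled task simply stopped executing. jstack showed that the pool's threads were still there, just in the WAITING state. Going through the code eventually showed that one spot had been overlooked. The problem was avoidable and the pitfall is a known one; I was probably just not careful enough when writing the code, so I am recording it here.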

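Process

ScheduledThreadPoolExecutor is a thread pool for running periodic tasks, and it can be used to implement simple scheduled jobs. A careful read of the class's API documentation shows that the problem I ran into is already described there; I just had not read it carefully and took too much for granted. Take the documentation of scheduleAtFixedRate as an example: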

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.

That is clear enough: if a submitted task throws an exception, the task is never scheduled again.
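The documented behaviour is easy to observe. In the sketch below (class and method names are made up for this example) a task runs every 10 ms and throws on its third execution; the run counter then stays at 3 even though the pool itself is still alive.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SuppressedTaskDemo {
    /** Schedules a task that throws on its third run and reports the total number of runs. */
    static int countRuns() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch failed = new CountDownLatch(1);
        try {
            pool.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 3) {
                    failed.countDown();
                    throw new IllegalStateException("simulated failure");
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            failed.await(5, TimeUnit.SECONDS); // wait for the failing run
            Thread.sleep(200);                 // room for about 20 more periods
            return runs.get();                 // no further runs were scheduled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Nothing is printed and no thread dies, which matches the symptom described in the preface: the worker threads are still there, waiting, but the task never comes back.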

Now let's look at the code of scheduleAtFixedRate:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) {
        // validate arguments
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        // wrap the Runnable in a ScheduledFutureTask
        ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit),unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        // schedule for delayed execution
        delayedExecute(t);
        return t;
    }

This is the code that submits a periodic task: it wraps the Runnable object in a ScheduledFutureTask and then schedules it for execution.

Next, the logic of delayedExecute:

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            // add the task to the queue
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                // pre-start a Worker
                ensurePrestart();
        }
    }

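This code is simple and clear: the task is added to the queue and then handed to a worker for execution.

Next, let's look at the worker's execution logic in the runWorker method: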

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

In short, this code takes a task and runs it, nothing more. Along the way it calls the task's run method, so we only need to look at the run method of ScheduledFutureTask:

        public void run() {
            // is this a periodic task?
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // periodic tasks take this branch
            else if (ScheduledFutureTask.super.runAndReset()) {
                // set the next run time
                setNextRunTime();
                // schedule the next run
                reExecutePeriodic(outerTask);
            }
        }

As you can see, the next run is scheduled only when runAndReset returns true. Let's continue with runAndReset:

 protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    // if the call above throws, ran never becomes true and the whole method returns false
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

The code above shows that if a submitted task throws an exception, it is not scheduled again, which is consistent with the method's documentation.
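The setException(ex) call in runAndReset also means the exception is not lost: it is stored in the future returned by scheduleAtFixedRate. A minimal sketch of how it can be surfaced, assuming the caller keeps that future (the class and method names are made up for this example):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureFailureDemo {
    /** Returns the message of the exception that stopped the periodic task, or null. */
    static String failureMessage() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                throw new IllegalStateException("boom");
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                // For a periodic task get() only returns by throwing:
                // here it rethrows the stored exception wrapped in ExecutionException.
                future.get(5, TimeUnit.SECONDS);
                return null;
            } catch (ExecutionException e) {
                return e.getCause().getMessage();
            } catch (TimeoutException e) {
                return null; // the task is still running normally
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Calling get() on the future from a monitoring thread is one way to notice that a periodic task has died; without it the failure stays invisible unless the task logs it itself.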

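Summary

When writing code, read the API documentation carefully and do not take things for granted.
When ScheduledThreadPoolExecutor is used to run periodic tasks, the task itself should handle its own exceptions.
For example: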

public void run() {
    try {
        // the actual task logic
        code();
    } catch (Throwable t) {
        // log the failure instead of letting it escape, so the task keeps being scheduled
        log.error(t.getMessage(), t);
    }
}
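The same idea can be packaged as a reusable wrapper so that every periodic task gets the protection. This is only a sketch: the names SafeTasks, safe and keepsRunning are made up for this example, and System.err stands in for whatever logger the project uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SafeTasks {
    /** Wraps a task so that no exception can escape into the scheduler. */
    static Runnable safe(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Stand-in for real logging; the point is that nothing is rethrown.
                System.err.println("scheduled task failed: " + t);
            }
        };
    }

    /** Demonstrates that a task which always throws is still run at least five times. */
    static boolean keepsRunning() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        CountDownLatch fiveRuns = new CountDownLatch(5);
        try {
            pool.scheduleAtFixedRate(safe(() -> {
                fiveRuns.countDown();
                throw new IllegalStateException("simulated failure");
            }), 0, 10, TimeUnit.MILLISECONDS);
            return fiveRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Catching Throwable mirrors the example above; whether Error subclasses such as OutOfMemoryError should also be swallowed is a design choice that depends on the application.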