Jetty QueuedThreadPool 源碼分析

前言

目前正在維護的一個http服務採用的容器是jetty,而且不是spring-boot,連spring都沒有,就是手寫的jetty server和handler等等。最近在作壓測時發現一個奇怪的現象:jetty的線程池在達到滿載(最大2000個線程)以後,即便下降了壓力,線程池的線程數仍然沒有及時的降低到正常水平,而是很是緩慢地降低,大約每兩個小時降低1%左右。java

這讓我感到很奇怪,明明線程的idle時間設置的是5min,爲何線程數過了好久都沒有恢復正常,本覺得jetty裏的線程池應該和jdk自帶的線程池沒什麼區別,看來事實並非我想象的那樣,因而決定了解下jetty提供的QueuedThreadPool的具體實現(基於9.4.8.v20171121版本)。spring

源碼部分

咱們在項目中使用的是QueuedThreadPool,它繼承了AbstractLifeCycle類,同時實現了SizedThreadPool以及Dumpable兩個接口。AbstractLifeCycle是jetty中用於管理實例生命週期相關的邏輯,SizedThreadPool則繼承自jdk的ThreadPool,在其基礎上增長了一些用於獲取ThreadPool狀態的方法,而Dumpable則聲明瞭將對象dump爲string的方法。app

QueuedThreadPool採用了jdk中的ThreadPoolExecutor不一樣的實現方式,看起來邏輯彷佛更簡單。ide

成員變量

QueuedThreadPool的成員變量並很少,具體列列舉以下:spring-boot

private final AtomicInteger _threadsStarted; //線程池中一共有多少線程
    private final AtomicInteger _threadsIdle;//線程池中一共有多少空閒的線程
    private final AtomicLong _lastShrink;//上一次「縮小」的時間戳,「縮小」即減小線程數
    private final Set<Thread> _threads;//線程池中的線程
    private final Object _joinLock;//等待全部線程結束用到的lock
    private final BlockingQueue<Runnable> _jobs; //線程池任務隊列
    private final ThreadGroup _threadGroup;//線程池任的線程組
    private String _name;//線程池名稱
    private int _idleTimeout;//線程空閒的是時間
    private int _maxThreads;//線程池最大size
    private int _minThreads;//線程池最下size
    private int _priority;//線程池中線程的優先級
    private boolean _daemon;//線程池中線程的daemon屬性
    private boolean _detailedDump;//dump線程池時是否打印詳細的信息
    private int _lowThreadsThreshold;//用於判斷線程池是否缺乏足夠的線程的閾值
    private ThreadPoolBudget _budget;//線程池任務隊列
    private Runnable _runnable;//線程池中每一個線程的主要運行邏輯
複製代碼

主要方法

doStart

doStart其實是在AbstractLifeCycle中定義的,表示實例生命週期的開始,QueuedThreadPool中的主要實現就是預先啓動_minThreads個線程。oop

@Override
    protected void doStart() throws Exception {
        super.doStart();
        _threadsStarted.set(0);
        startThreads(_minThreads);
    }
複製代碼

doStop

doStop也是在AbstractLifeCycle中定義的,表示實例生命週期的結束,QueuedThreadPool中的主要實現就是嘗試中止全部線程。在中止過程當中,會嘗試將全部已提交的任務執行完。ui

@Override
    protected void doStop() throws Exception {
        super.doStop();
        long timeout = getStopTimeout();
        BlockingQueue<Runnable> jobs = getQueue();
        //若是沒有指定stopTimeout,直接清空任務隊列
        if (timeout <= 0)
            jobs.clear();
        //用noop把隊列填滿,目前一共有多少個線程就提交多少個noop任務
        Runnable noop = () -> {};
        for (int i = _threadsStarted.get(); i-- > 0; )
            jobs.offer(noop);
        //先用stopTimeout一半的時間來讓全部任務天然執行完
        long stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
        for (Thread thread : _threads)
        {
            long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
            if (canwait > 0)
                thread.join(canwait);
        }
        //若是到這裏還有任務沒執行完,再嘗試激進一點的策略
        //把剩餘的線程interrupt一遍
        if (_threadsStarted.get() > 0)
            for (Thread thread : _threads)
                thread.interrupt();
        //再用stopTimeout一半的時間來讓全部任務天然執行完
        stopby = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) / 2;
        for (Thread thread : _threads)
        {
            long canwait = TimeUnit.NANOSECONDS.toMillis(stopby - System.nanoTime());
            if (canwait > 0)
                thread.join(canwait);
        }
        Thread.yield();
        int size = _threads.size();
        if (size > 0) //若是還有任務沒執行完,就把剩下的線程都打印一遍
        {
            Thread.yield();
            if (LOG.isDebugEnabled())
            {
                for (Thread unstopped : _threads)
                {
                    StringBuilder dmp = new StringBuilder();
                    for (StackTraceElement element : unstopped.getStackTrace())
                    {
                        dmp.append(System.lineSeparator()).append("\tat ").append(element);
                    }
                    LOG.warn("Couldn't stop {}{}", unstopped, dmp.toString());
                }
            }
            else
            {
                for (Thread unstopped : _threads)
                    LOG.warn("{} Couldn't stop {}",this,unstopped);
            }
        }
        if (_budget!=null)
            _budget.reset();

        synchronized (_joinLock)
        {
            _joinLock.notifyAll();
        }
    }
複製代碼

總的來講,結束過程當中會在指定的超時時間過去一半的時候把還沒執行完的線程都interrupt一下,若是到最後還有任務沒執行完,就把它們dump並打印出來。this

execute

execute即向線程池提交任務的方法,QueuedThreadPool的實現也很簡單,就是嘗試把任務放到隊列裏,而後按需建立新的線程。若是入隊列失敗,則拋出拒絕異常。這裏並不向ThreadPoolExecutor同樣支持指定reject發生時的處理側率,而是直接拋出一個拒絕異常。spa

@Override
    public void execute(Runnable job) {
        if (LOG.isDebugEnabled())
            LOG.debug("queue {}",job);
        if (!isRunning() || !_jobs.offer(job)) //嘗試將任務放入隊列
        {
            LOG.warn("{} rejected {}", this, job);
            throw new RejectedExecutionException(job.toString());
        }
        else
        {
            //若是入隊列成果以後發現一個線程都沒了,就從新建立一個線程
            if (getThreads() == 0)
                startThreads(1);
        }
    }
複製代碼

線程的主要執行邏輯

線程的主要執行邏輯就是runnable成員變量了,其大致的思路就是:線程

  1. 線程啓動後,開始從任務隊列中循環獲取任務。
  2. 若是能成功獲取任務,就執行獲取到的任務。
  3. 若是沒法獲取新的任務,則跳出獲取任務的循環,將_threadsIdle加一,標識自身進入了idle狀態
  4. 當自身出於idle狀態時,根據指定的條件判斷是否須要殺掉空閒的線程
private Runnable _runnable = new Runnable()
    {
        @Override
        public void run() {
            boolean shrink = false; //是否殺掉自身的標識
            boolean ignore = false;//自身是否屬於意外退出的標識
            try
            {
                Runnable job = _jobs.poll(); 從隊列中獲取任務,不會阻塞
                if (job != null && _threadsIdle.get() == 0) 
                {
                    //若是獲取到了任務,同時沒有空閒線程,就建立一個線程。這裏是否有必要?
                    //雖然_threadsIdle,可是當前線程不就是空閒的嗎?看9.4.24.v20191120版本的實現裏已經沒有這邏輯了
                    startThreads(1);
                }
                loop: while (isRunning())
                {
                    //循環獲取任務並執行
                    while (job != null && isRunning())
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("run {}",job);
                        runJob(job);
                        if (LOG.isDebugEnabled())
                            LOG.debug("ran {}",job);
                        if (Thread.interrupted())
                        {
                            ignore=true;
                            break loop;
                        }
                        job = _jobs.poll();
                    }
                    //進入空閒狀態
                    try
                    {
                        _threadsIdle.incrementAndGet();
                        while (isRunning() && job == null)
                        {
                            //若是沒有指定idleTimeout,就阻塞地嘗試獲取任務
                            if (_idleTimeout <= 0)
                                job = _jobs.take();
                            else
                            {
                                //判斷是否須要結束本身:1. size大於_minThreads 2. 距離上次結束線程超過了_idleTimeout
                                final int size = _threadsStarted.get();
                                if (size > _minThreads)
                                {
                                    long last = _lastShrink.get();
                                    long now = System.nanoTime();
                                    if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(_idleTimeout))
                                    {
                                        if (_lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1))
                                        {
                                            //若是知足結束的條件,就跳出外層循環,結束執行
                                            shrink=true;
                                            break loop;
                                        }
                                    }
                                }
                                //若是不知足結束的條件,就嘗試阻塞獲取任務,同時超時時間爲idleTimeout
                                job = idleJobPoll();
                            }
                        }
                    }
                    finally
                    {
                        if (_threadsIdle.decrementAndGet() == 0)
                        {
                            startThreads(1);
                        }
                    }
                }
            }
            catch (InterruptedException e)
            {
                ignore=true;
                LOG.ignore(e);
            }
            catch (Throwable e)
            {
                LOG.warn(e);
            }
            finally
            {
                if (!shrink && isRunning())
                {
                    if (!ignore)
                        LOG.warn("Unexpected thread death: {} in {}",this,QueuedThreadPool.this);
                    if (_threadsStarted.decrementAndGet()<getMaxThreads())
                        startThreads(1);
                }
                _threads.remove(Thread.currentThread());
            }
        }
    };
複製代碼

總結

能夠看出QueuedThreadPool比jdk中的ThreadPoolExecutor確實簡單的多,而回到最開始的問題,線程池size遲遲不降低的緣由就是線程池對idleTimeout的處理方式:當線程出於idle狀態時,它就有可能被結束,但只是有可能而不是必定,由於每次結束一個線程都要間隔idleTimeout指定的時間。而咱們的項目裏指定線程數最大是2000,idleTimeout是5min,就表示每5min纔會結束一個線程,因此線程池size降低的速度纔會這麼慢。