面試官問如何結合Apollo構建動態線程池,咱們聊了二十分鐘

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習java

0 文章概述

流量洪峯是互聯網生產環境常常遇到的場景,例如某個時間點進行商品搶購活動,或者某個時間點集中觸發定時任務,這些場景都有可能引起流量洪峯,因此如何應對流量洪峯是咱們必須面對的問題。node

縱向維度咱們能夠從代理層、WEB層、服務層、緩存層、數據層進行思考,橫向維度咱們能夠從高頻檢測、緩存前置、節點冗餘、服務降級等方向進行思考。本文咱們從服務層動態調節線程數這個角度進行思考。git

動態線程池是指咱們能夠根據流量的不一樣調節線程池某些參數,例如能夠在業務低峯期調低線程數,在業務高峯期調高線程數增長處理線程從而應對流量洪峯。本文咱們結合Apollo和線程池實現一個動態線程池。github

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

1 線程池基礎

1.1 七個參數

咱們首先回顧一下Java線程池七大參數,查看源碼ThreadPoolExecutor構造函數以下:面試

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,                               int maximumPoolSize,                               long keepAliveTime,                               TimeUnit unit,                               BlockingQueue<Runnable> workQueue,                               ThreadFactory threadFactory,                               RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                   null :
                   AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

(1) corePoolSize

線程池核心線程數,類比業務大廳開設的固定窗口。例如業務大廳開設2個固定窗口,那麼這兩個窗口不會關閉,全天都會進行業務辦理數據庫

(2) workQueue

存儲已提交但還沒有執行的任務,類比業務大廳等候區。例如業務大廳一開門進來不少顧客,2個固定窗口進行業務辦理,其餘顧客到等候區等待緩存

(3) maximumPoolSize

線程池能夠容納同時執行最大線程數,類比業務大廳最大窗口數。例如業務大廳最大窗口數是5個,業務員看到2個固定窗口和等候區都滿了,能夠臨時增長3個窗口微信

(4) keepAliveTime

非核心線程數存活時間。當業務不忙時剛纔新增的3個窗口須要關閉,空閒時間超過keepAliveTime空閒會被關閉架構

(5) unit

keepAliveTime存活時間單位app

(6) threadFactory

線程工廠能夠用來指定線程名

(7) handler

線程池線程數已達到maximumPoolSize且隊列已滿時執行拒絕策略。例如業務大廳5個窗口所有處於忙碌狀態且等候區已滿,業務員根據實際狀況選擇拒絕策略

1.2 四種拒絕策略

(1) AbortPolicy

默認策略直接拋出RejectExecutionException阻止系統正常運行

/**  * AbortPolicy  *  * @author 微信公衆號「JAVA前線」  *  */
public class AbortPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), abortPolicy);
        for (int i = 0; i < 100; i++) {
            executor.execute(new Runnable() {
                @Override                 public void run() {
                    System.out.println(Thread.currentThread().getName() + " -> run");
                }
            });
        }
    }
}

程序執行結果:

pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xy.juc.threadpool.reject.AbortPolicyTest$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.xy.juc.threadpool.reject.AbortPolicyTest.main(AbortPolicyTest.java:21)

 

(2) CallerRunsPolicy

任務交給調用者本身運行

/**  * CallerRunsPolicy  *  * @author 微信公衆號「JAVA前線」  *  */
public class CallerRunsPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), callerRunsPolicy);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override                 public void run() {
                    System.out.println(Thread.currentThread().getName() + " -> run");
                }
            });
        }
    }
}

程序執行結果:

main -> run
pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
main -> run
main -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
main -> run
pool-1-thread-2 -> run

 

(3) DiscardOldestPolicy

拋棄隊列中等待最久的任務不會拋出異常

/**  * DiscardOldestPolicy  *  * @author 今日頭條號「JAVA前線」  *  */
public class DiscardOldestPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardOldestPolicy);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override                 public void run() {
                    System.out.println(Thread.currentThread().getName() + " -> run");
                }
            });
        }
    }
}

程序執行結果:

pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run

 

(4) DiscardPolicy

直接丟棄任務不會拋出異常

/**  * DiscardPolicy  *  * @author 今日頭條號「JAVA前線」  *  */
public class DiscardPolicyTest {
    public static void main(String[] args) {
        int coreSize = 1;
        int maxSize = 2;
        int queueSize = 1;
        DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardPolicy);
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override                 public void run() {
                    System.out.println(Thread.currentThread().getName() + " -> run");
                }
            });
        }
    }
}

程序執行結果:

pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run

 

1.3 修改參數

若是初始化線程池完成後,咱們是否能夠修改線程池某些參數呢?答案是能夠。咱們選擇線程池提供的四個修改方法進行源碼分析。

(1) setCorePoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        // 新核心線程數減去原核心線程數
        int delta = corePoolSize - this.corePoolSize;
        // 新核心線程數賦值
        this.corePoolSize = corePoolSize;
        // 若是當前線程數大於新核心線程數
        if (workerCountOf(ctl.get()) > corePoolSize)
            // 中斷空閒線程
            interruptIdleWorkers();
        // 若是須要新增線程則經過addWorker增長工做線程
        else if (delta > 0) {
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(nulltrue)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
}

 

(2) setMaximumPoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        // 若是當前線程數量大於新最大線程數量
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            // 中斷空閒線程
            interruptIdleWorkers();
    }
}

 

(3) setKeepAliveTime

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        // 新超時時間減去原超時時間
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        // 若是新超時時間小於原超時時間
        if (delta < 0)
            // 中斷空閒線程
            interruptIdleWorkers();
    }
}

 

(4) setRejectedExecutionHandler

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        // 設置拒絕策略
        this.handler = handler;
    }
}

如今咱們知道上述線程池調整參數的方法,但僅僅分析到此是不夠的,由於若是沒有動態調整參數的方法,那麼每次修改必須從新發布才能夠生效,那麼有沒有方法不用發佈就能夠動態調整線程池參數呢?

 

2 Apollo配置中心

2.1 核心原理

Apollo是攜程框架部門研發的分佈式配置中心,可以集中化管理應用不一樣環境、不一樣集羣的配置,配置修改後可以實時推送到應用端,而且具有規範的權限、流程治理等特性,適用於微服務配置管理場景,開源地址以下:

https://github.com/ctripcorp/apollo

第一步用戶在配置中心修改配置項,第二步配置中心通知Apollo客戶端有配置更新,第三步Apollo客戶端從配置中心拉取最新配置,更新本地配置並通知到應用,官網基礎模型圖以下:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

配置中心配置項發生變化客戶端如何感知呢?分爲推和拉兩種方式。推依賴客戶端和服務端保持了一個長鏈接,發生數據變化時服務端推送信息給客戶端,這就是長輪詢機制。拉依賴客戶端定時從配置中心服務端拉取應用最新配置,這是一個fallback機制。官網客戶端設計圖以下:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

本文重點分析配置更新推送方式,咱們首先看官網服務端設計圖:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

ConfigService模塊提供配置的讀取推送等功能,服務對象是Apollo客戶端。AdminService模塊提供配置的修改發佈等功能,服務對象是Portal模塊即管理界面。須要說明Apollo並無引用消息中間件,發送異步消息是指ConfigService定時掃描異步消息數據表:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

消息數據保存在MySQL消息表:

CREATE TABLE `releasemessage` (
  `Id` int(11unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主鍵',
  `Message` varchar(1024NOT NULL DEFAULT '' COMMENT '發佈的消息內容',
  `DataChange_LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最後修改時間',
  PRIMARY KEY (`Id`),
  KEY `DataChange_LastTime` (`DataChange_LastTime`),
  KEY `IX_Message` (`Message`(191))
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='發佈消息'

Apollo核心原理本文暫時分析到這裏,後續我將寫文章經過源碼分析Apollo長輪詢機制工做原理請繼續關注。

 

2.2 實例分析

2.2.1 服務端安裝

服務端關鍵步驟是導入數據庫和修改端口號,具體步驟請參看官方網站:

https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start

啓動成功後訪問地址:

http://localhost:8070

 

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

輸入用戶名apollo、密碼admin:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

進入我以前建立的myApp項目,咱們看到在DEV環境、default集羣、application命名空間包含一個timeout配置項:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

2.2.2 應用程序

(1) 引入依賴

<dependencies>
  <dependency>
    <groupId>com.ctrip.framework.apollo</groupId>
    <artifactId>apollo-client</artifactId>
    <version>1.7.0</version>
  </dependency>
</dependencies>

(2) 簡單實例

public class GetApolloConfigTest extends BaseTest {

    /**      * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080      *      * myApp+DEV+default+application      */
    @Test     public void testGet() throws InterruptedException {
        Config appConfig = ConfigService.getAppConfig();
        while (true) {
            String value = appConfig.getProperty("timeout""200");
            System.out.println("timeout=" + value);
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

由於上述程序是經過while(true)不斷讀取配置項的值,因此程序輸出結果以下:

timeout=100
timeout=100
timeout=100
timeout=100
timeout=100
timeout=100

如今把配置項的值改成200程序輸出結果以下:

timeout=100
timeout=100
timeout=100
timeout=100
timeout=200
timeout=200
timeout=200

(3) 監聽實例

生產環境咱們通常不用while(true)監聽變化,而是經過註冊監聽器方式感知變化信息:

public class GetApolloConfigTest extends BaseTest {

    /**      * 監聽命名空間變化      *      * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080      *      * myApp+DEV+default+application      */
    @Test
    public void testListen() throws InterruptedException {
        Config config = ConfigService.getConfig("application");
        config.addChangeListener(new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
                System.out.println("發生變化命名空間=" + changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) {
                    ConfigChange change = changeEvent.getChange(key);
                    System.out.println(String.format("發生變化key=%s,oldValue=%s,newValue=%s,changeType=%s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));
                }
            }
        });
        Thread.sleep(1000000L);
    }
}

咱們把timeout值從200改成300,客戶端能夠監聽到這個變化,程序輸出結果以下:

發生變化命名空間=application
發生變化key=timeout,oldValue=200,newValue=300,changeType=MODIFIED

 

3 動態線程池

如今咱們能夠把線程池和Apollo結合起來構建動態線程池。首先咱們用默認值構建一個線程池,而後線程池會監聽Apollo相關配置項,若是相關配置有變化則刷新相關線程池參數。第一步在Apollo配置中心設置三個線程池參數(本文省略拒絕策略設置):

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

第二步編寫核心代碼:

/**  * 動態線程池工廠  *  * @author 今日頭條號「JAVA前線」  *  */
@Slf4j
@Component
public class DynamicThreadPoolFactory {
    private static final String NAME_SPACE = "threadpool-config";

    /** 線程執行器 **/
    private volatile ThreadPoolExecutor executor;

    /** 核心線程數 **/
    private Integer CORE_SIZE = 10;

    /** 最大值線程數 **/
    private Integer MAX_SIZE = 20;

    /** 等待隊列長度 **/
    private Integer QUEUE_SIZE = 2000;

    /** 線程存活時間 **/
    private Long KEEP_ALIVE_TIME = 1000L;

    /** 線程名 **/
    private String threadName;

    public DynamicThreadPoolFactory() {
        Config config = ConfigService.getConfig(NAME_SPACE);
        init(config);
        listen(config);
    }

    /**      * 初始化      */
    private void init(Config config) {
        if (executor == null) {
            synchronized (DynamicThreadPoolFactory.class) {
                if (executor == null) {
                    String coreSize = config.getProperty(KeysEnum.CORE_SIZE.getNodeKey(), CORE_SIZE.toString());
                    String maxSize = config.getProperty(KeysEnum.MAX_SIZE.getNodeKey(), MAX_SIZE.toString());
                    String keepAliveTIme = config.getProperty(KeysEnum.KEEP_ALIVE_TIME.getNodeKey(), KEEP_ALIVE_TIME.toString());
                    BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(QUEUE_SIZE);
                    executor = new ThreadPoolExecutor(Integer.valueOf(coreSize), Integer.valueOf(maxSize), Long.valueOf(keepAliveTIme), TimeUnit.MILLISECONDS, queueToUse, new NamedThreadFactory(threadName, true), new AbortPolicyDoReport(threadName));
                }
            }
        }
    }

    /**      * 監聽器      */
    private void listen(Config config) {
        config.addChangeListener(new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
                log.info("命名空間發生變化={}", changeEvent.getNamespace());
                for (String key : changeEvent.changedKeys()) {
                    ConfigChange change = changeEvent.getChange(key);
                    String newValue = change.getNewValue();
                    refreshThreadPool(key, newValue);
                    log.info("發生變化key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());
                }
            }
        });
    }

    /**      * 刷新線程池      */
    private void refreshThreadPool(String key, String newValue) {
        if (executor == null) {
            return;
        }
        if (KeysEnum.CORE_SIZE.getNodeKey().equals(key)) {
            executor.setCorePoolSize(Integer.valueOf(newValue));
            log.info("修改核心線程數key={},value={}", key, newValue);
        }
        if (KeysEnum.MAX_SIZE.getNodeKey().equals(key)) {
            executor.setMaximumPoolSize(Integer.valueOf(newValue));
            log.info("修改最大線程數key={},value={}", key, newValue);
        }
        if (KeysEnum.KEEP_ALIVE_TIME.getNodeKey().equals(key)) {
            executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
            log.info("修改活躍時間key={},value={}", key, newValue);
        }
    }

    public ThreadPoolExecutor getExecutor(String threadName) {
        return executor;
    }

    enum KeysEnum {

        CORE_SIZE("coreSize""核心線程數"),

        MAX_SIZE("maxSize""最大線程數"),

        KEEP_ALIVE_TIME("keepAliveTime""線程活躍時間")

        ;

        private String nodeKey;
        private String desc;

        KeysEnum(String nodeKey, String desc) {
            this.nodeKey = nodeKey;
            this.desc = desc;
        }

        public String getNodeKey() {
            return nodeKey;
        }

        public void setNodeKey(String nodeKey) {
            this.nodeKey = nodeKey;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }
    }
}

/**  * 動態線程池執行器  *  * @author 今日頭條號「JAVA前線」  *  */
@Component
public class DynamicThreadExecutor {

    @Resource
    private DynamicThreadPoolFactory threadPoolFactory;

    public void execute(String bizName, Runnable job) {
        threadPoolFactory.getExecutor(bizName).execute(job);
    }

    public Future<?> sumbit(String bizName, Runnable job) {
        return threadPoolFactory.getExecutor(bizName).submit(job);
    }
}

 

第三步運行測試用例:

/**  * 動態線程池測試  *  * @author 今日頭條號「JAVA前線」  *  */
public class DynamicThreadExecutorTest extends BaseTest {

    @Resource
    private DynamicThreadExecutor dynamicThreadExecutor;

    /**      * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080      *      * myApp+DEV+default+thread-pool      */
    @Test
    public void testExecute() throws InterruptedException {
        while (true) {
            dynamicThreadExecutor.execute("bizName"new Runnable() {
                @Override
                public void run() {
                    System.out.println("bizInfo");
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

 

第四步經過VisualVM觀察線程數:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

咱們在配置中心修改配置項把核心線程數設置爲50,最大線程數設置爲100,經過VisualVM能夠觀察到線程數顯著上升:

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

面試官問如何結合Apollo構建動態線程池,咱們聊了三十分鐘

 

4 文章總結

本文咱們首先介紹了線程池基礎知識,包括七大參數和四個拒絕策略,隨後咱們介紹了Apollo配置中心的原理和應用,最後咱們將線程池和配置中心相結合,實現了動態調整線程數的效果,但願本文對你們有所幫助。

歡迎你們關注公衆號「JAVA前線」查看更多精彩分享文章,主要包括源碼分析、實際應用、架構思惟、職場分享、產品思考等等,同時歡迎你們加我我的微信「java_front」一塊兒交流學習