《java 8 實戰》讀書筆記 -第十一章 CompletableFuture:組合式異步編程

1、Future 接口

在Future中觸發那些潛在耗時的操做把調用線程解放出來,讓它能繼續執行其餘有價值的工做,再也不須要呆呆等待耗時的操做完成。打個比方,你能夠把它想象成這樣的場景:你拿了一袋子衣服到你中意的乾洗店去洗。乾洗店的員工會給你張發票,告訴你何時你的衣服會洗好(這就是一個Future事件)。衣服乾洗的同時,你能夠去作其餘的事情。Future的另外一個優勢是它比更底層的Thread更易用。要使用Future,一般你只須要將耗時的操做封裝在一個Callable對象中,再將它提交給ExecutorService,就萬事大吉了。java

ExecutorService executor = Executors.newCachedThreadPool(); 
Future<Double> future = executor.submit(new Callable<Double>() { 
 public Double call() {
 return doSomeLongComputation(); 
 }}); 
doSomethingElse(); //異步操做進行的同時,你能夠作其餘的事情
try { 
 Double result = future.get(1, TimeUnit.SECONDS); //獲取異步操做的結果,若是最終被阻塞,沒法獲得結
//果,那麼在最多等待1秒鐘以後退出
} catch (ExecutionException ee) {
 // 計算拋出一個異常
} catch (InterruptedException ie) { 
 // 當前線程在等待過程當中被中斷
} catch (TimeoutException te) { 
 // 在Future對象完成以前超過已過時
}
同步API與異步API

同步API其實只是對傳統方法調用的另外一種稱呼:你調用了某個方法,調用方在被調用方運行的過程當中會等待,被調用方運行結束返回,調用方取得被調用方的返回值並繼續運行。即便調用方和被調用方在不一樣的線程中運行,調用方仍是須要等待被調用方結束運行,這就是阻塞式調用這個名詞的由來。數組

與此相反,異步API會直接返回,或者至少在被調用方計算完成以前,將它剩餘的計算任務交給另外一個線程去作,該線程和調用方是異步的——這就是非阻塞式調用的由來。執行剩餘計算任務的線程會將它的計算結果返回給調用方。返回的方式要麼是經過回調函數,要麼是由調用方再次執行一個「等待,直到計算完成」的方法調用。這種方式的計算在I/O系統程序設計中很是常見:你發起了一次磁盤訪問,此次訪問和你的其餘計算操做是異步的,你完成其餘的任務時,磁盤塊的數據可能還沒載入到內存,你只須要等待數據的載入完成。服務器

2、實現異步 API

使用CompletableFuture後,getPriceAsync方法的實現網絡

public Future<Double> getPriceAsync(String product) {
 CompletableFuture<Double> futurePrice = new CompletableFuture<>(); 
 new Thread( () -> { 
 double price = calculatePrice(product); //calculatePrice需長時間計算,任務結束並得出結果時設置
//Future的返回值
 futurePrice.complete(price); 
 }).start(); 
 return futurePrice; //無需等待還沒結束的計算,直接返回Future對象
}

使用異步API:app

Shop shop = new Shop("BestShop"); 
long start = System.nanoTime(); 
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); 
long invocationTime = ((System.nanoTime() - start) / 1_000_000); 
System.out.println("Invocation returned after " + invocationTime 
 + " msecs"); 
// 執行更多任務,好比查詢其餘商店
doSomethingElse(); 
// 在計算商品價格的同時
try { 
 double price = futurePrice.get(); //從Future對象中讀取價格,若是價格未知,會發生阻塞
 System.out.printf("Price is %.2f%n", price); 
} catch (Exception e) { 
 throw new RuntimeException(e);
} 
long retrievalTime = ((System.nanoTime() - start) / 1_000_000); 
System.out.println("Price returned after " + retrievalTime + " msecs");
Stream和CompletableFuture的設計都遵循了相似的模式:它們都使用了Lambda表達式以及流水線的思想。CompletableFuture和Future的關係就跟Stream和Collection的關係同樣。

錯誤處理
若是計算商品價格的方法出現異常,用於提示錯誤的異常會被限制在試圖計算商品價格的當前線程的範圍內,最終會殺死該線程,而這會致使等待get方法返回結果的客戶端永久地被阻塞。爲了不這種狀況,你須要使用CompletableFuture的completeExceptionally方法將致使CompletableFuture內發生問題的異常拋出。
拋出CompletableFuture內的異常:異步

public Future<Double> getPriceAsync( String product )
{
    CompletableFuture<Double> futurePrice = new CompletableFuture<>();
    new Thread( () - > {
                try {
                    double price = calculatePrice( product );
                    futurePrice.complete( price );
                } catch ( Exception ex ) {
                    futurePrice.completeExceptionally( ex );
                }
            } ).start();
    return(futurePrice);
}

使用工廠方法supplyAsync建立CompletableFuture對象:函數

public Future<Double> getPriceAsync(String product) { 
 return CompletableFuture.supplyAsync(() -> calculatePrice(product)); 
}

此處getPriceAsync方法返回的CompletableFuture對象和上面你手工建立和完成的CompletableFuture對象是徹底等價的,這意味着它提供了一樣的錯誤管理機制supplyAsync方法接受一個生產者(Supplier)做爲參數,返回一個CompletableFuture對象,該對象完成異步執行後會讀取調用生產者方法的返回值。生產者方法會交由ForkJoinPool池中的某個執行線程(Executor)運行,可是你也可使用supplyAsync方法的重載版本,傳遞第二個參數指定不一樣的執行線程執行生產者方法。spa

3、讓你的代碼免受阻塞之苦

在全部店鋪中找出同一商品的價格,使用CompletableFuture實現findPrices方法線程

public List<String> findPrices(String product) { 
 List<CompletableFuture<String>> priceFutures = 
 shops.stream() 
 .map(shop -> CompletableFuture.supplyAsync( 
 () -> shop.getName() + " price is " +
 shop.getPrice(product))) 
 .collect(Collectors.toList()); 
 return priceFutures.stream() 
 .map(CompletableFuture::join) 
 .collect(toList()); 
}

這裏使用了兩個不一樣的Stream流水線,而不是在同一個處理流的流水線上一個接一個地放置兩個map操做——這實際上是有原因的。考慮流操做之間的延遲特性,若是你在單一流水線中處理流,發向不一樣商家的請求只能以同步、順序執行的方式纔會成功。所以,每一個建立CompletableFuture對象只能在前一個操做結束以後執行查詢指定商家的動做、通知join方法返回計算結果。設計

CompletableFuture類中的 join方法和Future接口中的get有相同的含義,而且也聲明在Future接口中,它們惟一的不一樣是join不會拋出任何檢測到的異常。使用它你再也不須要使用try/catch語句塊讓你傳遞給第二個map方法的Lambda表達式變得過於臃腫。

圖片描述

使用定製的執行器:

調整線程池的大小
Brian Goetz建議,線程池大小與處理器的利用率之比可使用下面的公式進行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
❑NCPU是處理器的核的數目,能夠經過 Runtime.getRuntime().availableProcessors()獲得
❑UCPU是指望的CPU利用率(該值應該介於0和1之間)
❑W/C是等待時間與計算時間的比率

實際操做中,若是你建立
的線程數比商店的數目更多,反而是一種浪費,由於這樣作以後,你線程池中的有些線程根本沒有機會被使用。出於這種考慮,咱們建議你將執行器使用的線程數,與你須要查詢的商店數目設定爲同一個值,這樣每一個商店都應該對應一個服務線程。不過,爲了不發生因爲商店的數目過多致使服務器超負荷而崩潰,你仍是須要設置一個上限,好比100個線程。代碼清單以下所示。爲「最優價格查詢器」應用定製的執行器:

private final Executor executor = 
 Executors.newFixedThreadPool(Math.min(shops.size(), 100), 
 new ThreadFactory() { 
 public Thread newThread(Runnable r) { 
 Thread t = new Thread(r); 
 t.setDaemon(true); 
 return t; 
 }
});
並行——使用流仍是CompletableFutures?
目前爲止,你已經知道對集合進行並行計算有兩種方式:要麼將其轉化爲並行流,利用map這樣的操做開展工做,要麼枚舉出集合中的每個元素,建立新的線程,在CompletableFuture內對其進行操做。後者提供了更多的靈活性,你能夠調整線程池的大小,而這能幫助你確保總體的計算不會由於線程都在等待I/O而發生阻塞。
咱們對使用這些API的建議以下。
❑若是你進行的是計算密集型的操做,而且沒有I/O,那麼推薦使用Stream接口,由於實現簡單,同時效率也多是最高的(若是全部的線程都是計算密集型的,那就沒有必要建立比處理器核數更多的線程)。
❑反之,若是你並行的工做單元還涉及等待I/O的操做(包括網絡鏈接等待),那麼使用CompletableFuture靈活性更好,你能夠像前文討論的那樣,依據等待/計算,或者W/C的比率設定須要使用的線程數。這種狀況不使用並行流的另外一個緣由是,處理流的流水線中若是發生I/O等待,流的延遲特性會讓咱們很難判斷到底何時觸發了等待。

4、對多個異步任務進行流水線操做

1.thenCompose

使用CompletableFuture實現findPrices方法(獲取商品折扣後價格):

public List<String> findPrices(String product) {
    List<CompletableFuture<String>> priceFutures =
     shops.stream() 
     .map(shop -> CompletableFuture.supplyAsync( 
     () -> shop.getPrice(product), executor))//getPrice耗時操做,獲取商品的價格字符串,使用異步方式
     .map(future -> future.thenApply(Quote::parse)) //將價格字符串解析成Quote對象(包裝了價格,折扣率等)
     .map(future -> future.thenCompose(quote -> 
     CompletableFuture.supplyAsync( 
     () -> Discount.applyDiscount(quote), executor))) //異步計算商品最終價格
     .collect(toList());
    return priceFutures.stream() 
     .map(CompletableFuture::join) //等待流中的全部Future執行完畢,並提取各自的返回值
     .collect(toList());
}
thenapply()是返回的是非CompletableFuture類型:它的功能至關於將CompletableFuture<T>轉換成CompletableFuture<U>。
thenCompose()用來鏈接兩個CompletableFuture,返回值是新的CompletableFuture:
thenCompose方法容許你對兩個異步操做進行流水線,第一個操做完成時,將其結果做爲參數傳遞給第二個操做。
CompletableFuture類中的其餘方法同樣,也提供了一個以 Async後綴結尾的版本 thenComposeAsync。一般而言,名稱中不帶Async
的方法和它的前一個任務同樣,在同一個線程中運行;而名稱以Async結尾的方法會將後續的任務提交到一個線程池,因此每一個任務是由不一樣的線程處理的。

2.用thenCombine將兩個 CompletableFuture 對象整合起來,不管它們是否存在依賴

thenCombine方法,它接收名爲BiFunction的第二參數,這個參數定義了當兩個CompletableFuture對象完成計算後,結果如何合併。同thenCompose方法同樣,thenCombine方法也提供有一個Async的版本。這裏,若是使用thenCombineAsync會致使BiFunction中定義的合併操做被提交到線程池中,由另外一個任務以異步的方式執行

eg:有一家商店提供的價格是以歐元(EUR)計價的,可是你但願以美圓的方式提供給你的客戶:

Future<double> futurePriceInUSD = 
 CompletableFuture.supplyAsync(() -> shop.getPrice(product)) 
 .thenCombine( 
 CompletableFuture.supplyAsync( 
 () -> exchangeService.getRate(Money.EUR, Money.USD)), 
 (price, rate) -> price * rate 
 );

5、響應 CompletableFuture 的 completion 事件

只要有商店返回商品價格就在第一時間顯示返回值,再也不等待那些還未返回的商店(有些甚至會發生超時)。Java 8的CompletableFuture通 過thenAccept方法提供了這一功能,它接收CompletableFuture執行完畢後的返回值作參數。

重構findPrices方法返回一個由Future構成的流

public Stream<CompletableFuture<String>> findPricesStream(String product) { 
 return shops.stream() 
 .map(shop -> CompletableFuture.supplyAsync( 
 () -> shop.getPrice(product), executor)) 
 .map(future -> future.thenApply(Quote::parse)) 
 .map(future -> future.thenCompose(quote -> 
 CompletableFuture.supplyAsync( 
 () -> Discount.applyDiscount(quote), executor))); 
}

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

由 於thenAccept方法已經定義瞭如何處理CompletableFuture返回的結果,一旦CompletableFuture計算獲得結果,它就返回一個CompletableFuture<Void>。對這個<CompletableFuture<Void>>對象,你能作的事很是有限,只能等待其運行結束。

你還但願能給最慢的商店一些機會,讓它有機會打印輸出返回的價格。爲了實現這一目的,你能夠把構成Stream的全部CompletableFuture<Void>對象放到一個數組中,等待全部的任務執行完成,代碼以下所示:

CompletableFuture[] futures = findPricesStream("myPhone") 
 .map(f -> f.thenAccept(System.out::println)) 
 .toArray(size -> new CompletableFuture[size]); 
CompletableFuture.allOf(futures).join();

allOf工廠方法接收一個由CompletableFuture構成的數組,數組中的全部CompletableFuture對象執行完成以後,它返回一個CompletableFuture<Void>對象。這意味着,若是你須要等待最初Stream中的全部 CompletableFuture對象執行完畢,對 allOf方法返回的
CompletableFuture執行join操做是個不錯的主意。

你可能但願只要CompletableFuture對象數組中有任何一個執行完畢就再也不等待,好比,你正在查詢兩個匯率服務器,任何一個返回告終果都能知足你的需求。在這種狀況下,你可使用一個相似的工廠方法anyOf。該方法接收一個CompletableFuture對象構成的數組,返回由第一個執行完畢的CompletableFuture對象的返回值構成的CompletableFuture<Object>。