CompletableFuture是jdk1.8引入的實現類。擴展了Future和CompletionStage,是壹個可以在任務完成階段觸發壹些操作Future。簡單的來講就是可以實現異步回調。
對於jdk1.5的Future,雖然提供了異步處理任務的能力,但是獲取結果的方式很不優雅,還是需要通過阻塞(或者輪訓)的方式。如何避免阻塞呢?其實就是註冊回調。
業界結合觀察者模式實現異步回調。也就是當任務執行完成後去通知觀察者。比如Netty的ChannelFuture,可以通過註冊監聽實現異步結果的處理。
通過addListener方法註冊監聽。如果任務完成,會調用notifyListeners通知。
CompletableFuture通過擴展Future,引入函數式編程,通過回調的方式去處理結果。
CompletableFuture的功能主要體現在他的CompletionStage。
可以實現如下等功能
消費和運行的區別:
消費使用執行結果。運行則只是運行特定任務。具體其他功能大家可以根據需求自行查看。
這裏舉個簡單的例子來體驗壹下他的功能。
執行結果
根據結果我們可以看到會有序執行對應任務。
註意:
這裏說明壹下,如果是同壹任務的依賴任務有多個:
上面的結論是通過閱讀源代碼得到的。下面我們深入源代碼。
創建的方法有很多,甚至可以直接new壹個。我們來看壹下supplyAsync異步創建的方法。
入參Supplier,帶返回值的函數。如果是異步方法,並且傳遞了執行器,那麽會使用傳入的執行器去執行任務。否則采用公***的ForkJoin並行線程池,如果不支持並行,新建壹個線程去執行。
這裏我們需要註意ForkJoin是通過守護線程去執行任務的。所以必須有非守護線程的存在才行。
這裏會創建壹個用於返回的CompletableFuture。
然後構造壹個AsyncSupply,並將創建的CompletableFuture作為構造參數傳入。
那麽,任務的執行完全依賴AsyncSupply。
在看postComplete方法之前我們先來看壹下創建依賴任務的邏輯。
上面提到過。thenAcceptAsync是用來消費CompletableFuture的。該方法調用uniAcceptStage。
uniAcceptStage邏輯:
Mark1邏輯:
這裏簡單說壹下,其實mode有同步異步,和叠代。叠代為了避免無限遞歸。
這裏強調壹下d.uniAccept方法的第三個參數。
如果是異步調用(mode>0),傳入null。否則傳入this。
區別看下面代碼。c不為null會調用c.claim方法。
claim方法是邏輯:
this的run任務如下。也就是在異步線程同步調用tryFire方法。達到其被異步線程執行的目的。
看完上面的邏輯,我們基本理解依賴任務的邏輯。
其實就是先判斷源任務是否完成,如果完成,直接在對應線程執行以來任務(如果是同步,則在當前線程處理,否則在異步線程處理)
如果任務沒有完成,直接返回,因為等任務完成之後會通過postComplete去觸發調用依賴任務。
在源任務完成之後會調用。
其實邏輯很簡單,就是叠代堆棧的依賴任務。調用h.tryFire方法。NESTED就是為了避免遞歸死循環。因為FirePost會調用postComplete。如果是NESTED,則不調用。
堆棧的內容其實就是在依賴任務創建的時候加入進去的。上面我們已經提到過。
基本上述源碼已經分析了邏輯。
因為涉及異步等操作,我們需要理壹下(這裏針對全異步任務):
主要是考慮代碼的復用。所以邏輯相對難理解。
postComplete方法會被源任務線程執行完源任務後調用。同樣也可能被依賴任務線程後調用。
執行依賴任務的方法主要就是靠tryFire方法。因為這個方法可能會被多種不同類型線程觸發,所以邏輯也繞壹點。(其他依賴任務線程、源任務線程、當前依賴任務線程)
不得不說Doug Lea的編碼,真的是藝術。代碼的復用性全體現在邏輯上了。