亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

Java并發中的Fork/Join 框架機制詳解

瀏覽:4日期:2022-08-08 17:21:52
什么是 Fork/Join 框架

Fork/Join 框架是一種在 JDk 7 引入的線程池,用于并行執行把一個大任務拆成多個小任務并行執行,最終匯總每個小任務結果得到大任務結果的特殊任務。通過其命名也很容易看出框架主要分為 Fork 和 Join 兩個階段,第一階段 Fork 是把一個大任務拆分為多個子任務并行的執行,第二階段 Join 是合并這些子任務的所有執行結果,最后得到大任務的結果。

這里不難發現其執行主要流程:首先判斷一個任務是否足夠小,如果任務足夠小,則直接計算,否則,就拆分成幾個更小的小任務分別計算,這個過程可以反復的拆分成一系列小任務。Fork/Join 框架是一種基于 分治 的算法,通過拆分大任務成多個獨立的小任務,然后并行執行這些小任務,最后合并小任務的結果得到大任務的最終結果,通過并行計算以提高效率。。

Fork/Join 框架使用示例

下面通過一個計算列表中所有元素的總和的示例來看看 Fork/Join 框架是如何使用的,總的思路是:將這個列表分成許多子列表,然后對每個子列表的元素進行求和,然后,我們再計算所有這些值的總和就得到原始列表的和了。Fork/Join 框架中定義了 ForkJoinTask 來表示一個 Fork/Join 任務,其提供了 fork()、join() 等操作,通常情況下,我們并不需要直接繼承這個 ForkJoinTask 類,而是使用框架提供的兩個 ForkJoinTask 的子類:

RecursiveAction 用于表示沒有返回結果的 Fork/Join 任務。 RecursiveTask 用于表示有返回結果的 Fork/Join 任務。

很顯然,在這個示例中是需要返回結果的,可以定義 SumAction 類繼承自 RecursiveTask,代碼入下:

/** * @author mghio * @since 2021-07-25 */public class SumTask extends RecursiveTask<Long> { private static final int SEQUENTIAL_THRESHOLD = 50; private final List<Long> data; public SumTask(List<Long> data) { this.data = data; } @Override protected Long compute() { if (data.size() <= SEQUENTIAL_THRESHOLD) { long sum = computeSumDirectly(); System.out.format('Sum of %s: %dn', data.toString(), sum); return sum; } else { int mid = data.size() / 2; SumTask firstSubtask = new SumTask(data.subList(0, mid)); SumTask secondSubtask = new SumTask(data.subList(mid, data.size())); // 執行子任務 firstSubtask.fork(); secondSubtask.fork(); // 等待子任務執行完成,并獲取結果 long firstSubTaskResult = firstSubtask.join(); long secondSubTaskResult = secondSubtask.join(); return firstSubTaskResult + secondSubTaskResult; } } private long computeSumDirectly() { long sum = 0; for (Long l : data) { sum += l; } return sum; } public static void main(String[] args) { Random random = new Random(); List<Long> data = random.longs(1_000, 1, 100).boxed().collect(Collectors.toList()); ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(data); pool.invoke(task); System.out.println('Sum: ' + pool.invoke(task)); }}

這里當列表大小小于 SEQUENTIAL_THRESHOLD 變量的值(閾值)時視為小任務,直接計算求和列表元素結果,否則再次拆分為小任務,運行結果如下:

Java并發中的Fork/Join 框架機制詳解

通過這個示例代碼可以發現,Fork/Join 框架 中 ForkJoinTask 任務與平常的一般任務的主要不同點在于:ForkJoinTask 需要實現抽象方法 compute() 來定義計算邏輯,在這個方法里一般通用的實現模板是,首先先判斷當前任務是否是小任務,如果是,就執行執行任務,如果不是小任務,則再次拆分為兩個子任務,然后當每個子任務調用 fork() 方法時,會再次進入到 compute() 方法中,檢查當前任務是否需要再拆分為子任務,如果已經是小任務,則執行當前任務并返回結果,否則繼續分割,最后調用 join() 方法等待所有子任務執行完成并獲得執行結果。偽代碼如下:

if (problem is small) { directly solve problem.} else { Step 1. split problem into independent parts. Step 2. fork new subtasks to solve each part. Step 3. join all subtasks. Step 4. compose result from subresults.}Fork/Join 框架設計

Fork/Join 框架核心思想是把一個大任務拆分成若干個小任務,然后匯總每個小任務的結果最終得到大任務的結果,如果讓你設計一個這樣的框架,你會如何實現呢?(建議思考一下),Fork/Join 框架的整個流程正如其名所示,分為兩個步驟:

大任務分割 需要有這么一個的類,用來將大任務拆分為子任務,可能一次拆分后的子任務還是比較大,需要多次拆分,直到拆分出來的子任務符合我們定義的小任務才結束。 執行任務并合并任務結果 第一步拆分出來的子任務分別存放在一個個 雙端隊列 里面(P.S. 這里為什么要使用雙端隊列請看下文),然后每個隊列啟動一個線程從隊列中獲取任務執行。這些子任務的執行結果都會放到一個統一的隊列中,然后再啟動一個線程從這個隊列中拿數據,最后合并這些數據返回。

Fork/Join 框架使用了如下兩個類來完成以上兩個步驟:

ForkJoinTask 類 在上文的實例中也有提到,表示 ForkJoin 任務,在使用框架時首先必須先定義任務,通常只需要繼承自 ForkJoinTask 類的子類 RecursiveAction(無返回結果) 或者 RecursiveTask(有返回結果)即可。 ForkJoinPool 從名字也可以猜到一二了,就是用來執行 ForkJoinTask 的線程池。大任務拆分出的子任務會添加到當前線程的雙端隊列的頭部。

喜歡思考的你,心中想必會想到這么一種場景,當我們需要完成一個大任務時,會先把這個大任務拆分為多個獨立的子任務,這些子任務會放到獨立的隊列中,并為每個隊列都創建一個單獨的線程去執行隊列里的任務,即這里線程和隊列時一對一的關系,那么當有的線程可能會先把自己隊列的任務執行完成了,而有的線程則沒有執行完成,這就導致一些先執行完任務的線程干等了,這是個好問題。

既然是做并發的,肯定要最大程度壓榨計算機的性能,對于這種場景并發大師 Doug Lea 使用了工作竊取算法處理,使用工作竊取算法后,先完成自己隊列中任務的線程會去其它線程的隊列中”竊取“一個任務來執行,哈哈,一方有難,八方支援。但是此時這個線程和隊列的持有線程會同時訪問同一個隊列,所以為了減少竊取任務的線程和被竊取任務的線程之間的競爭,ForkJoin 選擇了雙端隊列這種數據結構,這樣就可以按照這種規則執行任務了:被竊取任務的線程始終從隊列頭部獲取任務并執行,竊取任務的線程使用從隊列尾部獲取任務執行。這個算法在絕大部分情況下都可以充分利用多線程進行并行計算,但是在雙端隊列里只有一個任務等極端情況下還是會存在一定程度的競爭。

Java并發中的Fork/Join 框架機制詳解

Fork/Join 框架實現原理

Fork/Join 框架的實現核心是 ForkJoinPool 類,該類的重要組成部分為 ForkJoinTask 數組和 ForkJoinWorkerThread 數組,其中 ForkJoinTask 數組用來存放框架使用者給提交給 ForkJoinPool 的任務,ForkJoinWorkerThread 數組則負責執行這些任務。任務有如下四種狀態:

NORMAL 已完成

CANCELLED 被取消

SIGNAL 信號

EXCEPTIONAL 發生異常

下面來看看這兩個類的核心方法實現原理,首先來看 ForkJoinTask 的 fork() 方法,源碼如下:

Java并發中的Fork/Join 框架機制詳解

方法對于 ForkJoinWorkerThread 類型的線程,首先會調用 ForkJoinWorkerThread 的 workQueue 的 push() 方法異步的去執行這個任務,然后馬上返回結果。繼續跟進 ForkJoinPool 的 push() 方法,源碼如下:

Java并發中的Fork/Join 框架機制詳解

方法將當前任務添加到 ForkJoinTask 任務隊列數組中,然后再調用 ForkJoinPool 的 signalWork 方法創建或者喚醒一個工作線程來執行該任務。然后再來看看 ForkJoinTask 的 join() 方法,方法源碼如下:

Java并發中的Fork/Join 框架機制詳解

Java并發中的Fork/Join 框架機制詳解

方法首先調用了 doJoin() 方法,該方法返回當前任務的狀態,根據返回的任務狀態做不同的處理:

已完成狀態則直接返回結果 被取消狀態則直接拋出異常(CancellationException) 發生異常狀態則直接拋出對應的異常

繼續跟進 doJoin() 方法,方法源碼如下:

Java并發中的Fork/Join 框架機制詳解

方法首先判斷當前任務狀態是否已經執行完成,如果執行完成則直接返回任務狀態。如果沒有執行完成,則從任務數組中(workQueue)取出任務并執行,任務執行完成則設置任務狀態為 NORMAL,如果出現異常則記錄異常并設置任務狀態為 EXCEPTIONAL(在 doExec() 方法中)。

總結

本文主要介紹了 Java 并發框架中的 Fork/Join 框架的基本原理和其使用的工作竊取算法(work-stealing)、設計方式和部分實現源碼。Fork/Join 框架在 JDK 的官方標準庫中也有應用。比如 JDK 1.8+ 標準庫提供的 Arrays.parallelSort(array) 可以進行并行排序,它的原理就是內部通過 Fork/Join 框架對大數組分拆進行并行排序,可以提高排序的速度,還有集合中的 Collection.parallelStream() 方法底層也是基于 Fork/Join 框架實現的,最后就是定義小任務的閾值往往是需要通過測試驗證才能合理給出,并且保證程序可以達到最好的性能。

到此這篇關于Java 并發中的Fork/Join 框架機制詳解的文章就介紹到這了,更多相關Java Fork/Join 框架內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Java
相關文章:
主站蜘蛛池模板: 起碰97| 国产精品99久久久久久夜夜嗨 | 久久精品亚洲热综合一本奇米 | 午夜dy888理论三级 | 黄色三几片 | 国产精品亚洲专区一区 | 欧美一区永久视频免费观看 | 日本不卡在线视频高清免费 | 国产免费播放一区二区 | 国产精品久久久久久爽爽爽 | 欧美亚洲另类视频 | 日本一级级特黄特色大片 | 国产成人av性色在线影院 | 亚洲国产精品成 | 1000部啪啪未满十八勿入中国 | 国产一二三区在线观看 | 亚洲国产精品毛片∧v卡在线 | 中日黄色大片 | 国内精品久久久久久影院老狼 | 日本免费不卡视频一区二区三区 | 久久久久嫩草影院精品 | 午夜影院欧美 | 亚洲精品第五页中文字幕 | sese在线| 在线播放国产真实女同事 | 国产 欧美 日本 | 久久精品免费播放 | 国产亚洲精品美女一区二区 | ww在线观视频免费观看 | 美国一级特色大黄 | 特级淫片aaaa毛片aa视频 | 欧美一级毛片高清免费观看 | 午夜精品在线 | 六月丁香深爱六月综合激情 | 欧美 日韩 国产 在线 | www.小视频 | 亚洲国产精品久久 | 999精品影视在线观看 | 羞羞一区二区三区四区片 | 日韩视频福利 | 日韩国产在线播放 |