亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

Spring Batch遠程分區的本地Jar包模式的代碼詳解

瀏覽:2日期:2023-08-14 14:35:11

1 前言

Spring Batch遠程分區對于大量數據的處理非常擅長,它的實現有多種方式,如本地Jar包模式、MQ模式、Kubernetes模式。這三種模式的如下:

(1)本地Jar包模式:分區處理的worker為一個Java進程,從jar包啟動,通過jvm參數和數據庫傳遞參數;官方提供示例代碼。

(2)MQ模式:worker是一個常駐進程,Manager和Worker通過消息隊列來傳遞參數;網上有不少相關示例代碼。

(3)Kubernetes模式:worker為K8s中的Pod,Manager直接啟動Pod來處理;網上并沒有找到任何示例代碼。

本文將通過代碼來講解第一種模式(本地Jar包模式),其它后續再介紹。

Spring Batch遠程分區的本地Jar包模式的代碼詳解

建議先看下面文章了解一下:

Spring Batch入門:Spring Batch入門教程篇

Spring Batch并行處理介紹:詳解SpringBoot和SpringBatch 使用

2 代碼講解

本文代碼中,Manager和Worker是放在一起的,在同一個項目里,也只會打一個jar包而已;我們通過profile來區別是manager還是worker,也就是通過Spring Profile實現一份代碼,兩份邏輯。實際上也可以拆成兩份代碼,但放一起更方便測試,而且代碼量不大,就沒有必要了。

2.1 項目準備

2.1.1 數據庫

首先我們需要準備一個數據庫,因為Manager和Worker都需要同步狀態到DB上,不能直接使用嵌入式的內存數據庫了,需要一個外部可共同訪問的數據庫。這里我使用的是H2 Database,安裝可參考:把H2數據庫從jar包部署到Kubernetes,并解決Ingress不支持TCP的問題。

2.1.2 引入依賴

maven引入依賴如下所示:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-task</artifactId></dependency><dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-deployer-local</artifactId> <version>2.4.1</version></dependency><dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-integration</artifactId></dependency>

spring-cloud-deployer-local用于部署和啟動worker,非常關鍵;其它就是Spring Batch和Task相關的依賴;以及數據庫連接。

2.1.3 主類入口

Springboot的主類入口如下:

@EnableTask@SpringBootApplication@EnableBatchProcessingpublic class PkslowRemotePartitionJar { public static void main(String[] args) { SpringApplication.run(PkslowRemotePartitionJar.class, args); }}

在Springboot的基礎上,添加了Spring Batch和Spring Cloud Task的支持。

2.2 關鍵代碼編寫

前面的數據庫搭建和其它代碼沒有太多可講的,接下來就開始關鍵代碼的編寫。

2.2.1 分區管理Partitioner

Partitioner是遠程分區中的核心bean,它定義了分成多少個區、怎么分區,要把什么變量傳遞給worker。它會返回一組<分區名,執行上下文>的鍵值對,即返回Map<String, ExecutionContext>。把要傳遞給worker的變量放在ExecutionContext中去,支持多種類型的變量,如String、int、long等。實際上,我們不建議通過ExecutionContext來傳遞太多數據;可以傳遞一些標識或主鍵,然后worker自己去拿數據即可。

具體代碼如下:

private static final int GRID_SIZE = 4;@Beanpublic Partitioner partitioner() { return new Partitioner() { @Override public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> partitions = new HashMap<>(gridSize); for (int i = 0; i < GRID_SIZE; i++) { ExecutionContext executionContext = new ExecutionContext(); executionContext.put('partitionNumber', i); partitions.put('partition' + i, executionContext); } return partitions; } };}

上面分成4個區,程序會啟動4個worker來處理;給worker傳遞的參數是partitionNumber。

2.2.2 分區處理器PartitionHandler

PartitionHandler也是核心的bean,它決定了怎么去啟動worker,給它們傳遞什么jvm參數(跟之前的ExecutionContext傳遞不一樣)。

@Beanpublic PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception { Resource resource = this.resourceLoader.getResource(workerResource); DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, 'workerStep', taskRepository); List<String> commandLineArgs = new ArrayList<>(3); commandLineArgs.add('--spring.profiles.active=worker'); commandLineArgs.add('--spring.cloud.task.initialize-enabled=false'); commandLineArgs.add('--spring.batch.initializer.enabled=false'); partitionHandler .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs)); partitionHandler .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment)); partitionHandler.setMaxWorkers(2); partitionHandler.setApplicationName('PkslowWorkerJob'); return partitionHandler;}

上面代碼中:

resource是worker的jar包地址,表示將啟動該程序;

workerStep是worker將要執行的step;

commandLineArgs定義了啟動worker的jvm參數,如--spring.profiles.active=worker;

environment是manager的系統環境變量,可以傳遞給worker,當然也可以選擇不傳遞;

MaxWorkers是最多能同時啟動多少個worker,類似于線程池大小;設置為2,表示最多同時有2個worker來處理4個分區。

2.2.3 Manager和Worker的Batch定義

完成了分區相關的代碼,剩下的就只是如何定義Manager和Worker的業務代碼了。

Manager作為管理者,不用太多業務邏輯,代碼如下:

@Bean@Profile('!worker')public Job partitionedJob(PartitionHandler partitionHandler) throws Exception { Random random = new Random(); return this.jobBuilderFactory.get('partitionedJob' + random.nextInt()) .start(step1(partitionHandler)) .build();}@Beanpublic Step step1(PartitionHandler partitionHandler) throws Exception { return this.stepBuilderFactory.get('step1') .partitioner(workerStep().getName(), partitioner()) .step(workerStep()) .partitionHandler(partitionHandler) .build();}

Worker主要作用是處理數據,是我們的業務代碼,這里就演示一下如何獲取Manager傳遞過來的partitionNumber:

@Beanpublic Step workerStep() { return this.stepBuilderFactory.get('workerStep') .tasklet(workerTasklet(null, null)) .build();}@Bean@StepScopepublic Tasklet workerTasklet(final @Value('#{stepExecutionContext[’partitionNumber’]}') Integer partitionNumber) { return new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { Thread.sleep(6000); //增加延時,查看效果,通過jps:在jar情況下會新起java進程 System.out.println('This tasklet ran partition: ' + partitionNumber); return RepeatStatus.FINISHED; } };}

通過表達式@Value('#{stepExecutionContext[’partitionNumber’]}') 獲取Manager傳遞過來的變量;注意要加注解@StepScope。

3 程序運行

因為我們分為Manager和Worker,但都是同一份代碼,所以我們先打包一個jar出來,不然manager無法啟動。配置數據庫和Worker的jar包地址如下:

spring.datasource.url=jdbc:h2:tcp://localhost:9092/testspring.datasource.username=pkslowspring.datasource.password=pkslowspring.datasource.driver-class-name=org.h2.Driverpkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

執行程序如下:

Spring Batch遠程分區的本地Jar包模式的代碼詳解

可以看到啟動了4次Java程序,還給出日志路徑。

通過jps命令查看,能看到一個Manager進程,還有兩個worker進程:

Spring Batch遠程分區的本地Jar包模式的代碼詳解

4 復雜變量傳遞

前面講了Manager可以通過ExecutionContext傳遞變量,如簡單的String、long等。但其實它也是可以傳遞復雜的Java對象的,但對應的類需要可序列化,如:

import java.io.Serializable;public class Person implements Serializable { private Integer age; private String name; private String webSite; //getter and setter}

Manager傳遞:

executionContext.put('person', new Person(0, 'pkslow', 'www.pkslow.com'));

Worker接收:

@Value('#{stepExecutionContext[’person’]}') Person person

5 總結

本文介紹了Spring Batch遠程分區的本地Jar包模式,只能在一臺機器上運行,所以也是無法真正發揮出遠程分區的作用。但它對我們后續理解更復雜的模式是有很大幫助的;同時,我們也可以使用本地模式進行開發測試,畢竟它只需要一個數據庫就行了,依賴很少。

標簽: Spring
相關文章:
主站蜘蛛池模板: 国产精品久久久久久久久久久威 | 成人精品视频 成人影院 | 美女国产福利视频 | 毛片网站在线观看 | 国产成人在线免费视频 | 91麻豆网| 91精品国产露脸在线 | 欧美一级在线毛片免费观看 | 综合图色 | 日韩精品久久久久久 | 大学生一级毛片高清版 | 美女黄视频大全 | 老妇女性较大毛片 | 国产色婷婷精品综合在线 | 这里只有精品99re在线 | 夜夜夜精品视频免费 | 国产性夜夜性夜夜爽91 | 国产系列欧美系列日韩系列在线 | 亚洲国产精品自产在线播放 | 国产麻豆视频免费观看 | 亚洲精品综合一区二区三区在线 | 中文字幕色综合久久 | 日本一级不卡一二三区免费 | 精品日韩在线视频 | 欧美一级淫片a免费播放口aaa | 欧美一级一级做性视频 | 国产欧美一区二区精品性色99 | 真实国语对白视频播放 | 国产91原创 | 四虎激情做人爰 | 国产床戏做爰免费观看网站 | 久久国产免费观看精品 | 欧美在线观看网站 | 日本a一级毛片免费观看 | 黄在线视频 | 免费视频精品一区二区 | aaa国产精品 | 成人在线小视频 | 你懂的网站在线 | 欧美一级特黄aa大片视频 | 91正在播放极品白嫩在线观看 |