
Spring Batch Advanced: Remote Partitioning a Step with RabbitMQ
1. Introduction
For Spring Batch concepts and basic usage, see 《spring batch精选,一文吃透spring batch》 (a Spring Batch primer). This article covers the advanced side of Spring Batch, namely its scaling options: Multithreaded Step (running a single Step with multiple threads), Parallel Step (running multiple Steps in parallel on multiple threads), Remote Chunking (executing chunk processing on remote nodes), and Partitioning Step (partitioning the data and processing the partitions separately). The focus here is the Partitioning Step. The example built in this article can run as a master node, a slave node, or a mixed master/slave node, and can greatly improve throughput compared with single-machine Spring Batch processing.
Source code for this article: https://gitee.com/kailing/partitionjob
2. How a remotely partitioned Spring Batch Step works
The master node splits the data to be processed into ranges according to some rule (for example by ID or by hash) and publishes the work to a message broker (ActiveMQ, RabbitMQ, etc.). The slave nodes listen on the queue, pick up the messages, process the data range each message describes, and report the result back. What actually travels over the queue is a lightweight StepExecutionRequest (the step name plus the job and step execution IDs); the partition boundaries themselves are stored as ExecutionContexts in the shared job repository, which is why master and slaves must point at the same batch metadata database. The flow is illustrated below:

[Figure: master partitions the data → RabbitMQ → slave nodes execute the partitioned Step and return results]
3. A Spring Batch remote partitioning example
The following walks through the implementation step by step, following the principle above, to build a complete Spring Batch remote partitioning example.
3.1 Add the required dependencies
See: https://gitee.com/kailing/partitionjob/blob/master/pom.xml
The key dependency for the partitioned job is spring-batch-integration, which provides the remote communication capability:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kailing.bootBatch</groupId>
    <artifactId>partitionjob</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>partitionjob</name>
    <description>Demo project for Spring Boot</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
3.2 Master node: partitioning and dispatching the data
@Profile({"master", "mixed"})
@Bean
public Job job(@Qualifier("masterStep") Step masterStep) {
return jobBuilderFactory.get("endOfDayjob")
.start(masterStep)
.incrementer(new BatchIncrementer())
.listener(new JobListener())
.build();
}
@Bean("masterStep")
public Step masterStep(@Qualifier("slaveStep") Step slaveStep,
PartitionHandler partitionHandler,
DataSource dataSource) {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep.getName(), new ColumnRangePartitioner(dataSource))
.step(slaveStep)
.partitionHandler(partitionHandler)
.build();
}
The key part on the master side is its Step, which must be given the name of the slave Step and a partitioner. The partitioner implements the Partitioner interface and returns a Map&lt;String, ExecutionContext&gt;; this map fully describes the partition slice each slave has to process. Each ExecutionContext holds the data boundaries of one partition; what you put into it depends on your own business, and in this example the partitions are bounded by the data's ID column. For instance, if arcid runs from 1 to 10,000 and gridSize is 4, each partition covers 2,500 IDs: partition0 gets minValue=1/maxValue=2500, partition1 gets 2501–5000, and so on.
The concrete Partitioner implementation:
/**
 * Created by kl on 2018/3/1.
 * Content: partitions the data by ID range
 */
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;

    ColumnRangePartitioner(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForObject("SELECT MIN(arcid) from kl_article", Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(arcid) from kl_article", Integer.class);
        int targetSize = (max - min) / gridSize + 1;
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
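The job definition at the start of this section also references two small helper classes, BatchIncrementer and JobListener, which live in the sample repository and are not listed in the article. Purely as hypothetical stand-ins (assumptions, not the repository's actual code), they could be a JobParametersIncrementer that adds a changing parameter so the job can be relaunched, and a JobExecutionListener that logs the job's start and end:

// Hypothetical stand-ins; each class would normally live in its own file.
public class BatchIncrementer implements JobParametersIncrementer {
    @Override
    public JobParameters getNext(JobParameters parameters) {
        // add a changing parameter so the same job definition can be launched repeatedly
        return new JobParametersBuilder(parameters == null ? new JobParameters() : parameters)
                .addLong("launchTime", System.currentTimeMillis())
                .toJobParameters();
    }
}

class JobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Job starting: " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("Job finished with status: " + jobExecution.getStatus());
    }
}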
3.3 Integration configuration
Spring Batch Integration provides the communication layer for remote partitioning. Spring Integration offers a rich set of channel adapters (for example JMS and AMQP), so remote partitioning can be built on middleware such as ActiveMQ or RabbitMQ. This article uses RabbitMQ as the transport. Installing RabbitMQ is outside the scope of this post; the code below shows how to configure the MQ connection, the queue used for partition requests, and the message adapters.
/**
 * Created by kl on 2018/3/1.
 * Content: remote partitioning communication setup
 */
@Configuration
@ConfigurationProperties(prefix = "spring.rabbit")
public class IntegrationConfiguration {

    private String host;
    private Integer port = 5672;
    private String username;
    private String password;
    private String virtualHost;
    private int connRecvThreads = 5;
    private int channelCacheSize = 10;
    // getters and setters for the fields above are omitted

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(connRecvThreads);
        executor.initialize();
        connectionFactory.setExecutor(executor);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setChannelCacheSize(channelCacheSize);
        return connectionFactory;
    }

    @Bean
    public MessagingTemplate messageTemplate() {
        MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());
        messagingTemplate.setReceiveTimeout(60000000L);
        return messagingTemplate;
    }

    @Bean
    public DirectChannel outboundRequests() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outboundRequests")
    public AmqpOutboundEndpoint amqpOutboundEndpoint(AmqpTemplate template) {
        AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(template);
        endpoint.setExpectReply(true);
        endpoint.setOutputChannel(inboundRequests());
        endpoint.setRoutingKey("partition.requests");
        return endpoint;
    }

    @Bean
    public Queue requestQueue() {
        return new Queue("partition.requests", false);
    }

    @Bean
    @Profile({"slave", "mixed"})
    public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inboundRequests());
        adapter.afterPropertiesSet();
        return adapter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("partition.requests");
        container.setAutoStartup(false);
        return container;
    }

    @Bean
    public PollableChannel outboundStaging() {
        return new NullChannel();
    }

    @Bean
    public QueueChannel inboundRequests() {
        return new QueueChannel();
    }
}
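One piece is not shown in the snippets above: the PartitionHandler bean that the masterStep in section 3.2 injects, which ties the partitioned step to this messaging infrastructure. A minimal sketch of what it might look like, assuming spring-batch-integration's MessageChannelPartitionHandler with result aggregation by polling the shared job repository (the gridSize of 10 and the poll interval are assumed values, not taken from the original article):

@Bean
@Profile({"master", "mixed"})
public PartitionHandler partitionHandler(MessagingTemplate messageTemplate, DataSource dataSource) throws Exception {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("slaveStep");                 // must match the slave step bean name
    partitionHandler.setGridSize(10);                          // how many partitions the Partitioner is asked for
    partitionHandler.setMessagingOperations(messageTemplate);  // sends StepExecutionRequests out via outboundRequests
    partitionHandler.setDataSource(dataSource);                // poll the shared job repository for slave results
    partitionHandler.setPollInterval(10000);                   // poll every 10 seconds
    partitionHandler.afterPropertiesSet();
    return partitionHandler;
}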
3.4 Slave node: receiving partition info and processing it
@Bean
@Profile({"slave", "mixed"})
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
    stepLocator.setBeanFactory(this.applicationContext);
    stepExecutionRequestHandler.setStepLocator(stepLocator);
    stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
    return stepExecutionRequestHandler;
}

@Bean("slaveStep")
public Step slaveStep(MyProcessorItem processorItem,
                      JpaPagingItemReader reader) {
    CompositeItemProcessor itemProcessor = new CompositeItemProcessor();
    List<ItemProcessor> processorList = new ArrayList<>();
    processorList.add(processorItem);
    itemProcessor.setDelegates(processorList);
    return stepBuilderFactory.get("slaveStep")
            .<Article, Article>chunk(1000) // commit interval: each chunk of 1000 items is one transaction
            .reader(reader)
            .processor(itemProcessor)
            .writer(new PrintWriterItem())
            .build();
}
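MyProcessorItem and PrintWriterItem are a simple ItemProcessor and ItemWriter from the sample repository and are not listed in the article. As hypothetical stand-ins (assumptions, not the repository's actual code), a pass-through processor and a writer that just prints each Article could look like this:

// Hypothetical stand-ins; each class would normally live in its own file.
@Component
public class MyProcessorItem implements ItemProcessor<Article, Article> {
    @Override
    public Article process(Article item) {
        // per-item business logic goes here; this stand-in simply passes the item through
        return item;
    }
}

class PrintWriterItem implements ItemWriter<Article> {
    @Override
    public void write(List<? extends Article> items) {
        // print the processed chunk; a real writer would persist it somewhere
        items.forEach(item -> System.out.println("written: " + item));
    }
}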
The key piece on the slave side is the StepExecutionRequestHandler: it receives the requests from the message broker, loads the corresponding partition's ExecutionContext via the JobExplorer (which is why all nodes must share the same job repository database), and runs the local slaveStep against those data boundaries. The ItemReader below picks the boundaries up; note that it is declared @StepScope, so the stepExecutionContext values are late-bound per partition:
// minValue and maxValue here are exactly the values the Master node put into each partition's ExecutionContext
@Bean(destroyMethod = "")
@StepScope
public JpaPagingItemReader<Article> jpaPagingItemReader(
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    System.err.println("Received partition parameters [" + minValue + "->" + maxValue + "]");
    JpaPagingItemReader<Article> reader = new JpaPagingItemReader<>();
    JpaNativeQueryProvider<Article> queryProvider = new JpaNativeQueryProvider<>();
    String sql = "select * from kl_article where arcid >= :minValue and arcid <= :maxValue";
    queryProvider.setSqlQuery(sql);
    queryProvider.setEntityClass(Article.class);
    reader.setQueryProvider(queryProvider);
    Map<String, Object> queryParameters = new HashMap<>();
    queryParameters.put("minValue", minValue);
    queryParameters.put("maxValue", maxValue);
    reader.setParameterValues(queryParameters);
    reader.setEntityManagerFactory(entityManagerFactory);
    return reader;
}
4. Summary
That completes the Spring Batch remote partitioning example. Note that a single application instance can run as master, as slave, or as both, controlled by Spring profiles; you may have noticed annotations such as @Profile({"master", "mixed"}) along the way. So when testing, don't forget to set the appropriate Spring Boot profile, e.g. spring.profiles.active=slave.
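For instance, a hypothetical application.properties for a slave instance might look like the following (the spring.rabbit.* names match the custom @ConfigurationProperties prefix used in section 3.3; all values are placeholders, and the job repository datasource must point at the same database on every node):

spring.profiles.active=slave

# RabbitMQ connection, bound by IntegrationConfiguration (prefix spring.rabbit)
spring.rabbit.host=127.0.0.1
spring.rabbit.port=5672
spring.rabbit.username=guest
spring.rabbit.password=guest
spring.rabbit.virtual-host=/

# shared Spring Batch job repository (same database for master and slaves)
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/batchdb?useSSL=false
spring.datasource.username=root
spring.datasource.password=root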