Issue
I am new to Spring Batch and would like to understand how to use it to process a List<String>
as fast as possible, in parallel across multiple threads, and then return only the subset of items that satisfies some condition.
For example, I was thinking of using it to check which IPs in a subnet are up.
import org.apache.commons.net.util.SubnetUtils;
String subnet = "192.168.8.0/24";
SubnetUtils utils = new SubnetUtils(subnet);
List<String> addresses = List.of(utils.getInfo().getAllAddresses());
if (InetAddress.getByName(address).isReachable(100)) {
// Consider this address for the final list
return true;
}
My code is as follows:
import it.eng.cysec.discoverer.service.NetworkService;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Date;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final NetworkService networkService;
@Bean
public Job checkSubnetJob(Step checkIPStep){
return this.jobBuilderFactory.get("check-subnet-job")
.incrementer(new RunIdIncrementer())
.start(checkIPStep)
.build();
}
@Bean
@JobScope
public Step checkIPStep(@Value("#{jobParameters['subnet']}") String subnet) {
System.out.println("Subnet parameter: " + subnet);
return this.stepBuilderFactory.get("check-ip-step")
.<String, String>chunk(10)
.reader(reader(null))
.processor(processor())
.writer(writer())
.allowStartIfComplete(true)
.build();
}
@Bean
@JobScope
public ItemReader<String> reader(@Value("#{jobParameters['subnet']}") String subnet) {
return new ListItemReader<>(this.networkService.getAllSubnetAddresses(subnet));
}
@Bean
public ItemProcessor<String, String> processor() {
return ip -> {
System.out.println("Processor IP: " + ip + " " + new Date());
try {
InetAddress address = InetAddress.getByName(ip);
if(address.isReachable(5000)){
return ip;
}else {
return null;
}
}catch (Exception e){
return null;
}
};
}
@Bean
public ItemWriter<String> writer() {
// TODO How to pass the list of up IPs back to the calling function?
return list -> {
System.out.println("Arrays to String" + Arrays.toString(list.toArray()));
};
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("test")
@RequiredArgsConstructor
public class TestController {
private final Job job;
private final JobLauncher jobLauncher;
@GetMapping()
public List<String> test(){
JobParameters parameters = new JobParametersBuilder()
.addString("subnet", "192.168.8.0/24", false)
.toJobParameters();
try {
this.jobLauncher.run(this.job, parameters);
} catch (Exception e) {
throw new RuntimeException(e);
}
// TODO How to return the IP that are up based on the previous object?
return List.of("OK");
}
}
So my main questions are:
- How can I make different chunks (of 10 IPs each) be processed in parallel? Right now they are not.
- What is the fastest approach that Spring Batch provides to process all the IPs of a local network? Is it enough to keep them in memory, or would it be better to persist them while processing the remaining IPs? If so, how?
- How can I pass the computed list of IPs back to the calling method?
Solution
You can create a custom partitioner that partitions the input list based on indexes. Here is a quick example:
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.sample;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
/**
* Example of a partitioned step where the input is a list, and partitions
* are sublists that are processed in parallel with local worker threads.
*
* @author Mahmoud Ben Hassine
*/
@Configuration
@EnableBatchProcessing
public class ListPartitioningSample {
@Bean
public Step managerStep(StepBuilderFactory stepBuilderFactory) {
List<String> items = Arrays.asList("foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8"); // retrieved with this.networkService.getAllSubnetAddresses(subnet)
return stepBuilderFactory.get("managerStep")
.partitioner("workerStep", new ListPartitioner(items.size()))
.gridSize(2)
.taskExecutor(new SimpleAsyncTaskExecutor())
.step(workerStep(stepBuilderFactory))
.build();
}
@Bean
public Step workerStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("workerStep")
.<String, String>chunk(2)
.reader(itemReader(null))
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
@StepScope
public ListItemReader<String> itemReader(@Value("#{stepExecutionContext['range']}") Range partition) {
List<String> items = Arrays.asList("foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8"); // retrieved with this.networkService.getAllSubnetAddresses(subnet)
return new ListItemReader<>(items.subList(partition.start, partition.end));
}
@Bean
public ItemProcessor<String, String> itemProcessor() {
return new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
return item; // filter items as needed here
}
};
}
@Bean
public ItemWriter<String> itemWriter() {
return new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
items.forEach(new Consumer<String>() {
@Override
public void accept(String item) {
System.out.println(Thread.currentThread().getName() + ": " + item);
}
});
}
};
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(managerStep(stepBuilderFactory))
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
// TODO quick and dirty implementation, please add sanity checks and verify edge cases
public static class ListPartitioner implements Partitioner {
private int listSize;
public ListPartitioner(int listSize) {
this.listSize = listSize;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
// calculate ranges; the last range absorbs any remainder so that no item is skipped
int partitionSize = listSize / gridSize;
Range[] ranges = new Range[gridSize];
for (int i = 0, j = 0; i < gridSize; i++, j += partitionSize) {
int end = (i == gridSize - 1) ? listSize : j + partitionSize;
ranges[i] = new Range(j, end);
System.out.println("range = " + ranges[i]);
}
// prepare partition meta-data
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.put("range", ranges[i]);
partitions.put("partition" + i, context);
}
return partitions;
}
}
/**
* Represents an index range (i.e. a partition) in a list.
* Ex: List = ["foo1", "foo2", "bar1", "bar2"]
* Range1 = [0, 2) => sublist1 = ["foo1", "foo2"]
* Range2 = [2, 4) => sublist2 = ["bar1", "bar2"]
* @param start of sublist, inclusive
* @param end of sublist, exclusive
*/
record Range(int start, int end) {};
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(ListPartitioningSample.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
System.out.println("jobExecution = " + jobExecution);
}
}
The idea is to create sublists and have each worker step process a distinct sublist. Note that the list is not duplicated: it could be shared, and each worker thread would read only its own distinct partition.
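When the list size is not a multiple of the grid size, plain integer division leaves a remainder that has to be assigned to some partition (254 addresses split four ways, for example). The following is a minimal plain-JDK sketch of one way to compute the index ranges so that every index is covered exactly once. The names RangeSplitter and split are hypothetical and not part of Spring Batch or of the sample above; the resulting ranges could be placed in the execution contexts the same way the sample does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Batch: splits [0, listSize) into
// contiguous half-open ranges whose sizes differ by at most one.
final class RangeSplitter {

    /** Index range: start inclusive, end exclusive. */
    record Range(int start, int end) {}

    static List<Range> split(int listSize, int gridSize) {
        if (listSize < 0 || gridSize <= 0) {
            throw new IllegalArgumentException("listSize must be >= 0 and gridSize > 0");
        }
        List<Range> ranges = new ArrayList<>();
        if (listSize == 0) {
            return ranges; // nothing to partition
        }
        // Never create more partitions than there are items.
        int parts = Math.min(gridSize, listSize);
        int base = listSize / parts;   // minimum size of each partition
        int extra = listSize % parts;  // the first 'extra' partitions get one more item
        int start = 0;
        for (int i = 0; i < parts; i++) {
            int end = start + base + (i < extra ? 1 : 0);
            ranges.add(new Range(start, end));
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 254 items over 4 partitions: two partitions of 64 and two of 63.
        split(254, 4).forEach(range -> System.out.println("range = " + range));
    }
}
```

Spreading the remainder over the first partitions keeps the workload per thread balanced; giving the whole remainder to the last partition would also be correct, just slightly less even.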
The sample above prints:
[main] INFO org.springframework.batch.core.launch.support.SimpleJobLauncher - Job: [SimpleJob: [name=job]] launched with the following parameters: [{}]
[main] INFO org.springframework.batch.core.job.SimpleStepHandler - Executing step: [managerStep]
range = Range[start=0, end=4]
range = Range[start=4, end=8]
SimpleAsyncTaskExecutor-1: foo1
SimpleAsyncTaskExecutor-1: foo2
SimpleAsyncTaskExecutor-2: foo5
SimpleAsyncTaskExecutor-2: foo6
SimpleAsyncTaskExecutor-1: foo3
SimpleAsyncTaskExecutor-1: foo4
SimpleAsyncTaskExecutor-2: foo7
SimpleAsyncTaskExecutor-2: foo8
[SimpleAsyncTaskExecutor-1] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition0] executed in 82ms
[SimpleAsyncTaskExecutor-2] INFO org.springframework.batch.core.step.AbstractStep - Step: [workerStep:partition1] executed in 82ms
[main] INFO org.springframework.batch.core.step.AbstractStep - Step: [managerStep] executed in 137ms
[main] INFO org.springframework.batch.core.launch.support.SimpleJobLauncher - Job: [SimpleJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 162ms
jobExecution = JobExecution: id=0, version=2, startTime=Wed Aug 17 12:21:00 CEST 2022, endTime=Wed Aug 17 12:21:00 CEST 2022, lastUpdated=Wed Aug 17 12:21:00 CEST 2022, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, Job=[job]], jobParameters=[{}]
This shows that partitions (i.e. sublists) are processed in parallel by different threads.
Now, to answer your question about how to gather the written elements (the retained IPs in your case): you can put the item indexes (not the items themselves) in the execution context, and collect them from the execution context with a StepExecutionAggregator. You can find an example of how to do that in the word count fork/join sample that I shared here:
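Independently of that sample, the "record indexes, then aggregate" idea can be illustrated with a plain-JDK sketch. It does not use Spring Batch at all: an ExecutorService stands in for the worker steps, each task returns the indexes it retained (the role the execution context plays above), and the calling thread merges them (the role of the aggregator). The class name IndexAggregationSketch, the method retained, and the Predicate used in place of a real reachability check are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical plain-JDK illustration; no Spring Batch types are involved.
final class IndexAggregationSketch {

    static List<String> retained(List<String> items, int workers, Predicate<String> keep)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            int size = items.size();
            List<Future<List<Integer>>> results = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                // Contiguous half-open slice [start, end) handled by worker w.
                final int start = (int) ((long) size * w / workers);
                final int end = (int) ((long) size * (w + 1) / workers);
                Callable<List<Integer>> task = () -> {
                    // Each worker records only the indexes it keeps, not the items.
                    List<Integer> kept = new ArrayList<>();
                    for (int i = start; i < end; i++) {
                        if (keep.test(items.get(i))) {
                            kept.add(i);
                        }
                    }
                    return kept;
                };
                results.add(pool.submit(task));
            }
            // Aggregation: merge per-worker indexes in worker order and resolve
            // them against the shared, read-only list.
            List<String> out = new ArrayList<>();
            for (Future<List<Integer>> future : results) {
                for (int index : future.get()) {
                    out.add(items.get(index));
                }
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> ips = List.of("10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4", "10.0.0.5");
        // The predicate is a stand-in for a real check such as InetAddress.isReachable.
        System.out.println(retained(ips, 2, ip -> ip.endsWith("1") || ip.endsWith("4")));
    }
}
```

Storing indexes rather than the items keeps the per-worker state small, and merging in worker order preserves the original order of the list in the final result.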
EDIT: show how to access the subnet job parameter from the item reader
You are already passing the subnet as a job parameter in your controller method. So you can access it in the item reader bean definition with a SpEL expression as follows:
@Bean
@StepScope
public ListItemReader<String> itemReader(@Value("#{stepExecutionContext['range']}") Range partition, @Value("#{jobParameters['subnet']}") String subnet) {
// use subnet parameter as needed here
List<String> items = Arrays.asList("foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7", "foo8"); // retrieved with this.networkService.getAllSubnetAddresses(subnet)
return new ListItemReader<>(items.subList(partition.start, partition.end));
}
Answered By - Mahmoud Ben Hassine
Answer Checked By - Katrina (JavaFixing Volunteer)