Issue
I have been struggling with implementing a multi-threaded approach to the application I am working on.
The part I want to run in parallel threads was originally constructed with a for loop going about a list.
@Service
public ApplicationServiceImpl implements ApplicationService {
@Override
public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
for (MyObject myObject : myObjectList) {
AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
runMethodD(yetAnotherTypeOfObject);
}
}
}
The methods private AnotherTypeOfObject runMethodA(MyObject myObject) {...}
, private YetAnotherTypeOfObject yetAnotherTypeOfObject(AnotherTypeOfObject anotherTypeOfObject) {...}
, private void runMethodC(YetAnotherTypeOfObject yetAnotherTypeOfObject, String aStringValue, String anotherStringValue) {...}
and private void runMethodD(MyObject myObject) {...}
only use local variables.
I have looked quite a bit to get a solution that would allow firing the threads of a list of 100s of MyObject
instead of one after the other.
What I have done is create a:
@Configuration
@EnableAsync
public class AsyncConfiguration() {
@Bean(name = "threadPoolTaskExecutor")
public Executor aSyncExecutor() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(4);
threadPoolTaskExecutor.setMaxPoolSize(4);
threadPoolTaskExecutor.setQueueCapacity(50);
threadPoolTaskExecutor.setThreadNamePrefix("threadNamePrefix");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
I do have loads of log.info("some recognizable text")
through the methods A, B, C and D so I can make sure what is going on and I aggregated these methods into one like
private void runThreads(MyObject myObject, String aStringValue, String anotherStringValue) {
AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
runMethodD(yetAnotherTypeOfObject);
}
And I have tried to run the main method as:
@Override
@Async("threadPoolTaskExecutor")
public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
String aStringValue = myObject.getAStringValue();
String anotherStringValue = myObject.getAnotherStringValue();
myObjectList.forEach(myObject -> runThreads(myObject, aStringValue, anotherStringValue));
}
I still don't get the intended result of firing a few threads for the runThreads(...) {}
method, so the processing is done in parallel.
What am I missing here?
Solution
If it's only for running all elements of a collection in parallel, then you can use Stream.parallel(). It uses a default ForkJoinPool with a thread per CPU core. This is the simplest method introduced in Java 8.
myObjectList.stream()
.parallel()
.forEach(myObject -> runThreads(myObject, myObject.getAStringValue(), myObject.getAnotherStringValue()));
For this you don't need any @Async or Spring-provided Executor.
You can use a custom ForkJoinPool to customize the number of threads, but the default might work well, too.
ForkJoinPool customThreadPool = new ForkJoinPool(4);
customThreadPool.invoke(
() -> myObjectList.stream()
.parallel()
.forEach(myObject -> runThreads(myObject, myObject.getAStringValue(), myObject.getAnotherStringValue())));
Answered By - GeertPt