Issue
I have a Spring Boot application in which I need to schedule a job that reads files from a specific directory and stores the data in the DB.
I used Spring Batch to handle the files because the number of files is very large.
The application has a component named ParserStarter, which has a method named startParsing. This method is annotated with the @Scheduled annotation.
@Scheduled(fixedDelay = 60 * 1000)
public void startParsing() {
    // start spring batch job
}
I have a repository interface, NewsRepositry, injected into the writer of the Spring Batch first step.
The application has a simple controller to manually call the startParsing method. When startParsing is called from the controller, everything works fine: the Spring Batch job starts normally, reads the files, writes the data into the DB, and archives the files.
When startParsing is invoked by the scheduling framework, the Spring Batch job starts normally and reads the files, but nothing is stored in the DB.
I suspect the problem here is that there are two different contexts, one for the scheduling part and another for the rest of the application.
For some reason, there is no transaction manager in the scheduling context which causes nothing to go to the DB.
1- Is my suspicion correct?
2- If yes, how can I force the transaction manager to be loaded into the other context?
EDIT
The code for the parser starter class is below
@Component
public class ParserStarter {

    @Autowired
    JobLauncher jobLauncher;

    @Value("${app.data_directory}")
    private String dataDir;

    @Autowired
    private ParserJobListener jobListener;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    public Resource[] getResources() throws IOException {
        // return array of file resource to be processed
    }

    // @Scheduled(fixedDelay = 60 * 1000)
    public void startParsing() throws Exception {
        String jobName = System.currentTimeMillis() + " New Parser Job";
        JobParameters jobParameters = new JobParametersBuilder().addString("source", jobName).toJobParameters();
        jobLauncher.run(getParsingJob(), jobParameters);
    }

    @Bean(name="getParsingJob")
    private Job getParsingJob() throws IOException {
        jobListener.setResources(getResources());
        Step processingStep = jobListener.processingStep();
        Step archivingStep = jobListener.archivingStep();
        Job job = jobBuilderFactory.get("Store News").incrementer(new RunIdIncrementer())
                .listener(jobListener).start(processingStep).next(archivingStep).build();
        return job;
    }
}
The code for the job listener is below
@Component
public class ParserJobListener extends JobExecutionListenerSupport {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private Resource[] resources;

    @Value("${app.archive_directory}")
    private String archiveDirectory;

    @Autowired
    private Writer writer;

    public MultiResourceItemReader<DataRecord> multiResourceItemReader() {
        MultiResourceItemReader<DataRecord> resourceItemReader = new MultiResourceItemReader<DataRecord>();
        resourceItemReader.setResources(resources);
        resourceItemReader.setDelegate(new Reader());
        return resourceItemReader;
    }

    public Step archivingStep() {
        FileArchivingTask archivingTask = new FileArchivingTask(resources, archiveDirectory);
        return stepBuilderFactory.get("Archiving step").tasklet(archivingTask).build();
    }

    public Step processingStep() {
        return stepBuilderFactory.get("Process news file").<DataRecord, DataRecord>chunk(1000)
                .reader(multiResourceItemReader()).writer(writer).build();
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("Job finished");
        }
    }

    public void setResources(Resource[] resources) {
        this.resources = resources;
    }
}
What remains is the writer, shown below
@Component
public class Writer implements ItemWriter<DataRecord> {

    @Autowired
    private DataRepository dataRepo;

    @Override
    public void write(List<? extends DataRecord> items) throws Exception {
        dataRepo.saveAll(items);
    }
}
Edit 2
I have changed the writer's write method to save and flush each item individually, as follows
@Transactional
public void write(List<? extends GdeltRecord> items) throws Exception {
    for (GdeltRecord gdeltRecord : items) {
        dataRepo.saveAndFlush(gdeltRecord);
    }
    // dataRepo.saveAll(items);
}
This time the application throws a TransactionRequiredException: no transaction is in progress.
Here is the full stack trace of the exception
Caused by: javax.persistence.TransactionRequiredException: no transaction is in progress
at org.hibernate.internal.SessionImpl.checkTransactionNeeded(SessionImpl.java:3552) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1444) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1440) ~[hibernate-core-5.3.7.Final.jar:5.3.7.Final]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.orm.jpa.ExtendedEntityManagerCreator$ExtendedEntityManagerInvocationHandler.invoke(ExtendedEntityManagerCreator.java:350) ~[spring-orm-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at com.sun.proxy.$Proxy87.flush(Unknown Source) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:308) ~[spring-orm-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at com.sun.proxy.$Proxy87.flush(Unknown Source) ~[na:na]
at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:533) ~[spring-data-jpa-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:504) ~[spring-data-jpa-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:359) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:200) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:644) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.doInvoke(RepositoryFactorySupport.java:608) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.lambda$invoke$3(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.invoke(RepositoryFactorySupport.java:595) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:59) ~[spring-data-commons-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) [spring-aop-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.1.4.RELEASE.jar:5.1.4.RELEASE]
... 66 common frames omitted
Solution
I tried the approach described in this question (JpaItemWriter: no transaction is in progress) and it worked for me.
I defined a JpaTransactionManager bean and used it in the step configuration.
@Bean
@Primary
public JpaTransactionManager jpaTransactionManager() {
    final JpaTransactionManager transactionManager = new JpaTransactionManager();
    // dataSource is not declared in this snippet; presumably the application's injected DataSource
    transactionManager.setDataSource(dataSource);
    return transactionManager;
}
And in the step configuration:
@Autowired
JpaTransactionManager trxm;

public Step processingStep(Resource[] resources) throws IOException {
    return stepBuilderFactory.get("Process CSV File")
            .transactionManager(trxm)
            .<DataRecord, DataRecord>chunk(1000)
            .reader(multiResourceItemReader()).writer(writer).build();
}
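For readers who want to see the two pieces together, here is a minimal sketch of the same idea in a single configuration class. It assumes Spring Batch 4.x with javax.persistence (in line with the library versions in the stack trace) and that batch processing is already enabled so a StepBuilderFactory is available. The class name BatchTransactionConfig, the bean method names, and the injected reader and writer beans are assumptions made for illustration and are not part of the original answer; DataRecord is the item class from the question. As a variation on the setDataSource call above, the sketch builds the JpaTransactionManager from the EntityManagerFactory through its constructor.

```java
import javax.persistence.EntityManagerFactory;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.orm.jpa.JpaTransactionManager;

// Hypothetical configuration class; assumes @EnableBatchProcessing is declared elsewhere.
@Configuration
public class BatchTransactionConfig {

    // Transaction manager bound to the JPA EntityManagerFactory,
    // so transactions it opens are visible to JPA repositories.
    @Bean
    @Primary
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory emf) {
        return new JpaTransactionManager(emf);
    }

    // Chunk-oriented step that commits each chunk through the JPA transaction manager.
    // The reader and writer are assumed to be available as beans (an assumption of this sketch).
    @Bean
    public Step processingStep(StepBuilderFactory steps,
                               JpaTransactionManager txManager,
                               ItemReader<DataRecord> reader,
                               ItemWriter<DataRecord> writer) {
        return steps.get("Process news file")
                .transactionManager(txManager) // override the factory's default manager
                .<DataRecord, DataRecord>chunk(1000)
                .reader(reader)
                .writer(writer)
                .build();
    }
}
```

Calling transactionManager(...) on the step builder replaces whichever PlatformTransactionManager the StepBuilderFactory was created with, which appears to be what lets the repository's flush find an active JPA transaction in this setup.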
Answered By - Fanooos
Answer Checked By - David Marino (JavaFixing Volunteer)