Issue
I'm using Oracle, Spring, Hibernate, and JPA. I want to iterate over an arbitrary number of DB results without having to worry about running out of memory.
I'm trying to scroll through query results without retaining every object I get.
First I create the native query:
Query q = getEm().createNativeQuery(sql,reportRowType);
q.setHint("org.hibernate.fetchSize",1000);
q.setHint("org.hibernate.cacheable",false);
Then I call a method on an autowired object, which executes the query and does something with the results. For testing, I ignore the results entirely and simply iterate over them.
/**
 * Scrolls forward through the results of {@code q} one row at a time, detaching each
 * row's entity from {@code em} after it is read.
 *
 * @param em entity manager the read entities are detached from
 * @param q  JPA query to scroll through; it must be unwrappable to {@code org.hibernate.Query}
 * @return the number of rows scrolled
 */
@Transactional(readOnly = true, propagation = Propagation.REQUIRES_NEW)
public <T extends ResultRow> long run(EntityManager em, Query q) {
    long count = 0;
    ScrollableResults sr = q.unwrap(org.hibernate.Query.class)
            .setReadOnly(true)
            .setFetchSize(1000)
            .setCacheable(false)
            .setCacheMode(CacheMode.IGNORE)
            .scroll(ScrollMode.FORWARD_ONLY);
    // sr is always assigned here (scroll() either returns or throws), so no null check is needed.
    try {
        while (sr.next()) {
            @SuppressWarnings("unchecked") // column 0 holds the mapped result row
            T obj = (T) sr.get(0);
            em.detach(obj);
            // do something with the row here
            ++count;
        }
    } finally {
        sr.close();
    }
    // The method is declared to return long; report how many rows were processed.
    return count;
}
I've found that I eventually run out of memory with the above code (at around 1.5 million results in my test). Something in the Query object seems to be retaining the objects.
Even if I page through the results on the same Query (using q.setFirstResult and q.setMaxResults), the objects are still retained.
The only workaround I've found is to create an entirely new Query object for each page and use setFirstResult and setMaxResults to fetch results 1 through 10000, then 10001 through 20000, and so on.
I have read about Hibernate's StatelessSession, but it looks fairly involved to get working. Is there really no way to execute a JPA query without retaining all of its results?
Solution
I eventually figured out how to use Hibernate stateless sessions. This isn't portable across JPA providers, but if you're using Hibernate, something like the following will work.
import javax.persistence.EntityManager;
import java.util.Map;
import org.apache.log4j.Logger;
import org.hibernate.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Service
public class StatelessQueryRunner
{
    /**
     * Executes the given native SQL in a Hibernate {@link StatelessSession} and hands each
     * mapped row to {@code consumer} as soon as it is read.
     *
     * <p>A stateless session keeps no first-level cache, so rows are not accumulated in a
     * persistence context and there is nothing to detach between rows.
     *
     * <p>NOTE(review): the stateless session is opened directly from the SessionFactory, so it
     * presumably does not take part in the transaction started by {@code @Transactional} —
     * confirm against your transaction-manager setup.
     *
     * @param em            entity manager, used only to reach the Hibernate SessionFactory
     * @param sql           native SQL to execute
     * @param resultRowType entity class each row is mapped to via {@code addEntity}
     * @param params        named parameter values, or {@code null} for none; Enum values are
     *                      bound by their {@code name()}
     * @param fetchSize     JDBC fetch size used while scrolling
     * @param firstResult   index of the first row to return, or {@code null} for no offset
     * @param maxResults    maximum number of rows to return, or {@code null} for no limit
     * @param consumer      receives each row, in result order
     * @return the number of rows passed to {@code consumer}
     */
    @Transactional(readOnly = true, propagation = Propagation.REQUIRES_NEW)
    public <T> long run(EntityManager em, String sql, Class<T> resultRowType, Map<String,Object> params, int fetchSize, Integer firstResult, Integer maxResults, QueryResultConsumer<? super T> consumer) {
        // long, not int: this method exists to stream result sets of unbounded size.
        long totalResults = 0;
        Session hibernateSession = em.unwrap(Session.class);
        StatelessSession statelessSession = hibernateSession.getSessionFactory().openStatelessSession();
        try {
            SQLQuery q = statelessSession.createSQLQuery(sql);
            q.addEntity(resultRowType);
            if (params != null) {
                // Bind through the JPA-style wrapper so Enum values are converted to name().
                JpaQueryWrapper qw = new JpaQueryWrapper(q);
                for (Map.Entry<String,Object> entry : params.entrySet()) {
                    qw.setParameter(entry.getKey(), entry.getValue());
                }
            }
            if (firstResult != null) {
                q.setFirstResult(firstResult);
            }
            if (maxResults != null) {
                q.setMaxResults(maxResults);
            }
            ScrollableResults sr = q
                    .setReadOnly(true)
                    .setFetchSize(fetchSize)
                    .setCacheable(false)
                    .scroll(ScrollMode.FORWARD_ONLY);
            try {
                while (sr.next()) {
                    // Entities come from the stateless session, never from em's persistence
                    // context, so no em.detach() is needed (it would only be a no-op lookup).
                    consumer.consume(resultRowType.cast(sr.get(0)));
                    ++totalResults;
                }
            } finally {
                sr.close();
            }
            return totalResults;
        } finally {
            statelessSession.close();
        }
    }

    /**
     * Runs {@code action} with a new, ordinary Hibernate {@link Session} opened from the same
     * SessionFactory as {@code em}, and closes that session afterwards.
     *
     * <p>Annotations cannot start transactions for work done inside a stateless session. If
     * the consumer of {@link #run} needs to write anything, call this method and have
     * {@code action} start its own transaction on the supplied Session and save through it.
     *
     * @param em     entity manager, used only to reach the Hibernate SessionFactory
     * @param action work to perform with the open session
     */
    public <T> void runInSession(EntityManager em, SessionRunnable<T> action) {
        Session session = em.unwrap(Session.class).getSessionFactory().openSession();
        try {
            action.run(session);
        } finally {
            session.close();
        }
    }

    /**
     * Callback that receives an open Hibernate {@link Session} from {@link #runInSession}.
     * The type parameter is not used by this class.
     */
    public interface SessionRunnable<T>
    {
        void run(Session session);
    }
}
import javax.persistence.*;
import java.util.*;
import org.hibernate.SQLQuery;
/**
 * Adapts a Hibernate {@link SQLQuery} to the {@link javax.persistence.Query} interface so that
 * JPA-style parameter-binding code can be reused on a stateless-session query.
 *
 * <p>Parameter binding, {@code setFirstResult}/{@code setMaxResults} and {@code unwrap} are
 * supported. Every other operation throws {@link UnsupportedOperationException}; results must
 * be read from the wrapped Hibernate query directly (for example via {@code scroll()}).
 */
public class JpaQueryWrapper
implements javax.persistence.Query
{
    private final SQLQuery q;

    /**
     * @param q the Hibernate native query to delegate to
     * @throws NullPointerException if {@code q} is {@code null}
     */
    public JpaQueryWrapper(SQLQuery q) {
        if (q == null) {
            throw new NullPointerException("q");
        }
        this.q = q;
    }

    /** Converts values for binding; Enum values are bound by their {@code name()}. */
    private static Object toBindValue(Object value) {
        return (value instanceof Enum) ? ((Enum<?>) value).name() : value;
    }

    @Override
    public List getResultList() {throw new UnsupportedOperationException();}
    @Override
    public Object getSingleResult() {throw new UnsupportedOperationException();}
    @Override
    public int executeUpdate() {throw new UnsupportedOperationException();}

    /**
     * Delegates to the wrapped query.
     *
     * @throws IllegalArgumentException if {@code maxResult} is negative (as required by JPA)
     */
    @Override
    public javax.persistence.Query setMaxResults(int maxResult) {
        if (maxResult < 0) {
            throw new IllegalArgumentException("maxResult must not be negative: " + maxResult);
        }
        q.setMaxResults(maxResult);
        return this;
    }
    @Override
    public int getMaxResults() {throw new UnsupportedOperationException();}

    /**
     * Delegates to the wrapped query.
     *
     * @throws IllegalArgumentException if {@code startPosition} is negative (as required by JPA)
     */
    @Override
    public javax.persistence.Query setFirstResult(int startPosition) {
        if (startPosition < 0) {
            throw new IllegalArgumentException("startPosition must not be negative: " + startPosition);
        }
        q.setFirstResult(startPosition);
        return this;
    }
    @Override
    public int getFirstResult() {throw new UnsupportedOperationException();}
    @Override
    public javax.persistence.Query setHint(String hintName, Object value) {throw new UnsupportedOperationException();}
    @Override
    public Map<String, Object> getHints() {throw new UnsupportedOperationException();}
    @Override
    public <T> javax.persistence.Query setParameter(Parameter<T> param, T value) {throw new UnsupportedOperationException();}
    @Override
    public javax.persistence.Query setParameter(Parameter<Calendar> param, Calendar value, TemporalType temporalType) {throw new UnsupportedOperationException();}
    @Override
    public javax.persistence.Query setParameter(Parameter<Date> param, Date value, TemporalType temporalType) {throw new UnsupportedOperationException();}

    /** Binds a named parameter; Enum values are bound by their {@code name()}. */
    @Override
    public javax.persistence.Query setParameter(String name, Object value) {
        q.setParameter(name, toBindValue(value));
        return this;
    }

    /** Binds a named Calendar parameter by converting it to a Date (null stays null). */
    @Override
    public javax.persistence.Query setParameter(String name, Calendar value, TemporalType temporalType) {return setParameter(name,(Date)(value==null?null:value.getTime()),temporalType);}

    /**
     * Binds a named Date parameter using the Hibernate setter that matches {@code temporalType}.
     *
     * @throws UnsupportedOperationException for any other temporal type (including null)
     */
    @Override
    public javax.persistence.Query setParameter(String name, Date value, TemporalType temporalType) {
        if (temporalType == TemporalType.DATE) {
            q.setDate(name, value);
        } else if (temporalType == TemporalType.TIME) {
            q.setTime(name, value);
        } else if (temporalType == TemporalType.TIMESTAMP) {
            q.setTimestamp(name, value);
        } else {
            throw new UnsupportedOperationException();
        }
        return this;
    }

    /**
     * Binds a JPA positional parameter.
     *
     * <p>JPA positions are 1-based, whereas Hibernate's {@code setParameter(int, Object)} uses
     * 0-based positions for {@code ?} placeholders, so the position is shifted by one. If the
     * query instead declares a parameter named after the position (Hibernate may parse a
     * JPA-style {@code ?N} placeholder that way), it is bound by that name. Enum values are
     * bound by their {@code name()}, as for named parameters.
     */
    @Override
    public javax.persistence.Query setParameter(int position, Object value) {
        String positionName = Integer.toString(position);
        if (Arrays.asList(q.getNamedParameters()).contains(positionName)) {
            q.setParameter(positionName, toBindValue(value));
        } else {
            q.setParameter(position - 1, toBindValue(value));
        }
        return this;
    }
    @Override
    public javax.persistence.Query setParameter(int position, Calendar value, TemporalType temporalType) {throw new UnsupportedOperationException();}
    @Override
    public javax.persistence.Query setParameter(int position, Date value, TemporalType temporalType) {throw new UnsupportedOperationException();}
    @Override
    public Set<Parameter<?>> getParameters() {throw new UnsupportedOperationException();}
    @Override
    public Parameter<?> getParameter(String name) {throw new UnsupportedOperationException();}
    @Override
    public <T> Parameter<T> getParameter(String name, Class<T> type) {throw new UnsupportedOperationException();}
    @Override
    public Parameter<?> getParameter(int position) {throw new UnsupportedOperationException();}
    @Override
    public <T> Parameter<T> getParameter(int position, Class<T> type) {throw new UnsupportedOperationException();}
    @Override
    public boolean isBound(Parameter<?> param) {throw new UnsupportedOperationException();}
    @Override
    public <T> T getParameterValue(Parameter<T> param) {throw new UnsupportedOperationException();}
    @Override
    public Object getParameterValue(String name) {throw new UnsupportedOperationException();}
    @Override
    public Object getParameterValue(int position) {throw new UnsupportedOperationException();}
    @Override
    public javax.persistence.Query setFlushMode(FlushModeType flushMode) {throw new UnsupportedOperationException();}
    @Override
    public FlushModeType getFlushMode() {throw new UnsupportedOperationException();}
    @Override
    public javax.persistence.Query setLockMode(LockModeType lockMode) {throw new UnsupportedOperationException();}
    @Override
    public LockModeType getLockMode() {throw new UnsupportedOperationException();}

    /**
     * Returns this wrapper or the wrapped Hibernate query, whichever is an instance of
     * {@code cls} (this wrapper is checked first).
     *
     * @throws UnsupportedOperationException if neither is an instance of {@code cls}
     */
    @Override
    public <T> T unwrap(Class <T> cls) {
        if (cls.isInstance(this)) {
            return cls.cast(this);
        }
        if (cls.isInstance(q)) {
            return cls.cast(q);
        }
        throw new UnsupportedOperationException();
    }
}
/**
 * Callback invoked once per result row while a query's results are streamed.
 *
 * @param <T> type of each result row
 */
public interface QueryResultConsumer<T> {

    /**
     * Receives a single result row.
     *
     * @param obj the row that was just read
     */
    void consume(T obj);
}
Answered By - HappyEngineer
Answer Checked By - Katrina (JavaFixing Volunteer)