Issue
I have the following code to send data to Kafka:
@Service
public class KafkaSender{
@Autowired
private KafkaTemplate<String, Employee> kafkaTemplate;
public void sendMessage(Employee employee) {
ObjectMapper objectMapper = new ObjectMapper();
ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic,employee);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
@Override
public void onSuccess(SendResult<String, Employee> result) {
// method to save in DB
saveInDatabaseMethod(result.getProducerRecord());
}
@Override
public void onFailure(Throwable ex) {
// class cast exception occur here
ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
saveInDatabaseMethod(producerRecord);
}
}
}
}
I am able to test the OnSucess
callback scenario, but i am not able to test the OnFailure
one.
@Test
void test() throws InterruptedException, ExecutionException {
Throwable ex = mock(Throwable.class);
Employee employee = new Employee();
when(kafkaTemplate.send(null,employee )).thenReturn(responseFuture);
when(sendResult.getProducerRecord()).thenReturn(producerRecord);
when(producerRecord.value()).thenReturn(employee);
doAnswer(invocationOnMock -> {
ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
listenableFutureCallback.onFailure(ex);
return null;
}).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
kafkaSender.sendMessage(employee);
}
The above test throws:
java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock$2137573915 cannot be cast to org.springframework.kafka.core.KafkaProducerException
Solution
ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
Your mock is not calling the callback with a KPE, its calling it with this
Throwable ex = mock(Throwable.class)
;
You need to wrap it in a KPE.
Answered By - Gary Russell
Answer Checked By - Cary Denson (JavaFixing Admin)