Issue
I am using Kafka Consumer API to build the consumer.The message structure is complex. To build the deserializers I have implemented the Deserializer class and provide necessary implementations.I am using Jackson API for deserializing. I am getting this error "Exception raisedorg.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition staging.datafeeds.PartnerHotel-0 at offset 19205124"
#POJO classes
public class Change {
private Schema schema;
private Payload payload;
//Getters and constructor
}
public class Details {
private List<String> effectedAttributes;
private List<PartnerHotel> cluster;
//Getters and contructor
}
public class Field {
private String type;
private Boolean optional;
private String field;
//Getters and constructor
}
public class Fields {
private String type;
private List<Field> fields;
private Boolean optional;
private String name;
//Getters and contructor
}
public class Geom{
private int srid;
private String wkb;
//Getters and contructor
}
public class PartnerHotel{
private int id;
private int shopId;
private String partnerHotelId;
private boolean isOnline;
private boolean isRemovedByUser;
private int mappingPriority;
private int hotelId;
private String statusHotelId;
private String name;
private String street;
private String zipCode;
private String city;
private String sourceCityId;
private String state;
private String stateAlpha2;
private String country;
private String alpha2;
private String alpha3;
private double latitude;
private double longitude;
private Geom geomPoint;
private int countryIdShop;
private int selectedGeoname;
private String propertyType;
private List<String> tags;
private int stars;
private String url;
private int nrRatings;
private double recommendation;
private long dateHotelId;
private long timeStamp;
private long lastImport;
//Getters and contructor
}
public class Payload {
private PartnerHotel before;
private PartnerHotel after;
private Source source;
private String op;
private String ts_ms;
//Getters and contructor
}
public class Schema {
private String type;
private Boolean optional;
private String name;
private List<Fields> fields;
//Getters and contructor
}
public class Source {
private String version;
private String name;
private String ts_usec;
private String txId;
private String lxn;
private Boolean snapshot;
private Object lastSnapshotRecord;
//Getters and contructor
}
#Deserializer
public class ChangeDeserializer implements Deserializer<Change> {
public ChangeDeserializer(){ }
public void configure(Map<String, ?> map, boolean b) {}
public Change deserialize(String topic, byte[] data) {
if(data == null){
return null;
}
try{
ObjectMapper objectMapper = new ObjectMapper();
Change change = objectMapper.readValue(data,Change.class);
return change;
}
catch(IOException exception){
throw new DeserializationException("Unable to deserialize Change", exception);
}}
public void close() {}
}
#Consumer
public class KafkaAcnowledger {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "someUrl");
props.put("group.id", "test131");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records",1);
props.put("auto.offset.reset","earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
props.put("value.deserializer", "deserializer.ChangeDeserializer");
KafkaConsumer<Long, Change> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("staging.datafeeds.PartnerHotel"));
while (true) {
try{
ConsumerRecords<Long, Change> records = consumer.poll(100);
for (ConsumerRecord<Long, Change> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
catch(Exception exception){
System.out.println("Exception raised" + exception);
}
}
}
}
The poll() in the consumer looks fine , and the enter code here
exception I am getting a Serialization exception . I checked the consumer group via kafka-consumer-groups.sh , the group of this consumer is there in the list.Any direction is appreciated .
Structure of the message in the Kafka topic:
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int16","optional":false,"field":"shopId"},{"type":"string","optional":false,"field":"partnerHotelId"},{"type":"boolean","optional":false,"field":"isOnline"},{"type":"boolean","optional":false,"field":"isRemovedByUser"},{"type":"int32","optional":false,"field":"mappingPriority"},{"type":"int32","optional":true,"field":"hotelId"},{"type":"string","optional":true,"field":"statusHotelId"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"dateHotelId"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"timestamp"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"lastImport"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"street"},{"type":"string","optional":true,"field":"zipcode"},{"type":"string","optional":true,"field":"city"},{"type":"string","optional":true,"field":"sourceCityId"},{"type":"string","optional":true,"field":"state"},{"type":"string","optional":true,"field":"stateAlpha2"},{"type":"string","optional":true,"field":"country"},{"type":"string","optional":true,"field":"alpha2"},{"type":"string","optional":true,"field":"alpha3"},{"type":"double","optional":true,"field":"latitude"},{"type":"double","optional":true,"field":"longitude"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"geomPoint"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedGeonames"},{"type":"int32","optional":true,"field":"countryIdShop"},{"type":"int32","optional":true,"field":"selectedGeoname"},{"type":"string","optional":true,"field":"propertyType"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"tags"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"chains"},{"type":"array","items":{"type":"string","optional":true},"optional":false,"field":"creditCards"},{"type":"int32","optional":true,"field":"stars"},{"type":"string","optional":true,"field":"url"},{"type":"int32","optional":true,"field":"nrRatings"},{"type":"double","optional":true,"field":"recommendation"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"proposedPartnerHotels"},{"type":"array","items":{"type":"int32","optional":true},"optional":false,"field":"removedFromHotelIds"}],"optional":true,"name":"staging.datafeeds.PartnerHotel.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"db"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"string","optional":true,"field":"schema"},{"type":"string","optional":true,"field":"table"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"staging.datafeeds.PartnerHotel.Envelope"},"payload":{"before":null,"after":{"id":13893497,"shopId":135,"partnerHotelId":"6-42036","isOnline":false,"isRemovedByUser":false,"mappingPriority":0,"hotelId":null,"statusHotelId":"AUTO","dateHotelId":null,"timestamp":1529334013938327,"lastImport":1503491984188866,"name":"Ferienvermietung Wiedemann","street":"Chausseeberg 3","zipcode":"17429","city":"Mellenthin","sourceCityId":null,"state":null,"stateAlpha2":null,"country":"Deutschland","alpha2":"DE","alpha3":null,"latitude":53.920278,"longitude":14.013333,"geomPoint":{"wkb":"AQEAACDmEAAARuo9ldMGLEA5nWSry/VKQA==","srid":4326},"proposedGeonames":[2872064],"countryIdShop":83,"selectedGeoname":2872064,"propertyType":null,"tags":["77","36","33","34","38","43","41","123","26","29","1","7","6","70","9","1000","58","17","18","15","13","14","20","65","63","46","10","52"],"chains":[],"creditCards":[],"stars":null,"url":"http://www.buchen.travel/onepage-idealo-booking/index.php?room=6-42036","nrRatings":null,"recommendation":null,"proposedHotels":[],"proposedPartnerHotels":[],"removedFromHotelIds":[]},"source":{"version":"0.8.3.Final","name":"staging","db":"geo","ts_usec":1554391067119000,"txId":4757138,"lsn":1139303143104,"schema":"datafeeds","table":"PartnerHotel","snapshot":true,"last_snapshot_record":false},"op":"r","ts_ms":1554391067119}}
Solution
You POJO is not compatible with your message and jackson cannot parse it. At least there is lack of few fields, following error can be found.
Unrecognized field "timestamp" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "zipcode" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedGeonames" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "chains" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "creditCards" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedHotels" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "proposedPartnerHotels" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "removedFromHotelIds" (class com.example.kafka.Change$PartnerHotel), not marked as ignorable
Unrecognized field "db" (class com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "lsn" (class com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "schema" (class com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "table" (class com.example.kafka.Change$Source), not marked as ignorable
Unrecognized field "last_snapshot_record" (class com.example.kafka.Change$Source), not marked as ignorable
To fix it you have to add those fields to your POJO or disable fail on unknown: objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
. More regarding jackson deserialization error can be found here: jackson Unrecognized field
Answered By - Bartosz Wardziński
Answer Checked By - Terry (JavaFixing Volunteer)