Issue
We have been using Confluent Schema Registry with KafkaStreams for over a year now and everything had been working well until yesterday.
In a UAT environment a schema subject appears to have been deleted, and one of our applications started failing with the message:
[ERROR] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_13, topic: TOPIC_NAME, partition: 13, offset: 0 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1531
I checked the Schema Registry, confirmed the subject was missing, and queried the ID 1531 listed in the error with curl:
curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
And got back:
{"error_code":40403,"message":"Schema not found"}
I naively tried to register the schema again without thinking about it. The registration worked, but the schema was assigned a new ID rather than the previous 1531.
I need the schema registered under the ID 1531, since the existing messages in the topic already embed that ID after the magic byte.
I checked the API docs at https://docs.confluent.io/current/schema-registry/docs/develop/api.html but did not see anything for registering a schema under a given ID.
Is there any way to force a schema to a specific ID with Schema Registry?
I am aware of some backup solutions, but I am looking for a fix now that will hopefully avoid data loss or extraordinary measures to repair the topic data.
Solution
Is there any way to force a schema to a specific ID with Schema Registry?
There is not.
The ID 1531 isn't actually "gone", by the way; it is just marked as deleted in the registry (consume the _schemas topic to see it).
There really is no way around the error that I know of while you use the KafkaAvroDeserializer. You would have to use the ByteArrayDeserializer, then "fix" or "look up" the correct ID using the Schema Registry client, and then deserialize the rest of the message.
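A minimal sketch of that workaround, assuming the standard Confluent wire format (one magic byte 0x00, then a 4-byte big-endian schema ID, then the Avro payload). The class name and methods here are hypothetical helpers, not part of any Confluent library:

```java
import java.nio.ByteBuffer;

public class SchemaIdRewriter {
    private static final byte MAGIC_BYTE = 0x0;

    /** Reads the 4-byte big-endian schema ID that follows the magic byte. */
    public static int readSchemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Record is not in Confluent wire format");
        }
        return buf.getInt();
    }

    /** Returns a copy of the record with the embedded schema ID replaced. */
    public static byte[] rewriteSchemaId(byte[] record, int newId) {
        byte[] fixed = record.clone();
        // Overwrite bytes 1..4 (the ID field) with the new big-endian ID
        ByteBuffer.wrap(fixed, 1, 4).putInt(newId);
        return fixed;
    }
}
```

In the Streams topology you would consume the value as byte[], map the stale ID 1531 to the ID the re-registered schema received, and only then hand the fixed bytes to the Avro deserializer.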
The other option would be to reset your consumer group so that it skips these messages entirely, or to set up exception handling: Handling bad messages using Kafka's Streams API
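For the exception-handling route, Kafka Streams lets you replace the default LogAndFailExceptionHandler with one that logs and skips bad records instead of crashing the task. A minimal config sketch (note this silently drops the affected messages):

```properties
# Skip records that fail deserialization instead of failing the stream task
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
```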
Answered By - OneCricketeer
Answer Checked By - Mildred Charles (JavaFixing Admin)