Deserializer

class kafkit.registry.Deserializer(*, registry)

Bases: object

An Avro message deserializer that understands the Confluent Wire Format and obtains schemas on-demand from a Confluent Schema Registry.

Parameters:

registry (kafkit.registry.sansio.RegistryApi) – A registry client.

Notes

The Deserializer works exclusively with Avro-encoded messages in the Confluent Wire Format. This means that schemas for messages must be available from a Confluent Schema Registry.

When deserializing an encoded message, the deserialize method performs the following steps:

  1. Unpacks the wire format prefix to discover the ID of the message’s schema in the schema registry.

  2. Obtains the schema for that ID from the RegistryApi. Schemas are cached, so this is a fast operation once a schema has been fetched.

  3. Decodes the message using fastavro.schemaless_reader.
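As a minimal sketch of step 1 only, the Confluent Wire Format prefix can be unpacked with the standard library: one magic byte (always 0) followed by the schema ID as a 4-byte big-endian unsigned integer, with the Avro-encoded body after it. The function name unpack_wire_format is hypothetical and is not part of kafkit; the schema lookup (step 2) and the fastavro.schemaless_reader decoding (step 3) are left out.

```python
import struct
from typing import Tuple

# Wire format prefix: magic byte (signed char, always 0), then the schema ID
# as a 4-byte big-endian unsigned integer. No padding, so the size is 5 bytes.
_PREFIX = struct.Struct(">bI")


def unpack_wire_format(data: bytes) -> Tuple[int, bytes]:
    """Split a wire-format message into (schema_id, avro_body).

    Hypothetical helper for illustration; not kafkit's own implementation.
    """
    if len(data) < _PREFIX.size:
        raise ValueError("message is too short to hold the wire format prefix")
    magic, schema_id = _PREFIX.unpack_from(data)
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    # Everything after the 5-byte prefix is the Avro-encoded body.
    return schema_id, data[_PREFIX.size:]
```

For example, unpack_wire_format(b"\x00\x00\x00\x00\x2a" + body) returns the schema ID 42 together with body unchanged.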

Why not implement a __call__ method?

The Serializer implements a __call__ method so that it can be used as a key or value serializer by the aiokafka producer. The Deserializer doesn’t, because Deserializer.deserialize is a coroutine (internally it works with the asynchronous RegistryApi) and magic methods can’t be coroutines. It’s not the end of the world, though: just call deserialize manually on the bytes obtained by the consumer.
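A rough sketch of calling deserialize manually in a consumer loop might look like the following. The helper name deserialized is hypothetical. It assumes only that records is an async iterable whose items carry the raw message bytes in a value attribute (as aiokafka consumer records do when no value_deserializer is configured) and that deserializer has an awaitable deserialize method, as Deserializer does.

```python
from typing import Any, AsyncIterable, AsyncIterator


async def deserialized(
    records: AsyncIterable[Any], deserializer: Any
) -> AsyncIterator[Any]:
    """Yield the deserialized form of each record's raw value bytes.

    Hypothetical helper for illustration; not part of kafkit or aiokafka.
    """
    async for record in records:
        # deserialize is a coroutine, so it has to be awaited explicitly
        # instead of being passed to the consumer as a callable.
        yield await deserializer.deserialize(record.value)
```

With a started aiokafka consumer, this could be used as `async for info in deserialized(consumer, deserializer): ...`, where each info is whatever deserialize returns for that message.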

Methods Summary

deserialize(data)

Deserialize a message.

Methods Documentation

async deserialize(data)

Deserialize a message.

Parameters:

data (bytes) – The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format.

Returns:

The deserialized message and schema information.

Return type:

MessageInfo