Deserializer#
- class kafkit.registry.Deserializer(*, registry)#
Bases:
object
An Avro message deserializer that understands the Confluent Wire Format and obtains schemas on-demand from a Confluent Schema Registry.
- Parameters:
registry (
kafkit.registry.sansio.RegistryApi
) – A registry client.
Notes
The Deserializer works exclusively with Avro-encoded messages in the Confluent Wire Format. This means that schemas for messages must be available from a Confluent Schema Registry.
When an encoded message is deserialized in the
deserialize
method, it does the following steps:Unpacks the wire format prefix to discover the ID of the message’s schema in the schema registry.
Obtains the schema from the
RegistryApi
. Schemas are cached, so this is a fast operation.Decodes the message using
fastavro.schemaless_reader
.
Why not implement a __call__ method?
The
Serializer
implements a__call__
method so that it can be used as a key or value serializer by the aiokafka producer. This Deserializer doesn’t do that becauseDeserializer.deserialize
is a coroutine (internally it works with the asynchronousRegistryApi
) and magic methods can’t be coroutines. It’s not the end of the world, though, just calldeserialize
manually on by bytes obtained by the consumer.Methods Summary
deserialize
(data[, include_schema])Deserialize a message.
Methods Documentation
- async deserialize(data, include_schema=False)#
Deserialize a message.
- Parameters:
data (
bytes
) – The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format.include_schema (
bool
, optional) – IfTrue
, the schema itself is included in the returned value. This is useful if your application operates on many different types of messages, and needs a convenient way to introspect a message’s type.
- Returns:
message_info – The deserialized message is wrapped in a dictionary to include metadata. The keys are:
- Return type: