Deserializer¶
-
class
kafkit.registry.
Deserializer
(*, registry: RegistryApi)¶ Bases:
object
An Avro message deserializer that understands the Confluent Wire Format and obtains schemas on-demand from a Confluent Schema Registry.
Parameters: registry ( kafkit.registry.sansio.RegistryApi
) – A registry client.Notes
The Deserializer works exclusively with Avro-encoded messages in the Confluent Wire Format. This means that schemas for messages must be available from a Confluent Schema Registry.
When an encoded message is deserialized in the
deserialize
method, it does the following steps:- Unpacks the wire format prefix to discover the ID of the message’s schema in the schema registry.
- Obtains the schema from the
RegistryApi
. Schemas are cached, so this is a fast operation. - Decodes the message using
fastavro.schemaless_reader
.
Why not implement a __call__ method?
The
Serializer
implements a__call__
method so that it can be used as a key or value serializer by the aiokafka producer. This Deserializer doesn’t do that becauseDeserializer.deserialize
is a coroutine (internally it works with the asynchronousRegistryApi
) and magic methods can’t be coroutines. It’s not the end of the world, though, just calldeserialize
manually on by bytes obtained by the consumer.Methods Summary
deserialize
(data, include_schema)Deserialize a message. Methods Documentation
-
deserialize
(data: bytes, include_schema: bool = False) → Dict[str, Any]¶ Deserialize a message.
Parameters: - data (
bytes
) – The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format. - include_schema (
bool
, optional) – IfTrue
, the schema itself is included in the returned value. This is useful if your application operates on many different types of messages, and needs a convenient way to introspect a message’s type.
Returns: message_info – The deserialized message is wrapped in a dictionary to include metadata. The keys are:
Return type: - data (