Deserializer¶
-
class
kafkit.registry.
Deserializer
(*, registry)¶ Bases:
object
An Avro message deserializer that understands the Confluent Wire Format and obtains schemas on-demand from a Confluent Schema Registry.
Parameters: - registry :
kafkit.registry.sansio.RegistryApi
A registry client.
Notes
The Deserializer works exclusively with Avro-encoded messages in the Confluent Wire Format. This means that schemas for messages must be available from a Confluent Schema Registry.
When an encoded message is deserialized in the
deserialize
method, it does the following steps:- Unpacks the wire format prefix to discover the ID of the message’s schema in the schema registry.
- Obtains the schema from the
RegistryApi
. Schemas are cached, so this is a fast operation. - Decodes the message using
fastavro.schemaless_reader
.
Why not implement a __call__ method?
The
Serializer
implements a__call__
method so that it can be used as a key or value serializer by the aiokafka producer. This Deserializer doesn’t do that becauseDeserializer.deserialize
is a coroutine (internally it works with the asynchronousRegistryApi
) and magic methods can’t be coroutines. It’s not the end of the world, though, just calldeserialize
manually on by bytes obtained by the consumer.Methods Summary
deserialize
(data[, include_schema])Deserialize a message. Methods Documentation
-
deserialize
(data, include_schema=False)¶ Deserialize a message.
Parameters: - data :
bytes
The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format.
- include_schema :
bool
, optional If
True
, the schema itself is included in the returned value. This is useful if your application operates on many different types of messages, and needs a convenient way to introspect a message’s type.
Returns: - message_info :
dict
The deserialized message is wrapped in a dictionary to include metadata. The keys are:
- data :
- registry :