Deserializer

class kafkit.registry.Deserializer(*, registry)

Bases: object

An Avro message deserializer that understands the Confluent Wire Format and obtains schemas on demand from a Confluent Schema Registry.

Parameters:
registry : kafkit.registry.sansio.RegistryApi

A registry client.

Notes

The Deserializer works exclusively with Avro-encoded messages in the Confluent Wire Format. This means that schemas for messages must be available from a Confluent Schema Registry.
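For reference, the Confluent Wire Format prefixes the Avro-encoded body with five bytes: a magic byte (always 0) followed by the schema ID as a 4-byte big-endian integer. The sketch below frames and unframes a payload with the standard-library struct module. The functions pack_wire_format and unpack_wire_format are hypothetical helpers for illustration only, not part of kafkit's public API.

```python
import struct

# Magic byte (signed char) followed by a 4-byte big-endian unsigned schema ID.
_PREFIX = struct.Struct(">bI")


def pack_wire_format(schema_id: int, body: bytes) -> bytes:
    """Hypothetical helper: prefix an Avro-encoded body with the wire format header."""
    return _PREFIX.pack(0, schema_id) + body


def unpack_wire_format(data: bytes) -> tuple[int, bytes]:
    """Hypothetical helper: split wire-format data into (schema_id, avro_body)."""
    if len(data) < _PREFIX.size:
        raise ValueError("Data is too short to contain a wire format prefix.")
    magic_byte, schema_id = _PREFIX.unpack(data[: _PREFIX.size])
    if magic_byte != 0:
        raise ValueError(f"Unexpected magic byte: {magic_byte}")
    return schema_id, data[_PREFIX.size :]


framed = pack_wire_format(42, b"\x02\x06foo")
print(framed[:5])                  # b'\x00\x00\x00\x00*'
print(unpack_wire_format(framed))  # (42, b'\x02\x06foo')
```

The schema ID is the only piece of metadata carried in the message itself; everything else needed to decode the body has to come from the registry.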

When the deserialize method decodes a message, it performs the following steps:

  1. Unpacks the wire format prefix to discover the ID of the message’s schema in the schema registry.
  2. Obtains the schema from the RegistryApi. Schemas are cached after they are first retrieved, so repeated lookups of the same schema are fast.
  3. Decodes the message using fastavro.schemaless_reader.
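The three steps above can be sketched as follows. This is a simplified illustration under stated assumptions, not kafkit's implementation: SchemaCache is a hypothetical stand-in for the caching behavior of RegistryApi (its fetch_count attribute exists only to make the caching visible), fake_fetch is a hypothetical coroutine standing in for an HTTP request to the registry, and the decode argument stands in for fastavro.schemaless_reader so that the example runs without fastavro installed.

```python
import asyncio
import struct
from typing import Any, Awaitable, Callable

# Hypothetical decoder signature: (avro_body, schema) -> decoded object.
Decoder = Callable[[bytes, dict], Any]


class SchemaCache:
    """Hypothetical schema cache: fetch each schema ID once, then reuse it."""

    def __init__(self, fetch_schema: Callable[[int], Awaitable[dict]]):
        self._fetch_schema = fetch_schema
        self._schemas: dict[int, dict] = {}
        self.fetch_count = 0  # illustrative counter of registry round trips

    async def get(self, schema_id: int) -> dict:
        if schema_id not in self._schemas:
            self.fetch_count += 1
            self._schemas[schema_id] = await self._fetch_schema(schema_id)
        return self._schemas[schema_id]


async def deserialize(data: bytes, cache: SchemaCache, decode: Decoder) -> dict:
    # Step 1: unpack the 5-byte prefix (magic byte + big-endian schema ID).
    magic_byte, schema_id = struct.unpack(">bI", data[:5])
    if magic_byte != 0:
        raise ValueError(f"Unexpected magic byte: {magic_byte}")
    # Step 2: look up the schema; repeat lookups are served from the cache.
    schema = await cache.get(schema_id)
    # Step 3: decode the Avro body (kafkit uses fastavro.schemaless_reader here).
    return {"id": schema_id, "message": decode(data[5:], schema)}


async def fake_fetch(schema_id: int) -> dict:
    # Hypothetical: a real client would query the Schema Registry over HTTP.
    return {"type": "record", "name": "example", "fields": []}


async def main() -> None:
    cache = SchemaCache(fake_fetch)
    data = struct.pack(">bI", 0, 1) + b"payload"
    for _ in range(3):
        info = await deserialize(data, cache, lambda body, schema: body.decode())
    print(info, cache.fetch_count)  # {'id': 1, 'message': 'payload'} 1


asyncio.run(main())
```

Deserializing the same message three times triggers only one (simulated) registry fetch, which is why step 2 is cheap once a schema has been seen.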

Why not implement a __call__ method?

The Serializer implements a __call__ method so that it can be used as a key or value serializer by the aiokafka producer. This Deserializer doesn't do that because Deserializer.deserialize is a coroutine (internally it works with the asynchronous RegistryApi), and aiokafka invokes key and value deserializers as ordinary synchronous callables, so a coroutine can't be plugged in there. It's not the end of the world, though: just await deserialize on the bytes obtained from the consumer.
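In practice, the obstacle is that the consumer calls its deserializer synchronously, and a synchronous call to an async __call__ returns a coroutine object rather than the decoded message. The sketch below demonstrates this and then shows the manual pattern of awaiting deserialize on each raw value. AsyncDeserializer and fake_consumer are hypothetical stand-ins for kafkit's Deserializer and an aiokafka consumer; they are not the real classes.

```python
import asyncio
import inspect
from typing import AsyncIterator


class AsyncDeserializer:
    """Hypothetical stand-in for kafkit's Deserializer."""

    async def deserialize(self, data: bytes) -> dict:
        await asyncio.sleep(0)  # a registry lookup would be awaited here
        return {"id": 1, "message": data.decode()}

    async def __call__(self, data: bytes) -> dict:
        return await self.deserialize(data)


deserializer = AsyncDeserializer()

# A synchronous caller (like a consumer's value_deserializer hook) gets an
# un-awaited coroutine object back, not the decoded message.
result = deserializer(b"hello")
print(inspect.iscoroutine(result))  # True
result.close()  # close it to avoid a "never awaited" RuntimeWarning


async def fake_consumer() -> AsyncIterator[bytes]:
    """Hypothetical stand-in for iterating over raw message values."""
    for value in (b"first", b"second"):
        yield value


async def main() -> list:
    messages = []
    async for raw_value in fake_consumer():
        # Await deserialize explicitly on the raw bytes from the consumer.
        info = await deserializer.deserialize(raw_value)
        messages.append(info["message"])
    return messages


print(asyncio.run(main()))  # ['first', 'second']
```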

Methods Summary

deserialize(data[, include_schema]) Deserialize a message.

Methods Documentation

deserialize(data, include_schema=False)

Deserialize a message.

Parameters:
data : bytes

The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format.

include_schema : bool, optional

If True, the schema itself is included in the returned value. This is useful if your application operates on many different types of messages, and needs a convenient way to introspect a message’s type.

Returns:
message_info : dict

The deserialized message is wrapped in a dictionary to include metadata. The keys are:

'id'

The ID of the schema (an int) in the Schema Registry. This uniquely identifies the message’s schema.

'message'

The message itself, as a decoded Python object.

'schema'

The schema, as a Python object. This key is only included when include_schema is True.
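As an illustration of how the returned dictionary might be used, an application that handles several message types could dispatch on the schema's name when include_schema is True. Everything here is hypothetical: the handlers, the routing table, the sample message_info values, and the assumption that the schema is a dict for a named Avro record whose "name" key matches the table (a parsed schema may carry a namespace-qualified name instead). In a real application, message_info would come from awaiting deserializer.deserialize(data, include_schema=True).

```python
from typing import Any, Callable, Dict

Handler = Callable[[Any], str]


def handle_alert(message: Any) -> str:
    return f"alert {message['alertId']}"


def handle_heartbeat(message: Any) -> str:
    return "heartbeat"


# Hypothetical routing table keyed by Avro record name.
HANDLERS: Dict[str, Handler] = {
    "alert": handle_alert,
    "heartbeat": handle_heartbeat,
}


def dispatch(message_info: Dict[str, Any]) -> str:
    # The 'schema' key is only present when include_schema=True was passed.
    schema = message_info.get("schema")
    if schema is None:
        raise ValueError("No 'schema' key; deserialize with include_schema=True.")
    handler = HANDLERS[schema["name"]]
    return handler(message_info["message"])


# Hypothetical message_info, shaped like the dictionary described above.
message_info = {
    "id": 1,
    "message": {"alertId": 123},
    "schema": {
        "type": "record",
        "name": "alert",
        "fields": [{"name": "alertId", "type": "long"}],
    },
}
print(dispatch(message_info))  # alert 123
```

If the schema itself isn't needed, dispatching on the 'id' key avoids carrying the schema around, at the cost of mapping schema IDs to handlers yourself.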