Serializer#

class kafkit.registry.Serializer(*, schema, schema_id)#

Bases: object

An Avro message serializer that writes in the Confluent Wire Format.

Use the Serializer.register class method to create a Serializer instance.

Parameters:
  • schema (dict) – An Avro schema.

  • schema_id (int) – The ID of schema in a Schema Registry.

Notes

A Serializer instance is dedicated to serializing messages of a single schema. The serializer is a callable. Send the message to the serializer to encode it.

This is an example using the MockRegistryApi client. Real-world applications would use the kafkit.registry.aiohttp.RegistryApi instead.

>>> from kafkit.registry.sansio import MockRegistryApi
>>> client = MockRegistryApi()

For demonstration purposes, assume the schema is already cached:

>>> schema = {
...    'type': 'record',
...    'name': 'schema1',
...    'namespace': 'test-schemas',
...    'fields': [
...        {'name': 'a', 'type': 'int'},
...        {'name': 'b', 'type': 'string'}
...    ]
... }
>>> client.schemas.insert(schema, 1)
>>> serializer = await Serializer.register(
...     registry=client,
...     schema=schema)

Serialize messages:

>>> data = serializer({'a': 42, 'b': 'Hello world!'})

This serializer works well as key_serializer or value_serializer parameters to the aiokafka.AIOKafkaProducer. See https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.html

Methods Summary

__call__(data)

Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.

register(*, registry, schema[, subject])

Create a serializer ensuring that the schema is registered with the schema registry.

Methods Documentation

__call__(data)#

Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.

Parameters:

data (object) – An Avro-serializable object. The object must conform to the schema.

Returns:

message – Message in the Confluent Schema Registry wire format.

Return type:

bytes

async classmethod register(*, registry, schema, subject=None)#

Create a serializer ensuring that the schema is registered with the schema registry.

Parameters:
Returns:

serializer – A serializer instance.

Return type:

Serializer

Notes

It’s safe to call this method even if you know that the schema has been registered before. This process is necessary to get the schema’s ID, and won’t create a new schema if an identical schema is already registered.