Serializer

class kafkit.registry.Serializer(*, schema, schema_id)

Bases: object

An Avro message serializer that writes in the Confluent Wire Format.

Use the Serializer.register class method to create a Serializer instance.

Parameters:
schema : dict

An Avro schema.

schema_id : int

The ID of schema in a Schema Registry.

Notes

A Serializer instance is dedicated to serializing messages of a single schema. The serializer is a callable. Send the message to the serializer to encode it.

This is an example using the MockRegistryApi client. Real-world applications would use the kafkit.registry.aiohttp.RegistryApi instead.

>>> from kafkit.registry.sansio import MockRegistryApi
>>> client = MockRegistryApi()

For demonstration purposes, assume the schema is already cached:

>>> schema = {
...    'type': 'record',
...    'name': 'schema1',
...    'namespace': 'test-schemas',
...    'fields': [
...        {'name': 'a', 'type': 'int'},
...        {'name': 'b', 'type': 'string'}
...    ]
... }
>>> client.schemas.insert(schema, 1)
>>> serializer = await Serializer.register(
...     registry=client,
...     schema=schema)

Serialize messages:

>>> data = serializer({'a': 42, 'b': 'Hello world!'})

This serializer works well as key_serializer or value_serializer parameters to the aiokafka.AIOKafkaProducer. See https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.html

Methods Summary

__call__(data) Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
register(*, registry, schema[, subject]) Create a serializer ensuring that the schema is registered with the schema registry.

Methods Documentation

__call__(data)

Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.

Parameters:
data : object

An Avro-serializable object. The object must conform to the schema.

Returns:
message : bytes

Message in the Confluent Schema Registry wire format.

classmethod register(*, registry, schema, subject=None)

Create a serializer ensuring that the schema is registered with the schema registry.

Parameters:
registry : kafkit.registry.sansio.RegistryApi

A registry client.

schema : dict

An Avro schema.

subject : str, optional

The name of the Schema Registry subject that the schema is registered under. If not provided, the schema is automatically set from the fully-qualified name of the schema (the 'name' field of the schema’s record type). See kafkit.registry.sansio.RegistryApi.register_schema for details.

Returns:
serializer : Serializer

A serializer instance.

Notes

It’s safe to call this method even if you know that the schema has been registered before. This process is necessary to get the schema’s ID, and won’t create a new schema if an identical schema is already registered.