Serializer
- class kafkit.registry.Serializer(*, schema, schema_id)
Bases: object
An Avro message serializer that writes in the Confluent Wire Format.
Use the Serializer.register class method to create a Serializer instance.
- Parameters:
Notes
A Serializer instance is dedicated to serializing messages of a single schema. The serializer is a callable: pass a message to the serializer to encode it.
This example uses the MockRegistryApi client. Real-world applications would use kafkit.registry.aiohttp.RegistryApi instead.
>>> from kafkit.registry import Serializer
>>> from kafkit.registry.sansio import MockRegistryApi
>>> client = MockRegistryApi()
For demonstration purposes, assume the schema is already cached:
>>> schema = {
...     'type': 'record',
...     'name': 'schema1',
...     'namespace': 'test-schemas',
...     'fields': [
...         {'name': 'a', 'type': 'int'},
...         {'name': 'b', 'type': 'string'}
...     ]
... }
>>> client.schemas.insert(schema, 1)
>>> serializer = await Serializer.register(
...     registry=client,
...     schema=schema)
Serialize messages:
>>> data = serializer({'a': 42, 'b': 'Hello world!'})
This serializer works well as the key_serializer or value_serializer parameter to aiokafka.AIOKafkaProducer. See https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.html
Methods Summary
__call__(data) – Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
register(*, registry, schema[, subject]) – Create a serializer, ensuring that the schema is registered with the schema registry.
Methods Documentation
- __call__(data)
Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
- async classmethod register(*, registry, schema, subject=None)
Create a serializer, ensuring that the schema is registered with the schema registry.
- Parameters:
registry (kafkit.registry.sansio.RegistryApi) – A registry client.
schema (dict) – An Avro schema.
subject (str, optional) – The name of the Schema Registry subject that the schema is registered under. If not provided, the subject is automatically set from the fully-qualified name of the schema (the 'name' field of the schema’s record type). See kafkit.registry.sansio.RegistryApi.register_schema for details.
- Returns:
serializer – A serializer instance.
- Return type:
Notes
It’s safe to call this method even if you know that the schema has been registered before. This process is necessary to get the schema’s ID, and won’t create a new schema if an identical schema is already registered.
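The behavior described in this note can be pictured with a toy model. The sketch below is a hypothetical in-memory stand-in, not kafkit's or the Schema Registry's implementation: ToyRegistry and default_subject are invented names, and the ID assignment is simplified. It shows two assumptions only: registering an identical schema twice yields the same ID, and the default subject follows the Avro full-name rule (namespace, a dot, then name).

```python
import json
from typing import Dict, Optional


def default_subject(schema: dict) -> str:
    """Return the Avro full name: '<namespace>.<name>' when a namespace is set."""
    name = schema["name"]
    namespace = schema.get("namespace")
    # Per the Avro spec, a name that already contains a dot is a full name.
    if namespace and "." not in name:
        return f"{namespace}.{name}"
    return name


class ToyRegistry:
    """Hypothetical in-memory registry used only to illustrate idempotency."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}      # canonical schema JSON -> schema ID
        self.subjects: Dict[str, int] = {}  # subject name -> schema ID

    def register_schema(self, schema: dict, subject: Optional[str] = None) -> int:
        subject = subject or default_subject(schema)
        # Sort keys so that dict ordering does not change a schema's identity.
        key = json.dumps(schema, sort_keys=True)
        # Reuse the existing ID for an identical schema; otherwise assign the next one.
        schema_id = self._ids.setdefault(key, len(self._ids) + 1)
        self.subjects[subject] = schema_id
        return schema_id


schema = {
    "type": "record",
    "name": "schema1",
    "namespace": "test-schemas",
    "fields": [
        {"name": "a", "type": "int"},
        {"name": "b", "type": "string"},
    ],
}
registry = ToyRegistry()
first_id = registry.register_schema(schema)
second_id = registry.register_schema(schema)  # same schema, same ID
```

Because repeated registration returns the existing ID in this model, calling it at every application start-up does not create duplicate schemas, which is the property the note relies on.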