Serializer¶
-
class
kafkit.registry.
Serializer
(*, schema: Dict[str, Any], schema_id: int)¶ Bases:
object
An Avro message serializer that writes in the Confluent Wire Format.
Use the
Serializer.register
class method to create a Serializer instance.Notes
A
Serializer
instance is dedicated to serializing messages of a single schema. The serializer is a callable. Send the message to the serializer to encode it.This is an example using the MockRegistryApi client. Real-world applications would use the
kafkit.registry.aiohttp.RegistryApi
instead.>>> from kafkit.registry.sansio import MockRegistryApi >>> client = MockRegistryApi()
For demonstration purposes, assume the schema is already cached:
>>> schema = { ... 'type': 'record', ... 'name': 'schema1', ... 'namespace': 'test-schemas', ... 'fields': [ ... {'name': 'a', 'type': 'int'}, ... {'name': 'b', 'type': 'string'} ... ] ... } >>> client.schemas.insert(schema, 1) >>> serializer = await Serializer.register( ... registry=client, ... schema=schema)
Serialize messages:
>>> data = serializer({'a': 42, 'b': 'Hello world!'})
This serializer works well as
key_serializer
orvalue_serializer
parameters to theaiokafka.AIOKafkaProducer
. See https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.htmlMethods Summary
__call__
(data)Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
register
(*, registry, schema[, subject])Create a serializer ensuring that the schema is registered with the schema registry.
Methods Documentation
-
__call__
(data: Any) → bytes¶ Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
-
async classmethod
register
(*, registry: RegistryApi, schema: Dict[str, Any], subject: Optional[str] = None) → Serializer¶ Create a serializer ensuring that the schema is registered with the schema registry.
- Parameters
registry (
kafkit.registry.sansio.RegistryApi
) – A registry client.schema (
dict
) – An Avro schema.subject (
str
, optional) – The name of the Schema Registry subject that the schema is registered under. If not provided, the schema is automatically set from the fully-qualified name of the schema (the'name'
field of the schema’s record type). Seekafkit.registry.sansio.RegistryApi.register_schema
for details.
- Returns
serializer – A serializer instance.
- Return type
Notes
It’s safe to call this method even if you know that the schema has been registered before. This process is necessary to get the schema’s ID, and won’t create a new schema if an identical schema is already registered.
-