Serializer

class kafkit.registry.Serializer(*, schema: Dict[str, Any], schema_id: int)

Bases: object
An Avro message serializer that writes in the Confluent Wire Format.
Use the Serializer.register class method to create a Serializer instance.

Notes
A Serializer instance is dedicated to serializing messages of a single schema. The serializer is a callable: call it with a message to encode that message.

This is an example using the MockRegistryApi client. Real-world applications would use the kafkit.registry.aiohttp.RegistryApi instead.

>>> from kafkit.registry import Serializer
>>> from kafkit.registry.sansio import MockRegistryApi
>>> client = MockRegistryApi()
For demonstration purposes, assume the schema is already cached:
>>> schema = {
...     'type': 'record',
...     'name': 'schema1',
...     'namespace': 'test-schemas',
...     'fields': [
...         {'name': 'a', 'type': 'int'},
...         {'name': 'b', 'type': 'string'}
...     ]
... }
>>> client.schemas.insert(schema, 1)
>>> serializer = await Serializer.register(
...     registry=client,
...     schema=schema)
Serialize messages:
>>> data = serializer({'a': 42, 'b': 'Hello world!'})
This serializer works well as the key_serializer or value_serializer parameter of aiokafka.AIOKafkaProducer. See https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.html

Methods Summary
__call__(data)
Serialize a dataset in the Confluent Schema Registry wire format, which is an Avro-encoded message with a schema-identifying prefix.

register(*, registry, schema, subject)
Create a serializer ensuring that the schema is registered with the schema registry.

Methods Documentation
__call__(data: Any) → bytes

Serialize a dataset in the Confluent Schema Registry wire format, which is an Avro-encoded message with a schema-identifying prefix.

Parameters: data (object) – An Avro-serializable object. The object must conform to the schema.
Returns: message – Message in the Confluent Schema Registry wire format.
Return type: bytes
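As a rough illustration of the schema-identifying prefix: in the Confluent wire format, the Avro-encoded body is preceded by a zero "magic" byte and the schema ID as a 4-byte big-endian integer. The sketch below uses only the standard library. The helper names pack_prefix and split_message are hypothetical and are not part of kafkit, and the Avro encoding of the body itself is left out.

```python
import struct
from typing import Tuple

# Confluent wire format: 1 magic byte (0) + 4-byte big-endian schema ID,
# followed by the Avro-encoded message body.
MAGIC_BYTE = 0
PREFIX_FORMAT = ">bI"  # big-endian: signed char, unsigned 32-bit int
PREFIX_SIZE = struct.calcsize(PREFIX_FORMAT)  # 5 bytes


def pack_prefix(schema_id: int) -> bytes:
    """Build the 5-byte prefix for a given schema ID (hypothetical helper)."""
    return struct.pack(PREFIX_FORMAT, MAGIC_BYTE, schema_id)


def split_message(message: bytes) -> Tuple[int, bytes]:
    """Split a wire-format message into (schema_id, avro_body)."""
    if len(message) < PREFIX_SIZE:
        raise ValueError("Message is too short to contain a wire format prefix")
    magic, schema_id = struct.unpack(PREFIX_FORMAT, message[:PREFIX_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"Unexpected magic byte: {magic}")
    return schema_id, message[PREFIX_SIZE:]
```

A consumer could use something like split_message to look up the schema by ID before decoding the remaining bytes as Avro.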
classmethod register(*, registry: RegistryApi, schema: Dict[str, Any], subject: Optional[str] = None) → Serializer

Create a serializer ensuring that the schema is registered with the schema registry.
Parameters:
- registry (kafkit.registry.sansio.RegistryApi) – A registry client.
- schema (dict) – An Avro schema.
- subject (str, optional) – The name of the Schema Registry subject that the schema is registered under. If not provided, the subject is automatically set from the fully-qualified name of the schema (the 'name' field of the schema’s record type). See kafkit.registry.sansio.RegistryApi.register_schema for details.

Returns: serializer – A serializer instance.
Return type: Serializer

Notes
It’s safe to call this method even if you know that the schema has been registered before. This process is necessary to get the schema’s ID, and won’t create a new schema if an identical schema is already registered.
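To make the default subject concrete, here is a minimal sketch of how a fully-qualified name can be derived from a record schema, following Avro's naming rule (namespace and name joined with a dot, unless the name already contains one). The function default_subject is hypothetical and only approximates the behavior described above; the actual derivation in kafkit may differ in detail.

```python
from typing import Any, Dict


def default_subject(schema: Dict[str, Any]) -> str:
    """Derive a subject from a record schema's fully-qualified name.

    Hypothetical helper: assumes Avro's rule that a dotted name is
    already fully qualified and that the namespace is optional.
    """
    name = schema["name"]
    namespace = schema.get("namespace")
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


schema = {
    "type": "record",
    "name": "schema1",
    "namespace": "test-schemas",
    "fields": [
        {"name": "a", "type": "int"},
        {"name": "b", "type": "string"},
    ],
}
subject = default_subject(schema)  # "test-schemas.schema1"
```

Passing subject explicitly to Serializer.register avoids relying on this derivation when the subject name must follow a different convention.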