PydanticSchemaManager

class kafkit.registry.manager.PydanticSchemaManager(*, registry, suffix='')

Bases: object

A manager for schemas that are represented as Pydantic models in Python, and translated into Avro for the Confluent Schema Registry.

Parameters:
  • registry (RegistryApi) – The Registry API client instance. For an application built with aiohttp, use the kafkit.registry.aiohttp.RegistryApi type.

  • suffix (str, default: '') –

    A suffix that is added to the schema name (and thus subject name), for example _dev1.

    The suffix creates alternate subjects in the Schema Registry so schemas registered during testing and staging don’t affect the compatibility continuity of a production subject.

    In production, leave the suffix unset.
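As a rough illustration of what the suffix does, the sketch below assumes the suffix is simply appended to the schema name to form the subject name. The helper `subject_for` and the schema name `example.Message` are hypothetical and not part of kafkit, which computes subject names internally.

```python
def subject_for(schema_name: str, suffix: str = "") -> str:
    """Return the subject name for a schema, assuming plain concatenation."""
    # Hypothetical helper for illustration only; not a kafkit function.
    return f"{schema_name}{suffix}"


# A production deployment and a test deployment land in separate subjects,
# so test registrations never enter the production compatibility history.
production_subject = subject_for("example.Message")
dev_subject = subject_for("example.Message", suffix="_dev1")
```

Under that assumption, a manager created with `suffix='_dev1'` would register and look up schemas under the second subject only.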

Methods Summary

deserialize(data)

Deserialize a Confluent Wire Format message into a Pydantic object.

register_model(model[, compatibility])

Register the model with the registry.

register_models(models[, compatibility])

Register the models with the registry.

serialize(data)

Serialize a Pydantic object into the Confluent Wire Format.

Methods Documentation

async deserialize(data)

Deserialize a Confluent Wire Format message into a Pydantic object.

Parameters:

data (bytes) – The data in the Confluent Wire Format to deserialize into a Pydantic object.

Returns:

The deserialized data.

Return type:

AvroBaseModel

Raises:

UnmanagedSchemaError – Raised if the Pydantic model corresponding to the message’s schema is not registered through the manager.
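For context on the input, a Confluent Wire Format message starts with a 5-byte header: a magic byte of 0 followed by the schema ID as a 4-byte big-endian integer, with the Avro-encoded body after it. The sketch below splits such a message using only the standard library. `unpack_wire_format` is a hypothetical name, and this is not kafkit's own implementation; how the manager maps a schema ID back to a registered model is internal to kafkit.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent Wire Format message


def unpack_wire_format(data: bytes) -> tuple[int, bytes]:
    """Split a message into its schema ID and Avro-encoded body."""
    if len(data) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # ">bI": big-endian, 1-byte magic value, then 4-byte unsigned schema ID.
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[5:]
```

The schema ID identifies the schema the message was written with, which is what lets a consumer tell which model a message belongs to.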

async register_model(model, compatibility=None)

Register the model with the registry.

Parameters:
  • model (type[AvroBaseModel]) – The model to register.

  • compatibility (Optional[str], default: None) – The Schema Registry compatibility setting for the model’s subject.

Return type:

None
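Since deserialize raises UnmanagedSchemaError for schemas whose models were not registered through the manager, a typical flow registers a model before exchanging messages. The sketch below shows one possible ordering using only the methods documented on this page. `round_trip` is a hypothetical helper, and `manager` is assumed to be a PydanticSchemaManager built as `PydanticSchemaManager(registry=registry)`.

```python
from typing import Any


async def round_trip(manager: Any, message: Any) -> Any:
    """Register the message's model, then serialize and deserialize it."""
    # Register first: the manager only deserializes schemas whose models
    # were registered through it.
    await manager.register_model(type(message))
    data = await manager.serialize(message)  # bytes in Confluent Wire Format
    return await manager.deserialize(data)  # an AvroBaseModel instance
```

In an application, registration would more likely happen once at startup rather than on every message.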

async register_models(models, compatibility=None)

Register the models with the registry.

Parameters:
  • models (Iterable[type[AvroBaseModel]]) – The models to register.

  • compatibility (Optional[str], default: None) – The Schema Registry compatibility setting for each model’s subject.

Return type:

None
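The compatibility argument is a plain string, so a typo would only surface at the registry. The sketch below checks the value against the compatibility types defined by the Confluent Schema Registry before calling register_models. It assumes the manager accepts those names unchanged, which this page does not confirm; `COMPATIBILITY_LEVELS` and `register_checked` are hypothetical names.

```python
from typing import Any, Iterable, Optional

# Compatibility types defined by the Confluent Schema Registry.
COMPATIBILITY_LEVELS = frozenset({
    "BACKWARD", "BACKWARD_TRANSITIVE",
    "FORWARD", "FORWARD_TRANSITIVE",
    "FULL", "FULL_TRANSITIVE",
    "NONE",
})


async def register_checked(
    manager: Any,
    models: Iterable[type],
    compatibility: Optional[str] = None,
) -> None:
    """Reject unknown compatibility names before calling the manager."""
    if compatibility is not None and compatibility not in COMPATIBILITY_LEVELS:
        raise ValueError(f"unknown compatibility type: {compatibility!r}")
    # Assumption: the manager forwards the string to the registry as-is.
    await manager.register_models(models, compatibility=compatibility)
```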

async serialize(data)

Serialize a Pydantic object into the Confluent Wire Format.

Parameters:

data (AvroBaseModel) – The data to serialize.

Returns:

The serialized data in the Confluent Wire Format.

Return type:

bytes
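To make the shape of the returned bytes concrete, the sketch below builds a Confluent Wire Format message by hand: a magic byte of 0, the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. `pack_wire_format`, the schema ID 42, and the placeholder body are hypothetical; the manager obtains the real schema ID from the registry and produces the Avro body itself.

```python
import struct


def pack_wire_format(schema_id: int, body: bytes) -> bytes:
    """Prefix an Avro-encoded body with the Confluent Wire Format header."""
    # Magic byte 0, then the schema ID as a 4-byte big-endian unsigned int.
    return struct.pack(">bI", 0, schema_id) + body


# Placeholder body; a real message would carry Avro binary data here.
message = pack_wire_format(42, b"avro-bytes")
```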