RecordNameSchemaManager

class kafkit.registry.manager.RecordNameSchemaManager(*, root: Path, registry: RegistryApi, suffix: str = '')

Bases: object

A manager for schemas embedded in the application itself in conjunction with a Confluent Schema Registry, for the case of a record name subject name strategy.

Parameters
  • root (pathlib.Path) – The root directory containing schemas. Schemas can be located within this directory, or in a child directory. Schemas must have a .json filename extension.

  • registry (kafkit.registry.sansio.RegistryApi) – The Registry API client instance. For an application built with aiohttp, use the kafkit.registry.aiohttp.RegistryApi type.

  • suffix (str, optional) –

    A suffix that is added to the schema name (and thus subject name), for example _dev1.

    The suffix creates alternate subjects in the Schema Registry so that schemas registered during testing and staging don’t affect the compatibility continuity of a production subject.

    For production, it’s best not to set a suffix.

Notes

The RecordNameSchemaManager helps you manage Avro serialization in your application. This class implements an opinionated workflow that combines lower-level Kafkit components such as the RegistryApi, serializers and deserializers. The key assumptions made by this manager are:

  • Your application holds a local copy of the schemas it serializes and deserializes data with. This is useful so that your application can be developed and tested independently of the Schema Registry.

  • Your application’s schemas are located within a directory on the local filesystem with *.json extensions.

  • The Schema Registry subjects that your application uses have the same compatibility setting.

  • Subjects follow the record name naming strategy (RecordNameStrategy). That is, the subject name is the fully-qualified name of the Avro schema.

    Note that this is different from the default naming strategy, TopicNameStrategy, where subjects are named for the topic with -key or -value suffixes.
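The naming assumptions above can be illustrated with a short, standard-library-only sketch. It is not Kafkit's implementation: the helper names (avro_fullname, discover_subjects, topic_value_subject) are hypothetical, and it assumes the suffix is simply appended to the fully-qualified record name, as the suffix parameter description suggests.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


def avro_fullname(schema: dict) -> str:
    """Return the fully-qualified name of an Avro record schema."""
    name = schema["name"]
    namespace = schema.get("namespace")
    # In Avro, a name that already contains a dot is treated as a full name.
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def discover_subjects(root: Path, suffix: str = "") -> Dict[str, dict]:
    """Map record-name subjects to the schemas found in *.json files.

    Searches root and its child directories (hypothetical helper).
    """
    subjects = {}
    for path in sorted(root.rglob("*.json")):
        schema = json.loads(path.read_text())
        subjects[avro_fullname(schema) + suffix] = schema
    return subjects


def topic_value_subject(topic: str) -> str:
    """Subject for a message value under TopicNameStrategy, for contrast."""
    return f"{topic}-value"


# Demo: one schema stored in a child directory of the root.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "events").mkdir()
    schema = {
        "type": "record",
        "namespace": "example",
        "name": "Greeting",
        "fields": [{"name": "message", "type": "string"}],
    }
    (root / "events" / "greeting.json").write_text(json.dumps(schema))
    subjects = discover_subjects(root, suffix="_dev1")

print(sorted(subjects))
```

Under the record name strategy the subject depends only on the schema, not on the topic the message is sent to, which is why the same record can be produced to several topics under one subject.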

For more information, see Using the RecordNameSchemaManager in the user guide.

Methods Summary

register_schemas(*[, compatibility])

Register all local schemas with the Confluent Schema Registry.

serialize(*, data, name)

Serialize data using the preregistered schema for a Schema Registry subject.

Methods Documentation

async register_schemas(*, compatibility: Optional[str] = None) → None

Register all local schemas with the Confluent Schema Registry.

Parameters

compatibility (str, optional) –

The compatibility setting to apply to all subjects. Allowed values:

  • "BACKWARD"

  • "BACKWARD_TRANSITIVE"

  • "FORWARD"

  • "FORWARD_TRANSITIVE"

  • "FULL"

  • "FULL_TRANSITIVE"

  • "NONE"

  • None

If None (as opposed to "NONE"), then no compatibility setting is set during schema registration:

  • If the subject doesn’t already exist, it will inherit the Schema Registry’s global compatibility setting.

  • If the subject already exists, it will continue to use its existing compatibility setting.

Learn more about schema compatibility in the Confluent documentation.
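The difference between None and "NONE" can be modelled with a small sketch. This is an illustration of the rules listed above, not Kafkit's code; the function effective_compatibility and its arguments are hypothetical, and existing=None is assumed to mean that the subject does not exist yet.

```python
from typing import Optional

# The string values accepted by the compatibility parameter.
ALLOWED_COMPATIBILITY = frozenset(
    {
        "BACKWARD",
        "BACKWARD_TRANSITIVE",
        "FORWARD",
        "FORWARD_TRANSITIVE",
        "FULL",
        "FULL_TRANSITIVE",
        "NONE",
    }
)


def effective_compatibility(
    requested: Optional[str],
    existing: Optional[str],
    global_default: str,
) -> str:
    """Return the setting a subject ends up with after registration.

    requested: the value passed as ``compatibility`` (may be None).
    existing: the subject's current setting, or None for a new subject.
    global_default: the Schema Registry's global setting.
    """
    if requested is not None:
        if requested not in ALLOWED_COMPATIBILITY:
            raise ValueError(f"Unknown compatibility setting: {requested!r}")
        # An explicit value, including the string "NONE", is applied.
        return requested
    if existing is not None:
        # None leaves an existing subject's setting untouched.
        return existing
    # A new subject inherits the global setting.
    return global_default
```

For example, effective_compatibility(None, "FULL", "BACKWARD") gives "FULL", whereas effective_compatibility("NONE", "FULL", "BACKWARD") gives "NONE", which disables compatibility checking for the subject.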

async serialize(*, data: Any, name: str) → bytes

Serialize data using the preregistered schema for a Schema Registry subject.

Parameters
  • data (dict) – An Avro-serializable object.

  • name (str) –

    The name of the schema to serialize the data with. This is also the name of the subject that the schema is associated with in the Confluent Schema Registry, following the record name strategy.

    The schema used is the local copy that the RecordNameSchemaManager registered.

Returns

The Avro-encoded message in the Confluent Wire Format (which identifies the schema in the Schema Registry that was used to encode the message).

Return type

bytes

Raises

ValueError – Raised if there isn’t a locally-available schema with that record name / subject name.
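For orientation, the Confluent Wire Format returned by serialize frames the Avro body with a five-byte prefix: a magic byte of 0 followed by the schema ID as a 4-byte big-endian integer. The sketch below shows only that framing, using the standard library; the function names are hypothetical, and the Avro body is treated as opaque bytes rather than encoded here.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # First byte of every Confluent Wire Format message.
PREFIX_SIZE = 5  # 1 magic byte + 4-byte schema ID.


def pack_wire_format(schema_id: int, avro_body: bytes) -> bytes:
    """Prefix an Avro-encoded body with the magic byte and schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_body


def unpack_wire_format(message: bytes) -> Tuple[int, bytes]:
    """Split a message into its schema ID and Avro-encoded body."""
    if len(message) < PREFIX_SIZE:
        raise ValueError("Message is too short for the Confluent Wire Format")
    magic, schema_id = struct.unpack(">bI", message[:PREFIX_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"Unexpected magic byte: {magic}")
    return schema_id, message[PREFIX_SIZE:]


# Round trip with a placeholder body (not real Avro data).
message = pack_wire_format(42, b"placeholder")
schema_id, body = unpack_wire_format(message)
```

A consumer reads the schema ID from the prefix, fetches that schema from the Schema Registry, and then decodes the remaining bytes with it.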