Using the RecordNameSchemaManager

The RecordNameSchemaManager is an opinionated tool for managing Avro Schemas with the Confluent Schema Registry and for serializing data using those schemas. This page will help you understand what the RecordNameSchemaManager does and how to add the RecordNameSchemaManager into your application.

Overview of the RecordNameSchemaManager

The role of a schema manager

Your application needs to serialize data with the exact Avro schemas that it is developed and tested with, rather than the latest schema for a subject in the Confluent Schema Registry. It’s good practice, therefore, to package schemas with your application.

At the same time, you cannot serialize messages without using the Confluent Schema Registry: the registered ID of a schema is included in every Avro-encoded message (see the Confluent Wire Format).

The RecordNameSchemaManager helps you by automatically registering schema maintained inside your app with a Confluent Schema Registry, and automatically associating schemas with their ID in a Confluent Schema Registry while serializing a message.

The “record name” subject naming strategy

The RecordNameSchemaManager specifically adheres to the RecordNameStrategy option for the Schema Registry’s subject name strategy. Under this naming strategy, the name of a subject in the Schema Registry is the same as the fully-qualified name of the Avro schema.

For example, the following Avro schema would be registered in a subject named myapp.a:

{
  "type": "record",
  "name": "a",
  "namespace": "myapp",
  "fields": [
    {"name": "field1", "type": "int"},
    {"name": "field2", "type": "string"}
  ]
}

With the “record name” subject naming strategy, Schema Registry subjects are decoupled from Kafka topics: schemas for a given Schema Registry subject can appear in multiple Kafka topics, and a single Kafka topic can contain messages encoded with schemas from multiple subjects.

By adhering to the “record name” subject naming strategy, the RecordNameSchemaManager lets you specify a schema through its fully-qualified name. Combined with RecordNameSchemaManager’s control of schema versioning, this makes serialization applications convenient to write.

Note

Other subject naming strategies exist, such as TopicNameStrategy and TopicRecordNameStrategy. In fact, TopicNameStrategy (which requires that subjects be named {topic}-key and {topic}-value) is the default. Although schema managers designed around these strategies aren’t currently available, but they could be contributed.

How to use the RecordNameSchemaManager

This section outlines the essential steps for integrating the RecordNameSchemaManager with your application.

Step 1. Collect Avro schemas in a local directory

This workflow assumes that all the Avro schemas your application uses to serialize messages are maintained in the app’s codebase. Gather all those schemas into one directory.

This is a possible file layout:

.
└── src
    └── myapp
        ├── __init__.py
        ├── app.py
        └── avro_schemas/
            ├── myapp.a.json
            └── myapp.b.json

Inside your application, store the path to the directory containing the Avro schemas:

from pathlib import Path

schema_root = Path(__file__).parent / "avro_schemas"

Step 2. Initialize the RecordNameSchemaManager

The RecordNameSchemaManager needs a Schema Registry API client:

from kafkit.registry.aiohttp import RegistryApi

async with aiohttp.ClientSession() as http_session:
    registry_api = RegistryApi(
        session=http_session, url="http://localhost:8081"
    )
    ...

See kafkit.registry.aiohttp.RegistryApi for details.

Then create the schema manager:

schema_manager = RecordNameSchemaManager(
    root=schema_root,
    registry=registry_api,
)

Step 3. Register schemas

Next, register the locally-maintained schemas with the Schema Registry using the register_schemas method:

await manager.register_schemas(compatibility="FORWARD")

The compatibility parameter allows you to set the compatibility settings for each schema’s subject. If you do not wish to update the compatibility settings of subjects, or to use the registry’s defaults, leave the compatibility parameter as None.

Note

The FORWARD setting means that data serialized with the newer schema can be read by an application using an older version of that schema. This setting is useful if schemas are managed by producers, and consumers are gradually updated to keep up.

See Confluent’s documentation on Schema Evolution and Compatibility for information about this and other compatibility options.

It’s safe to use the register_schemas method with schemas that are already registered. The schema is automatically associated with its existing ID in the Schema Registry if it was previously registered.

Step 4. Serialize messages using schema names

Now the fun part — serializing messages into Avro:

data = {"field1": 42, "field2": "Hello world"}
message = await schema_manager.serialize(data=data, name="myapp.a")

Serializing messages is straightforward because you don’t need to maintain schemas or schema IDs in the code for producing messages. Instead, you only need to declare the name of the schema you are using to serialize data.

The same serialize method can serialize both the key and value of a Kafka message.

Now that your data is serialized, you can pass the message bytes object to aiokafka.AIOKafkaProducer.send_and_wait, or similar, method to produce a Kafka message.