RegistryApi

class kafkit.registry.sansio.RegistryApi(*, url: str)

Bases: object

A base class for Confluent Schema Registry clients.

Attributes Summary

schema_cache

The schema cache (SchemaCache).

subject_cache

The subject cache (SubjectCache).

Methods Summary

delete(url[, url_vars, data])

Send an HTTP DELETE request.

get(url[, url_vars])

Send an HTTP GET request.

get_schema_by_id(schema_id)

Get a schema from the registry given its ID.

get_schema_by_subject(subject[, version])

Get a schema for a subject in the registry.

patch(url[, url_vars])

Send an HTTP PATCH request.

post(url[, url_vars])

Send an HTTP POST request.

put(url[, url_vars, data])

Send an HTTP PUT request.

register_schema(schema[, subject])

Register a schema or get the ID of an existing schema.

Attributes Documentation

schema_cache

The schema cache (SchemaCache).

subject_cache

The subject cache (SubjectCache).

Methods Documentation

async delete(url: str, url_vars: Optional[Mapping[str, str]] = None, *, data: Any = b'') → Any

Send an HTTP DELETE request.

Parameters:
  • url (str) – The endpoint path, usually relative to the RegistryApi.url attribute (an absolute URL is also okay). The url can be templated (/a{/b}/c, where b is a variable).

  • url_vars (dict, optional) – A dictionary of variable names and values to expand the templated url parameter.

Returns:

data – The response body. If the response is JSON, the data is parsed into a Python object.

Return type:

bytes

Raises:

async get(url: str, url_vars: Optional[Mapping[str, str]] = None) → Any

Send an HTTP GET request.

Parameters:
  • url (str) – The endpoint path, usually relative to the RegistryApi.url attribute (an absolute URL is also okay). The url can be templated (/a{/b}/c, where b is a variable).

  • url_vars (dict, optional) – A dictionary of variable names and values to expand the templated url parameter.

Returns:

data – The response body. If the response is JSON, the data is parsed into a Python object.

Return type:

bytes
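The templated url and url_vars parameters described above can be illustrated with a small expander. This is a simplified sketch using only the standard library, not the library's implementation: the function name expand_url and the base URL are hypothetical, and only the {name} and {/name} template forms are handled (a full URI Template implementation supports more operators).

```python
import re
from typing import Mapping, Optional
from urllib.parse import quote


def expand_url(base_url: str, url: str,
               url_vars: Optional[Mapping[str, str]] = None) -> str:
    """Expand a simple URL template and resolve it against a base URL.

    Hypothetical helper for illustration only. It supports two template
    forms: ``{name}`` and ``{/name}`` (a path segment that is dropped
    entirely when the variable is not supplied).
    """
    url_vars = url_vars or {}

    def substitute(match: "re.Match[str]") -> str:
        slash, name = match.group(1), match.group(2)
        if name not in url_vars:
            return ""
        # Percent-encode the value so it is safe inside a path segment.
        return slash + quote(str(url_vars[name]), safe="")

    expanded = re.sub(r"\{(/?)(\w+)\}", substitute, url)
    # Absolute URLs pass through; relative paths are appended to the base.
    if expanded.startswith(("http://", "https://")):
        return expanded
    return base_url.rstrip("/") + "/" + expanded.lstrip("/")
```

With this sketch, expanding "/a{/b}/c" with {"b": "hello"} yields a path ending in /a/hello/c, while omitting b yields /a/c.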

Raises:

async get_schema_by_id(schema_id: int) → Dict[str, Any]

Get a schema from the registry given its ID.

Wraps GET /schemas/ids/{int: id}.

Parameters:

schema_id (int) – The ID of the schema in the registry.

Returns:

schema – The Avro schema. The schema is pre-parsed by fastavro.parse_schema.

Return type:

dict

Notes

The schema and ID are cached locally so that repeated calls are fast. This cache is shared by other high-level methods, like register_schema.
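The shared cache mentioned in the note can be pictured as a two-way lookup between schema IDs and schemas. The following is a minimal sketch under that assumption; TinySchemaCache and its methods are hypothetical names for illustration and are not the library's SchemaCache.

```python
import json
from typing import Any, Dict, Mapping, Optional


class TinySchemaCache:
    """Hypothetical two-way cache between schema IDs and schemas."""

    def __init__(self) -> None:
        self._schema_by_id: Dict[int, Mapping[str, Any]] = {}
        self._id_by_schema: Dict[str, int] = {}

    @staticmethod
    def _key(schema: Mapping[str, Any]) -> str:
        # Sort keys so that equal schemas serialize to the same string.
        return json.dumps(schema, sort_keys=True)

    def insert(self, schema: Mapping[str, Any], schema_id: int) -> None:
        """Record the schema under its ID and the ID under the schema."""
        self._schema_by_id[schema_id] = schema
        self._id_by_schema[self._key(schema)] = schema_id

    def get_schema(self, schema_id: int) -> Optional[Mapping[str, Any]]:
        """Look up a schema by ID (the direction a by-ID fetch needs)."""
        return self._schema_by_id.get(schema_id)

    def get_id(self, schema: Mapping[str, Any]) -> Optional[int]:
        """Look up an ID by schema (the direction registration needs)."""
        return self._id_by_schema.get(self._key(schema))
```

Storing both directions is what would let a lookup by ID and a registration of the same schema share one cache entry.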

async get_schema_by_subject(subject: str, version: Union[str, int] = 'latest') → Dict[str, Any]

Get a schema for a subject in the registry.

Wraps GET /subjects/(string: subject)/versions/(versionId: version).

Parameters:
  • subject (str) – Name of the subject in the Schema Registry.

  • version (int or str, optional) – The version of the schema with respect to the subject. To get the latest schema, supply "latest" (default).

Returns:

schema_info – A dictionary with the schema and metadata about the schema. The keys are:

"schema"

The schema itself, pre-parsed by fastavro.parse_schema.

"subject"

The subject this schema is registered under in the registry.

"version"

The version of this schema with respect to the subject.

"id"

The ID of this schema (compatible with get_schema_by_id).

Return type:

dict

See also

get_schema_by_id

Notes

Results from this method are cached locally, so repeated calls are fast. Keep in mind that any call with the version parameter set to "latest" always misses the cache. The schema is still cached, though, under its true subject version. If your app repeatedly calls this method and you want to make use of caching, replace "latest" with the integer version once it is known.
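The version-pinning pattern from the note might look like the sketch below. To stay runnable without a registry, it uses FakeRegistry, a hypothetical stand-in that only imitates the documented caching behaviour; the subject name, version number, and ID are made-up values.

```python
import asyncio
from typing import Any, Dict, Tuple, Union


class FakeRegistry:
    """Hypothetical stand-in that imitates the documented caching."""

    def __init__(self) -> None:
        self.network_calls = 0
        self._cache: Dict[Tuple[str, int], Dict[str, Any]] = {}

    async def get_schema_by_subject(
        self, subject: str, version: Union[str, int] = "latest"
    ) -> Dict[str, Any]:
        # Integer versions can be answered from the cache.
        if isinstance(version, int) and (subject, version) in self._cache:
            return self._cache[(subject, version)]
        # "latest" (or an unseen version) goes to the pretend server.
        self.network_calls += 1
        info = {
            "schema": {"type": "record", "name": "example", "fields": []},
            "subject": subject,
            "version": 3 if version == "latest" else version,
            "id": 42,
        }
        # Cache under the true integer version, never under "latest".
        self._cache[(subject, info["version"])] = info
        return info


async def main() -> int:
    registry = FakeRegistry()
    # The first call resolves "latest" and reveals the integer version.
    info = await registry.get_schema_by_subject("example-subject")
    pinned_version = info["version"]
    # Later calls pin that version so they can be served from the cache.
    for _ in range(5):
        await registry.get_schema_by_subject("example-subject", pinned_version)
    return registry.network_calls


calls = asyncio.run(main())
```

The trade-off is that a pinned version will not pick up newer schema versions, so an app that needs them has to ask for "latest" again at some point.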

async patch(url: str, url_vars: Optional[Mapping[Any, Any]] = None, *, data: Any) → Any

Send an HTTP PATCH request.

Parameters:
  • url (str) – The endpoint path, usually relative to the RegistryApi.url attribute (an absolute URL is also okay). The url can be templated (/a{/b}/c, where b is a variable).

  • url_vars (dict, optional) – A dictionary of variable names and values to expand the templated url parameter.

  • data (object) – The body of the request as a JSON-serializable object.

Returns:

data – The response body. If the response is JSON, the data is parsed into a Python object.

Return type:

bytes

Raises:

async post(url: str, url_vars: Optional[Mapping[str, str]] = None, *, data: Any) → Any

Send an HTTP POST request.

Parameters:
  • url (str) – The endpoint path, usually relative to the RegistryApi.url attribute (an absolute URL is also okay). The url can be templated (/a{/b}/c, where b is a variable).

  • url_vars (dict, optional) – A dictionary of variable names and values to expand the templated url parameter.

  • data (object) – The body of the request as a JSON-serializable object.

Returns:

data – The response body. If the response is JSON, the data is parsed into a Python object.

Return type:

bytes
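How a JSON-serializable data object might become a request body, and how a JSON response might be parsed back into a Python object, can be sketched as follows. The helper names encode_body and decode_body are hypothetical, and returning the raw bytes for non-JSON responses is an assumption of this sketch rather than documented behaviour. The media type in the usage note is the one used by the Confluent Schema Registry REST API.

```python
import json
from typing import Any, Optional


def encode_body(data: Any) -> bytes:
    """Serialize a JSON-serializable object into UTF-8 request bytes."""
    return json.dumps(data).encode("utf-8")


def decode_body(content_type: Optional[str], body: bytes) -> Any:
    """Parse the body when the content type is a JSON media type.

    Assumption for this sketch: any other content type (or a missing
    one) leaves the body as raw bytes.
    """
    if content_type:
        # Drop parameters such as "; charset=utf-8" before matching.
        media_type = content_type.split(";")[0].strip().lower()
        if media_type.endswith("json"):
            return json.loads(body.decode("utf-8"))
    return body
```

For example, decode_body("application/vnd.schemaregistry.v1+json", b'{"id": 1}') gives the dictionary {"id": 1}, whereas a text/plain body is returned unchanged.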

Raises:

async put(url: str, url_vars: Optional[Mapping[str, str]] = None, data: Any = b'') → Any

Send an HTTP PUT request.

Parameters:
  • url (str) – The endpoint path, usually relative to the RegistryApi.url attribute (an absolute URL is also okay). The url can be templated (/a{/b}/c, where b is a variable).

  • url_vars (dict, optional) – A dictionary of variable names and values to expand the templated url parameter.

  • data (object, optional) – The body of the request as a JSON-serializable object.

Returns:

data – The response body. If the response is JSON, the data is parsed into a Python object.

Return type:

bytes

Raises:

async register_schema(schema: Mapping[str, Any], subject: Optional[str] = None) → int

Register a schema or get the ID of an existing schema.

Wraps POST /subjects/(string: subject)/versions.

Parameters:
  • schema (dict) – An Avro schema as a Python dictionary.

  • subject (str, optional) – The subject to register the schema under. If not provided, the fully-qualified name of the schema is adopted as the subject name.

Returns:

schema_id – The ID of the schema in the registry.

Return type:

int

Notes

The schema and ID are cached locally so that repeated calls are fast. This cache is shared by other high-level methods, like get_schema_by_id.
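The default subject and the request sent to POST /subjects/(string: subject)/versions can be sketched as below. The helper names are hypothetical and this is not the library's code. It assumes the Avro naming rule that a full name is the namespace joined to the name with a dot, and the Schema Registry convention that the request body carries the schema as a JSON string under the "schema" key.

```python
import json
from typing import Any, Dict, Mapping, Optional, Tuple


def fully_qualified_name(schema: Mapping[str, Any]) -> str:
    """Return the Avro full name of a named schema."""
    name = schema["name"]
    namespace = schema.get("namespace")
    # Per the Avro specification, a dotted name is already a full name.
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def build_registration(
    schema: Mapping[str, Any], subject: Optional[str] = None
) -> Tuple[str, Dict[str, str]]:
    """Build the endpoint path and body for registering a schema."""
    # Fall back to the schema's full name when no subject is given.
    subject = subject or fully_qualified_name(schema)
    path = f"/subjects/{subject}/versions"
    # The schema travels as a JSON-encoded string inside the JSON body.
    body = {"schema": json.dumps(schema, sort_keys=True)}
    return path, body
```

For a schema named Reading in the namespace com.example, this sketch targets /subjects/com.example.Reading/versions unless an explicit subject is passed.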