wms.db#

Asynchronous Postgres client module.

The client maintains the underlying connection pool and provides some generic functions.

Migration operations are handled by a dedicated manager class. Both upgrades and rollbacks are supported. They are stored in the migrations subdirectory with the following format: <version>_<name>_<type>. The version is a monotonically increasing integer starting at 1. The name can be any string that doesn’t contain underscores. The type must be either update or rollback.

The metrics manager provides convenience methods over the asynchronous Postgres client by preparing data and executing prepared SQL statements transactionally.

Module Contents#

Classes#

AsyncPostgresClient

Asynchronous Postgres client used to persist metrics using prepared statements.

MigrationManager

Migration manager abstraction over the DB client.

MetricsManager

Metrics manager abstraction over the DB client.

Functions#

available_migration_specs_iter(...)

Get available migrations as a map of tuples.

filter_sort_migration_specs(...)

Filter migrations to some closed range, i.e. both start and end are included.

Attributes#

wms.db.logger#
wms.db.MigrationSpec :TypeAlias#
wms.db.MigrationType :TypeAlias#
wms.db.METRICS_TABLE :Final[str] = metrics#
wms.db.INSERT_METRICS_STMT :Final[str] = INSERT INTO metrics(host, url, request_dt, response_dt, status_code, pattern_expected,...#
wms.db.INSERT_STMT_RETURNING :Final[str]#
wms.db.available_migration_specs_iter(*, migration_type: MigrationType) collections.abc.Iterable[MigrationSpec]#

Get available migrations as a map of tuples.

wms.db.filter_sort_migration_specs(migrations_iter: collections.abc.Iterable[MigrationSpec], *, migration_type: MigrationType, start: int | None, end: int | None) collections.abc.Iterable[MigrationSpec]#

Filter migrations to some closed range, i.e. both start and end are included.

class wms.db.AsyncPostgresClient(dsn: yarl.URL, *, ssl: SSLContext | SSLMode | bool)#

Bases: wms.typing.ServiceProtocol

Asynchronous Postgres client used to persist metrics using prepared statements.

dsn() yarl.URL#

Return the Data Source Name used to access the underlying DB server.

ssl() SSLContext | SSLMode | bool#

Return the SSL context or mode used to access the underlying DB server.

property pool asyncpg.pool.Pool#

Return the underlying pool and raise if it hasn’t been defined yet.

async start() None#

Create a new connection pool.

async stop() None#

Close all connections in the pool.

async get_server_version() str#

Return the server version.

async get_tables() list[str]#

Return table names for the public schema.

async drop_table(table_name: str) None#

Drop table.

async drop_all_tables() None#

Naive wait to drop all tables, doesn’t take FK constraints into account.

async truncate_table(table_name: str) None#

Truncate table.

class wms.db.MigrationManager(db_client: AsyncPostgresClient)#

Bases: wms.typing.ServiceProtocol

Migration manager abstraction over the DB client.

async start() None#

Ensure the client is started.

async stop() None#

Ensure the client is stopped.

async get_migration_version() int | None#

Return table names for the public schema.

async execute_migration(ddl_statement: str, version: int, name: str) None#

Try to execute DDL transactionally.

async execute_versioned_ddl(*, migration_type: MigrationType, start: int | None = None, end: int | None = None) int#

Load DDL from the migrations dir and execute it transactionally.

async maybe_migrate() int#

Determine state of DB wrt schema DDL and attempt to migrate if possible.

class wms.db.MetricsManager(db_client: AsyncPostgresClient)#

Bases: wms.typing.ServiceProtocol

Metrics manager abstraction over the DB client.

async start() None#

Ensure the client is started.

async stop() None#

Ensure the client is stopped.

property migrations_done asyncio.locks.Event#

Mirror the client’s is ready property.

async insert_metrics(metrics: wms.models.WebsiteScrapeResult) int#

Insert a single sample in the metrics table.

async insert_metrics_tuple(metrics_tup: wms.models.WebsiteMonitorTuple) int#

Insert a single sample in the metrics table.

async batch_insert_metric_instances(metrics: collections.abc.Iterable[wms.models.WebsiteScrapeResult]) list[int]#

Insert a batch of monitoring result instances in the metrics table.

Unzip fields into their own tuples.

async batch_insert_metric_tuples(metrics_tups: collections.abc.Sequence[wms.models.WebsiteMonitorTuple]) list[int]#

Insert a batch of monitoring result instances in the metrics table.

Need to do two queries since executemany swallows the result and fetch doesn’t support multiple input records.

async delete_metrics(primary_keys: collections.abc.Sequence[int]) None#

Delete metrics with given primary keys.

async get_metrics_count() int#

Count the metrics’ table size.

async get_unique_metrics_count() int#

Count unique elements in the metrics table. Mainly used for testing.

async truncate_metrics_table() None#

Truncate metrics table.