wms.services

Asynchronous services for scraping and persisting website monitoring information.

Module Contents

Classes

ScrapingService

Asynchronous scraping service.

PersistenceService

Asynchronous persistence service.

TableRunningStatsService

Table running statistics service.

HTTPServer

HTTP Server service wrapping an aiohttp App.

ServiceRunner

Simple event loop service scheduler.

Functions

make_request_parse_response(...)

Handle the request-response workflow with some timeout.

monitor_site_loop(→ None)

Run an infinite loop that sends GET requests periodically according to some spec.

Attributes

wms.services.logger

async wms.services.make_request_parse_response(spec: wms.config.WebsiteSpec, timeout: aiohttp.client.ClientTimeout) → wms.models.WebsiteScrapeResult

Handle the request-response workflow with some timeout.

async wms.services.monitor_site_loop(spec: wms.config.WebsiteSpec, *, queue: asyncio.queues.Queue, metrics: wms.metrics.MonitorMetrics, stop_event: asyncio.locks.Event, timeout_s: float) → None

Run an infinite loop that sends GET requests periodically according to some spec.

The function puts each result on the passed async Queue and stops iterating when the caller sets the corresponding event.
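The loop described above can be sketched with the standard library alone. This is a minimal illustration, not the `wms` implementation: `periodic_loop`, `fake_fetch`, and `interval_s` are hypothetical names, and the real function takes its period from the `WebsiteSpec` and issues the request through aiohttp.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def periodic_loop(
    fetch: Callable[[], Awaitable[str]],  # hypothetical stand-in for the GET request
    *,
    queue: asyncio.Queue,
    stop_event: asyncio.Event,
    interval_s: float,
    timeout_s: float,
) -> None:
    """Call ``fetch`` every ``interval_s`` seconds until ``stop_event`` is set."""
    while not stop_event.is_set():
        try:
            result = await asyncio.wait_for(fetch(), timeout=timeout_s)
        except asyncio.TimeoutError:
            result = "timeout"
        # Sink the outcome to the queue for a downstream consumer.
        await queue.put(result)
        # Sleep for the interval, but wake up early if the caller sets the event.
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    stop_event = asyncio.Event()

    async def fake_fetch() -> str:
        return "ok"

    task = asyncio.create_task(
        periodic_loop(
            fake_fetch,
            queue=queue,
            stop_event=stop_event,
            interval_s=0.01,
            timeout_s=1.0,
        )
    )
    await asyncio.sleep(0.05)
    stop_event.set()  # the caller decides when the loop ends
    await task
    results = []
    while not queue.empty():
        results.append(queue.get_nowait())
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Waiting on the event with a timeout, rather than calling `asyncio.sleep`, lets the loop exit promptly instead of finishing a full interval after the stop request.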

class wms.services.ScrapingService(*, scraping_config: wms.config.ScrapingConfig, kafka_config: wms.config.KafkaConfig, certs_dir: Path | None)

Bases: wms.typing.ServiceProtocol

Asynchronous scraping service.

async start() → None

Start the Kafka producer and schedule tasks for the scraping service.

async process_publish_results() → None

Serialize results in JSON and publish them using the producer.

async stop() → None

Stop the scraping service gracefully.

Set the stop event and almost immediately cancel pending requests. Then wait for the results queue to be drained and flush the producer.
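The shutdown order described above (set the event, cancel pending requests, drain the queue) can be sketched as follows. This is an illustrative sketch under assumptions: `graceful_stop`, `grace_s`, and the `publisher` task are hypothetical, and the final producer flush is omitted because it depends on the Kafka client.

```python
import asyncio


async def graceful_stop(
    stop_event: asyncio.Event,
    request_tasks: list[asyncio.Task],
    results: asyncio.Queue,
    grace_s: float = 0.05,  # hypothetical grace period before cancelling
) -> None:
    stop_event.set()
    if request_tasks:
        # Give in-flight requests a brief moment, then cancel whatever is pending.
        _, pending = await asyncio.wait(request_tasks, timeout=grace_s)
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
    # Block until the consumer has called task_done() for every queued result.
    await results.join()


async def demo() -> tuple[list[str], bool]:
    stop_event = asyncio.Event()
    results: asyncio.Queue = asyncio.Queue()
    published: list[str] = []

    async def slow_request() -> None:
        await asyncio.sleep(10)  # never finishes within the grace period
        await results.put("late")

    async def publisher() -> None:
        while True:
            item = await results.get()
            published.append(item)
            results.task_done()

    await results.put("r1")
    await results.put("r2")
    request = asyncio.create_task(slow_request())
    pub = asyncio.create_task(publisher())
    await graceful_stop(stop_event, [request], results)
    pub.cancel()
    await asyncio.gather(pub, return_exceptions=True)
    return published, request.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```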

class wms.services.PersistenceService(*, metrics_manager: wms.db.MetricsManager, migrations_manager: wms.db.MigrationManager, persistence_config: wms.config.PersistenceConfig, kafka_config: wms.config.KafkaConfig, certs_dir: Path | None = None)

Bases: wms.typing.ServiceProtocol

Asynchronous persistence service.

FIXME(aris): We might lose messages with the current way of doing things, since the offset will be advanced automatically even though the insert may fail. Either commit the offset manually, or reset to an earlier state when a transaction fails.

property metrics: wms.metrics.PersistenceMetrics

Prometheus metrics.

property consumer: AIOKafkaConsumer | None

Consumer instance, or None if uninitialized.

async start() → None

Start dependencies and ensure the schema is up-to-date.

async persist_results_loop() → None

Poll the metrics topic for new records and persist them in the DB.

Records are accumulated in batches and offsets are committed to the cluster after the batch has been persisted in the metrics table.

The Postgres transaction commit and the Kafka offsets commit are co-dependent: they either both succeed or both fail. Rollback is handled crudely by deleting the inserted metrics if the offsets commit fails.
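The insert-then-commit pattern with a compensating delete can be sketched as below. Everything here is a stand-in for illustration: `InMemoryTable`, `persist_batch`, and the `commit_offsets` callable are hypothetical and do not reflect the actual `MetricsManager` or consumer APIs.

```python
import asyncio
from collections.abc import Awaitable, Callable


class InMemoryTable:
    """Hypothetical stand-in for the metrics table."""

    def __init__(self) -> None:
        self.rows: dict[int, str] = {}
        self._next_pk = 1

    async def insert(self, batch: list[str]) -> list[int]:
        pks = []
        for record in batch:
            self.rows[self._next_pk] = record
            pks.append(self._next_pk)
            self._next_pk += 1
        return pks

    async def delete(self, pks: list[int]) -> None:
        for pk in pks:
            self.rows.pop(pk, None)


async def persist_batch(
    table: InMemoryTable,
    batch: list[str],
    commit_offsets: Callable[[], Awaitable[None]],
) -> bool:
    """Insert a batch, then commit offsets; undo the insert if the commit fails."""
    pks = await table.insert(batch)
    try:
        await commit_offsets()
    except Exception:
        # Compensating action: remove the rows whose offsets were never committed,
        # so the records are stored exactly once when they are consumed again.
        await table.delete(pks)
        return False
    return True


async def demo() -> tuple[bool, bool, list[str]]:
    table = InMemoryTable()

    async def commit_ok() -> None:
        return None

    async def commit_fails() -> None:
        raise RuntimeError("offset commit failed")

    first = await persist_batch(table, ["a", "b"], commit_ok)
    second = await persist_batch(table, ["c"], commit_fails)
    return first, second, list(table.rows.values())


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Committing offsets only after the insert succeeds gives at-least-once delivery; the compensating delete is what keeps a redelivered batch from producing duplicate rows.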

async flush(metric_tups: collections.abc.Sequence[wms.models.WebsiteMonitorTuple]) → list[int]

Flush uncommitted tuples to persistence.

async delete_uncommitted(primary_keys: collections.abc.Sequence[int]) → None

Delete uncommitted tuples from the metrics table.

async stop() → None

Stop the persistence service gracefully.

Stop the consumer, wait for the metrics queue to be drained, and flush the last batch to the DB.

class wms.services.TableRunningStatsService(metrics_manager: wms.db.MetricsManager)

Table running statistics service.

async start() → None

Start the running stats service gracefully.

async stop() → None

Stop the running stats service gracefully.

async metrics_rowcount_loop(timer: float = 15) → None

Periodically output row count stats for the metrics table.

class wms.services.HTTPServer(app: aiohttp.web.Application, host: str | None = 'localhost', port: int | None = 8080)

Bases: wms.typing.ServiceProtocol

HTTP Server service wrapping an aiohttp App.

async start() → None

Perform actions on start.

async stop() → None

Perform actions on stop.

class wms.services.ServiceRunner(services: collections.abc.Iterable[wms.typing.ServiceProtocol])

Simple event loop service scheduler.

Schedules multiple services to run concurrently, and handles signals and termination.

register_signal_handlers(signals: collections.abc.Sequence[enum.IntEnum] = (SIGHUP, SIGTERM, SIGINT)) → None

Register signal handlers.

async start() → None

Start all services.

async stop() → None

Stop all services.

async handle_signal(sig: enum.IntEnum, timeout_delay: float = 3) → None

Stop all services and cancel outstanding tasks after some timeout.

We assume only shutdown signals were registered.
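One plausible reading of "stop, then cancel after a timeout" is sketched below: give the services a bounded amount of time to stop, then cancel whatever tasks are still running. The helper `stop_then_cancel` and its `stop_all` argument are hypothetical, and the actual method may order these steps differently.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def stop_then_cancel(
    stop_all: Callable[[], Awaitable[None]],  # hypothetical "stop every service" hook
    timeout_delay: float,
) -> int:
    """Await ``stop_all`` for at most ``timeout_delay`` seconds, then cancel leftovers."""
    try:
        await asyncio.wait_for(stop_all(), timeout=timeout_delay)
    except asyncio.TimeoutError:
        pass  # services were too slow; fall through to cancellation
    current = asyncio.current_task()
    leftovers = [
        task
        for task in asyncio.all_tasks()
        if task is not current and not task.done()
    ]
    for task in leftovers:
        task.cancel()
    # Wait for the cancellations to be processed before returning.
    await asyncio.gather(*leftovers, return_exceptions=True)
    return len(leftovers)


async def demo() -> tuple[int, bool]:
    async def stop_all() -> None:
        await asyncio.sleep(0)  # services stop quickly in this demo

    async def straggler() -> None:
        await asyncio.sleep(3600)  # a task that would otherwise outlive shutdown

    task = asyncio.create_task(straggler())
    await asyncio.sleep(0)  # let the straggler start
    cancelled_count = await stop_then_cancel(stop_all, timeout_delay=0.1)
    return cancelled_count, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```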

async run() → None

Register signal handlers, schedule services and handle their completion.
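To illustrate the overall lifecycle, here is a trimmed-down runner built only on the standard library. `MiniRunner`, `Recorder`, and `request_shutdown` are hypothetical names for this sketch and are not part of `wms`; the protocol is assumed to expose just `start()` and `stop()`, matching the services documented above.

```python
import asyncio
import signal
from typing import Protocol


class Service(Protocol):
    async def start(self) -> None: ...
    async def stop(self) -> None: ...


class MiniRunner:
    """Hypothetical, trimmed-down runner; not the wms implementation."""

    def __init__(self, services: list[Service]) -> None:
        self.services = services
        self._shutdown = asyncio.Event()

    def register_signal_handlers(
        self, signals: tuple[signal.Signals, ...] = (signal.SIGTERM, signal.SIGINT)
    ) -> None:
        loop = asyncio.get_running_loop()
        for sig in signals:
            # add_signal_handler is only available on Unix event loops.
            loop.add_signal_handler(sig, self._shutdown.set)

    def request_shutdown(self) -> None:
        self._shutdown.set()

    async def run(self) -> None:
        for service in self.services:
            await service.start()
        try:
            await self._shutdown.wait()
        finally:
            # Stop in reverse order so dependants go down before dependencies.
            for service in reversed(self.services):
                await service.stop()


class Recorder:
    """Toy service that records lifecycle calls."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def start(self) -> None:
        self.log.append(f"start:{self.name}")

    async def stop(self) -> None:
        self.log.append(f"stop:{self.name}")


async def demo() -> list[str]:
    log: list[str] = []
    runner = MiniRunner([Recorder("a", log), Recorder("b", log)])
    task = asyncio.create_task(runner.run())
    await asyncio.sleep(0.01)
    runner.request_shutdown()  # stands in for a SIGTERM/SIGINT delivery
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real process, `register_signal_handlers()` would be called from inside `run()` before the services start, so that a signal sets the same event that `request_shutdown()` sets in the demo.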