wms.services#
Asynchronous services for scraping and persisting website monitoring information.
Module Contents#
Classes#

| Class | Description |
| --- | --- |
| `ScrapingService` | Asynchronous scraping service. |
| `PersistenceService` | Asynchronous persistence service. |
| `TableRunningStatsService` | Table running statistics service. |
| `HTTPServer` | HTTP Server service wrapping an aiohttp App. |
| `ServiceRunner` | Simple event loop service scheduler. |
Functions#

| Function | Description |
| --- | --- |
| `make_request_parse_response` | Handle the request-response workflow with some timeout. |
| `monitor_site_loop` | Run an infinite loop that sends GET requests periodically according to some spec. |
Attributes#
- wms.services.logger#
- async wms.services.make_request_parse_response(spec: wms.config.WebsiteSpec, timeout: aiohttp.client.ClientTimeout) wms.models.WebsiteScrapeResult #
Handle the request-response workflow with some timeout.
- async wms.services.monitor_site_loop(spec: wms.config.WebsiteSpec, *, queue: asyncio.queues.Queue, metrics: wms.metrics.MonitorMetrics, stop_event: asyncio.locks.Event, timeout_s: float) None #
Run an infinite loop that sends GET requests periodically according to some spec.
The function sinks the result to the passed async Queue and stops iterating when the corresponding event is set by the caller.
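The loop's shape can be sketched as below; the `fetch` coroutine and `interval_s` parameter are simplifications of the real spec/timeout arguments:

```python
import asyncio


async def monitor_site_loop(url, *, queue, fetch, stop_event, interval_s):
    # Poll until the caller sets stop_event; sink each result into the queue.
    while not stop_event.is_set():
        result = await fetch(url)
        await queue.put(result)
        try:
            # Sleep for one interval, but wake early if the stop event fires.
            await asyncio.wait_for(stop_event.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    stop = asyncio.Event()

    async def fetch(url):
        return (url, 200)

    task = asyncio.create_task(monitor_site_loop(
        "https://example.com", queue=queue, fetch=fetch,
        stop_event=stop, interval_s=0.01))
    first = await queue.get()
    stop.set()
    await task
    return first


first = asyncio.run(demo())
```

Waiting on the stop event instead of a plain `asyncio.sleep` lets shutdown interrupt the inter-request pause immediately.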
- class wms.services.ScrapingService(*, scraping_config: wms.config.ScrapingConfig, kafka_config: wms.config.KafkaConfig, certs_dir: Path | None)#
Bases:
wms.typing.ServiceProtocol
Asynchronous scraping service.
- class wms.services.PersistenceService(*, metrics_manager: wms.db.MetricsManager, migrations_manager: wms.db.MigrationManager, persistence_config: wms.config.PersistenceConfig, kafka_config: wms.config.KafkaConfig, certs_dir: Path | None = None)#
Bases:
wms.typing.ServiceProtocol
Asynchronous persistence service.
- FIXME(aris): We might lose messages with the current way of doing things, since the offset
will be advanced automatically even if the insert fails. Either commit the offset manually, or reset to an earlier state when a transaction fails.
- property metrics wms.metrics.PersistenceMetrics #
Prometheus metrics.
- async persist_results_loop() None #
Poll the metrics topic for new records and persist them in DB.
Records are accumulated in batches and offsets are committed to the cluster after the batch has been persisted in the metrics table.
Postgres and Kafka transaction/offset commits are co-dependent: either both succeed or both fail. Rollback is handled crudely by deleting the inserted metrics if the offset commit fails.
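The commit/rollback interplay can be sketched with in-memory stand-ins for the Postgres table and the Kafka offset commit; `flush` and `delete_uncommitted` mirror the methods below, but the storage here is hypothetical:

```python
# In-memory stand-in for the metrics table: primary key -> row.
table: dict[int, tuple] = {}
next_pk = 0


def flush(batch):
    # Insert the batch and return the new primary keys for possible rollback.
    global next_pk
    pks = []
    for row in batch:
        table[next_pk] = row
        pks.append(next_pk)
        next_pk += 1
    return pks


def delete_uncommitted(pks):
    # Crude rollback: remove rows whose offsets never reached the cluster.
    for pk in pks:
        del table[pk]


def persist_batch(batch, commit_offsets):
    # Insert first, then commit offsets; undo the insert if the commit fails.
    pks = flush(batch)
    try:
        commit_offsets()
    except Exception:
        delete_uncommitted(pks)
        raise


def failing_commit():
    raise RuntimeError("broker unavailable")


persist_batch([("site-a", 200)], commit_offsets=lambda: None)  # both succeed
try:
    persist_batch([("site-b", 500)], commit_offsets=failing_commit)
except RuntimeError:
    pass  # insert was rolled back, so neither side observed the batch
```

This is the "crude rollback" the docstring describes: a delete of the just-inserted rows rather than a true distributed transaction.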
- async flush(metric_tups: collections.abc.Sequence[wms.models.WebsiteMonitorTuple]) list[int] #
Flush uncommitted tuples to persistence.
- async delete_uncommitted(primary_keys: collections.abc.Sequence[int]) None #
Delete uncommitted tuples from the metrics table.
- class wms.services.TableRunningStatsService(metrics_manager: wms.db.MetricsManager)#
Table running statistics service.
- class wms.services.HTTPServer(app: aiohttp.web.Application, host: str | None = 'localhost', port: int | None = 8080)#
Bases:
wms.typing.ServiceProtocol
HTTP Server service wrapping an aiohttp App.
- class wms.services.ServiceRunner(services: collections.abc.Iterable[wms.typing.ServiceProtocol])#
Simple event loop service scheduler.
Schedules multiple services to run concurrently; handles signals and termination.
- register_signal_handlers(signals: collections.abc.Sequence[enum.IntEnum] = (SIGHUP, SIGTERM, SIGINT)) None #
Register signal handlers.
- async handle_signal(sig: enum.IntEnum, timeout_delay: float = 3) None #
Stop all services and cancel outstanding tasks after some timeout.
We assume only shutdown signals were registered.
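The scheduling-and-shutdown pattern can be sketched as follows: run every service concurrently, and on a shutdown request give them a grace period before cancelling whatever is still outstanding. The plain-coroutine "services" here are a hypothetical reduction of `wms.typing.ServiceProtocol`:

```python
import asyncio
import signal


async def run_services(services, stop_event, timeout_delay=3.0):
    # Schedule every service, then wait for a shutdown request.
    tasks = [asyncio.create_task(svc()) for svc in services]
    await stop_event.wait()
    # Grace period: let services wind down, then cancel stragglers.
    done, pending = await asyncio.wait(tasks, timeout=timeout_delay)
    for task in pending:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)


def register_signal_handlers(loop, stop_event, sigs=(signal.SIGTERM, signal.SIGINT)):
    # On any registered (shutdown) signal, just set the event;
    # run_services does the rest.
    for sig in sigs:
        loop.add_signal_handler(sig, stop_event.set)


async def demo():
    stop = asyncio.Event()
    events = []

    async def polite():
        await stop.wait()          # exits on its own within the grace period
        events.append("polite done")

    async def stuck():
        try:
            await asyncio.sleep(3600)  # never finishes voluntarily
        except asyncio.CancelledError:
            events.append("stuck cancelled")
            raise

    runner = asyncio.create_task(run_services([polite, stuck], stop, timeout_delay=0.05))
    await asyncio.sleep(0.01)
    stop.set()  # stands in for a delivered SIGTERM/SIGINT
    await runner
    return events


events = asyncio.run(demo())
```

Setting an event from the signal handler, rather than doing any real work there, keeps the handler trivially safe; all teardown runs inside the event loop.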