Handler
Architecture
System design and data flow for Handler
Handler is a Python 3.13 FastAPI service that runs ETL pipelines and writes normalized data into Cloud Spanner.
System context
Spanner Change Streams -> Dataflow -> Pub/Sub -> Stream -> Cloud Tasks
|
v
Handler (FastAPI)
|
v
Cloud Spanner
Bubble triggers -> Webhooks -> Cloud Tasks -> Handler
Service layout
services/handler/
main.py FastAPI app + lifespan
api/v1/endpoints/ Dynamic pipeline router + webhooks endpoints
pipelines/ Vendor ETL pipelines + webhook pipelines
core/ Extractors, loaders, Cloud Tasks helpers
util/ Spanner helpers, transforms, batch write utilities
models/ Partner client models and request payloads
Request flow (PIMS pipelines)
- Stream schedules Cloud Tasks for a PIMS pipeline.
- Cloud Tasks calls POST /api/v1/{vendor}/{resource} on Handler.
- Handler builds a pipeline client via PartnerCreds and Secret Manager.
- Pipeline extracts, transforms, and loads data into Spanner.
- Pipeline status is persisted back to Spanner.
Dynamic routing and pipeline registry
- Pipelines register themselves with register_task_endpoint.
- The registry describes vendor, resource, constructor params, and status policy.
- factory.py generates one POST route per pipeline at startup.
- Request bodies use TaskInfo to carry org + pipeline metadata.
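The registration and route-generation steps above can be sketched with the standard library alone. This is a minimal illustration, not the service's code: the signature of register_task_endpoint, the TaskInfo fields (org_id, pipeline_id), the PipelineSpec type, the "always" status policy, and the lowercase vendor slug are all assumptions. A plain dict stands in for the FastAPI router.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class TaskInfo:
    """Hypothetical request body carrying org + pipeline metadata."""
    org_id: str
    pipeline_id: str


@dataclass(frozen=True)
class PipelineSpec:
    """Hypothetical registry entry: vendor, resource, constructor params, status policy."""
    vendor: str
    resource: str
    factory: Callable[..., object]
    constructor_params: tuple[str, ...] = ()
    status_policy: str = "always"  # assumed policy name

    @property
    def path(self) -> str:
        return f"/api/v1/{self.vendor}/{self.resource}"


REGISTRY: dict[tuple[str, str], PipelineSpec] = {}


def register_task_endpoint(vendor, resource, *, constructor_params=(), status_policy="always"):
    """Class decorator that records a pipeline in the registry (assumed signature)."""
    def decorator(cls):
        key = (vendor, resource)
        if key in REGISTRY:
            raise ValueError(f"duplicate pipeline registration: {key}")
        REGISTRY[key] = PipelineSpec(
            vendor, resource, cls, tuple(constructor_params), status_policy
        )
        return cls
    return decorator


def build_routes(registry):
    """Create one handler per registered pipeline, keyed by its POST path.

    A FastAPI-based factory would attach each handler to a router instead,
    for example with APIRouter.add_api_route(path, handler, methods=["POST"]).
    """
    routes = {}
    for spec in registry.values():
        # Bind spec as a default argument so each handler keeps its own pipeline.
        def handler(task: TaskInfo, spec=spec) -> dict:
            pipeline = spec.factory(task)
            return {"path": spec.path, "result": pipeline.run()}
        routes[spec.path] = handler
    return routes


@register_task_endpoint("ezyvet", "patients")
class EzyvetPatients:
    """Toy pipeline used only to exercise the registry."""
    def __init__(self, task: TaskInfo):
        self.task = task

    def run(self) -> str:
        return f"synced patients for {self.task.org_id}"
```

Calling build_routes(REGISTRY) once at startup yields a handler under /api/v1/ezyvet/patients. Keeping the registry as plain data means the route factory needs no per-vendor code.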
Pipeline lifecycle
Each pipeline implements BaseDataPipeline:
- Extract: call partner APIs using vendor extractors.
- Transform: normalize records using Polars.
- Load: upsert data into Spanner with SpannerLoader.
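The three stages can be illustrated with a small abstract base class. The method names, the run() orchestration, and the in-memory pipeline below are assumptions made for illustration; the real BaseDataPipeline interface may differ. Plain dicts stand in for Polars frames and for Spanner so the sketch runs without dependencies.

```python
from abc import ABC, abstractmethod
from typing import Any

Record = dict[str, Any]


class BaseDataPipeline(ABC):
    """Hypothetical shape of the base class: extract, transform, load."""

    @abstractmethod
    def extract(self) -> list[Record]: ...

    @abstractmethod
    def transform(self, raw: list[Record]) -> list[Record]: ...

    @abstractmethod
    def load(self, rows: list[Record]) -> int: ...

    def run(self) -> int:
        # Run the stages in order and return the number of rows loaded.
        raw = self.extract()
        rows = self.transform(raw)
        return self.load(rows)


class InMemoryPatientsPipeline(BaseDataPipeline):
    """Toy pipeline: a list plays the partner API, a dict plays the table."""

    def __init__(self, source: list[Record], sink: dict[str, Record]):
        self._source = source
        self._sink = sink

    def extract(self) -> list[Record]:
        return list(self._source)

    def transform(self, raw: list[Record]) -> list[Record]:
        # Normalize key types and name casing.
        return [
            {"patient_id": str(r["id"]), "name": r["name"].strip().title()}
            for r in raw
        ]

    def load(self, rows: list[Record]) -> int:
        # Upsert by primary key: a repeated key overwrites the earlier row.
        for row in rows:
            self._sink[row["patient_id"]] = row
        return len(rows)
```

Because load is keyed by patient_id, running the same pipeline twice leaves the sink unchanged, which is the property an upsert-based loader relies on.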
SpannerLoader uses UpsertStep definitions and SpannerBatchWriteService
to stay under Spanner mutation limits.
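One way to respect a per-commit mutation budget is to split rows into batches sized by column count. The helper below is a hypothetical sketch, not SpannerBatchWriteService itself. It assumes each upserted row costs one mutation per column and ignores secondary-index entries, so a real budget should leave headroom below the documented limit.

```python
from collections.abc import Iterable, Iterator, Sequence


def chunk_rows_by_mutations(
    rows: Iterable[Sequence],
    columns_per_row: int,
    max_mutations: int,
) -> Iterator[list]:
    """Yield batches whose estimated mutation count stays within max_mutations."""
    if columns_per_row <= 0:
        raise ValueError("columns_per_row must be positive")
    rows_per_batch = max_mutations // columns_per_row
    if rows_per_batch == 0:
        raise ValueError("a single row exceeds the mutation budget")

    batch: list = []
    for row in rows:
        batch.append(row)
        if len(batch) == rows_per_batch:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch
```

For example, seven three-column rows with a budget of ten mutations split into batches of 3, 3, and 1 rows; each batch would then be committed separately.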
Pipeline catalog (non-exhaustive)
ezyVet
- patients, households, team
- appointments, appointment-types, appointment-statuses
- invoices
Bitwerx
- patients, households, team, products
- appointments, appointment-types, appointment-statuses
- invoices, invoices-ondemand (and on-demand mode for appointments)
HAP
- clinics, patients, households, team, products
- appointments, appointment-types, appointment-statuses
- invoices, deleted-invoices, deleted-appointments
Webhooks
- webhooks.invoices (invoice payloads)
- webhooks.nest-bub (entity event dispatcher)
Request flow (Bubble webhooks)
- Webhooks schedules Cloud Tasks for /api/v1/webhooks/invoices or /api/v1/webhooks/nest-bub.
- Handler executes the corresponding webhook pipeline.
- Bubble entities are upserted into Spanner.
- Optionally, processed invoices are forwarded to Bubble via Cloud Tasks.
Components
- services/handler/main.py: app setup, Spanner lifecycle, Sentry.
- services/handler/api/v1/endpoints/factory.py: dynamic pipeline route generation.
- services/handler/pipelines/: PIMS and webhooks pipelines.
- services/handler/core/: security, Cloud Tasks dispatch, and helpers.
- services/handler/util/: Spanner status updates and helpers.
- services/handler/core/loader/: Spanner upsert orchestration.
- services/handler/core/extractors/: partner API extractors and data processors.
Reliability and scaling
- Pipelines enforce a max runtime (PIPELINE_MAX_RUNTIME_SECONDS).
- Cloud Tasks retries are handled upstream; duplicate tasks are tolerated.
- Spanner connection pool and index metadata are warmed at startup.
- Pipeline status updates land in the Pipelines table for incremental sync.
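A runtime cap of this kind can be enforced with asyncio.wait_for, as in the sketch below. How Handler actually applies PIPELINE_MAX_RUNTIME_SECONDS is not described here, so the helper names, the 900-second default, and the returned status strings are assumptions.

```python
import asyncio
import os


def max_runtime_seconds(default: float = 900.0) -> float:
    """Read the cap from the environment; the default value is an assumption."""
    return float(os.environ.get("PIPELINE_MAX_RUNTIME_SECONDS", default))


async def run_with_deadline(job, timeout: float) -> str:
    """Run an async job and report a status string instead of raising on timeout."""
    try:
        # wait_for cancels the job once the timeout elapses.
        await asyncio.wait_for(job(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed_out"
    return "succeeded"


async def quick_job() -> None:
    """Stand-in for a pipeline run that finishes immediately."""
    await asyncio.sleep(0)


async def slow_job() -> None:
    """Stand-in for a pipeline run that overruns a short deadline."""
    await asyncio.sleep(1)
```

In a service, the timeout argument would come from max_runtime_seconds(), and the returned status could be written to the Pipelines table so a timed-out run is visible to the next incremental sync.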