Welcome to Pipeteer

Pipeteer simplifies durable execution without hiding the underlying persistence layer.

Why Pipeteer?

Use Pipeteer if you need:

  • Persistence: your app can stop or crash and resume at any time without losing progress
  • Observability: you can see the state of your app at any time, and modify it programmatically at runtime
  • Exactly-once semantics: your app can be stopped and resumed without dropping or duplicating work
  • Fault tolerance: if a task fails, the worker keeps processing other tasks and retries the failed one later
  • Explicitness: Pipeteer's high-level API is a very thin abstraction over SQLModel (for persistence) and ZeroMQ (for inter-process communication)

Proof of Concept

Definition. Defining a durable workflow is this easy:

from pipeteer import activity, workflow, WorkflowContext

@activity()
async def double(x: int) -> int:
  return 2*x

@workflow()
async def quad(x: int, ctx: WorkflowContext) -> int:
  x2 = await ctx.call(double, x)
  x4 = await ctx.call(double, x2)
  return x4
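
One common way to picture what makes a workflow like this durable is step-by-step replay: each completed step's result is recorded, and a restarted run reuses the recorded results instead of redoing the work. The sketch below is a conceptual illustration under that assumption, not Pipeteer's actual mechanism, which may differ. `ReplayContext`, `double_step`, `quad_flow`, and the in-memory `log` list (standing in for a database) are all hypothetical names.

```python
import asyncio

class ReplayContext:
    """Hypothetical stand-in for a workflow context that records step results."""
    def __init__(self, log):
        self.log = log   # results of steps completed so far (a database in practice)
        self.step = 0    # index of the next step within this run

    async def call(self, fn, arg):
        i = self.step
        self.step += 1
        if i < len(self.log):
            return self.log[i]    # step finished in an earlier run: reuse its result
        result = await fn(arg)
        self.log.append(result)   # record the result before moving on
        return result

calls = []  # records every real execution of double_step

async def double_step(x):
    calls.append(x)
    return 2 * x

async def quad_flow(x, ctx):
    x2 = await ctx.call(double_step, x)
    x4 = await ctx.call(double_step, x2)
    return x4
```

If a previous run crashed after the first step, the log would already hold `[2]`. Running `asyncio.run(quad_flow(1, ReplayContext([2])))` then executes `double_step` only once, for the second step, and the first result comes from the log.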

Worker. And here's how to run it:

import asyncio
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)

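# `double` and `quad` are the activity and workflow defined above
async def main():
  # run both workers and the ZeroMQ proxy concurrently
  await asyncio.gather(
    double.run(ctx),
    quad.run(ctx),
    ctx.zmq.proxy(),
  )

asyncio.run(main())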

Input. Here's how to give it tasks:

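import asyncio
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)

async def main():
  # `quad` is the workflow defined above
  Input = quad.input(ctx)
  with db.session as s:
    # results for this task will be written to the 'my-output' queue
    s.add(Input(key='task', value=1, output='my-output'))
    s.commit()

  # notify the workflow that new input is available
  await quad.notify(ctx)

asyncio.run(main())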

Output. Here's how to get the results:

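import asyncio
from sqlmodel import select
from pipeteer import DB, Context

db = DB.at('pipeline.db')
ctx = Context.of(db)

async def main():
  # `quad` is the workflow defined above
  Output = quad.output(ctx, 'my-output')
  while True:
    with db.session as s:
      for entry in s.exec(select(Output)):
        print(f'Output: {entry.key} -> {entry.value}')
        s.delete(entry)  # remove each entry once read, so it is printed only once
      s.commit()
    # wait on 'my-output' before reading again
    await ctx.wait('my-output')

asyncio.run(main())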