v0.2.0

GraphIngest SDK Documentation

The GraphIngest SDK provides decorators and wrappers that turn ordinary functions into orchestrated, observable pipeline steps. Each node is a single unit of work that runs in isolation. Each graph coordinates nodes with retries, timeouts, fan-out, and subgraph nesting.

Architecture Overview

[Architecture diagram] Your code defines nodes (e.g. extract, transform, load, each decorated with @node). The SDK layer wraps them with lifecycle management, log streaming, result storage, retry + backoff, fan-out (.map), async dispatch (.submit), serialization, and observability, and communicates over HTTP with the Control Plane, which handles scheduling, orchestration, and execution.

Quick Start

from graphingest import node, graph, RetryPolicy

@node(name="extract")
def extract(url: str) -> dict:
    return {"url": url, "rows": 100}

@node(name="load")
def load(data: dict) -> str:
    return f"loaded {data['rows']} rows"

@graph(name="my-pipeline", retry_policy=RetryPolicy(max_retries=2, delay_seconds=1))
def pipeline(url: str):
    data = extract(url)
    result = load(data)
    return result

# Run it
pipeline("https://api.example.com/data")

Installation

# From Git
pip install "graphingest @ git+https://github.com/yourorg/ingest.git#subdirectory=sdk/python"

# Local development (editable)
cd sdk/python && pip install -e .

# With LangGraph support
pip install "graphingest[langgraph]"

@node / node()

A node is a single unit of work that runs in isolation. The SDK handles lifecycle management, logging, result serialization, and automatic retries so you can focus on your business logic.

Parameters

Parameter     Type              Default        Description
name          string            Function name  Unique key identifying this node
cache_ttl     int / number      None           Cache TTL in seconds
max_retries   int / number      3              Max automatic retries on failure
tags          list / string[]   []             Metadata tags for the dashboard
version       string            None           Semantic version string

import requests

@node(name="extract-data", cache_ttl=3600, max_retries=5, tags=["etl"])
def extract(url: str) -> dict:
    response = requests.get(url)
    return response.json()

# Async support
import httpx

@node(name="async-fetch")
async def fetch(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()

@graph / graph()

A graph is a pipeline entrypoint β€” the top-level function that orchestrates nodes with retries (exponential backoff), timeouts, parameter validation, state hooks, run context, and streaming logs.

Parameters

Parameter        Type              Default        Description
name             string            Function name  Graph name for dashboard
retry_policy     RetryPolicy       None           Retry configuration
timeout          int / number      None           Max execution time (sec / ms)
on_completion    hook[]            []             Success callbacks
on_failure       hook[]            []             Failure callbacks
on_cancellation  hook[]            []             Timeout/cancel callbacks
tags             list / string[]   []             Metadata tags
version          string            None           Semantic version

.map() β€” Parallel Fan-Out

Fan a node out across multiple inputs in parallel. Each input runs as a separate, isolated execution, and results are collected in input order. Must be called from within a @graph function.

@graph(name="parallel-pipeline")
def pipeline(urls: list[str]):
    # Fan-out: dispatches len(urls) parallel workers
    results = extract.map(urls)
    # results[0] corresponds to urls[0], etc.
    return results
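
The in-order collection guarantee can be illustrated with a plain thread pool. This is a stand-alone sketch of the semantics only, not the SDK's implementation; the extract function here is a stand-in for the decorated node:

```python
from concurrent.futures import ThreadPoolExecutor

def extract(url: str) -> dict:
    # Stand-in for the @node-decorated extract
    return {"url": url, "rows": len(url)}

urls = ["https://a.example", "https://bb.example", "https://ccc.example"]

# Like extract.map(urls): run in parallel, collect results in input order
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(extract, urls))

# results[i] corresponds to urls[i] regardless of completion order
assert [r["url"] for r in results] == urls
```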

.submit() β€” Async Dispatch

Dispatch a single node asynchronously and get a NodeFuture back. The node runs in the background while your graph continues.

@graph(name="async-pipeline")
def pipeline(data: dict):
    future = slow_transform.submit(data)  # returns immediately
    do_other_stuff()                       # do work while it runs
    result = future.result(timeout=120)    # block until ready
    return result

NodeFuture

Method     Returns         Description
result()   Any / Promise   Block until done, return result
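
The submit/result pattern mirrors Python's standard concurrent.futures API. A minimal stand-alone sketch of the same flow (illustration only, not the SDK's internals):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_transform(data: dict) -> dict:
    time.sleep(0.1)  # simulate slow work
    return {**data, "transformed": True}

pool = ThreadPoolExecutor()
future = pool.submit(slow_transform, {"rows": 100})  # returns immediately

# ... do other work here while slow_transform runs in the background ...

result = future.result(timeout=120)  # block until ready
pool.shutdown()
```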

RetryPolicy

Configurable retry strategy with exponential backoff and jitter to avoid thundering herd.

RetryPolicy(
    max_retries=4,          # Total retry attempts
    delay_seconds=2,        # Initial delay
    backoff_factor=2.0,     # Multiplier per attempt
    max_delay_seconds=120,  # Upper bound
    jitter=True,            # Β±50% randomization
)

# Delay formula:
# min(delay Γ— backoff^attempt, max_delay) Γ— random(0.5, 1.5)
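
The delay formula above can be checked with a few lines of plain Python (a sketch of the formula, not the SDK's scheduler):

```python
import random

def retry_delay(attempt: int, delay: float = 2, backoff: float = 2.0,
                max_delay: float = 120, jitter: bool = True) -> float:
    # min(delay * backoff**attempt, max_delay), then optional +-50% jitter
    base = min(delay * backoff ** attempt, max_delay)
    return base * random.uniform(0.5, 1.5) if jitter else base

# Without jitter, the first four delays for the policy above:
assert [retry_delay(a, jitter=False) for a in range(4)] == [2, 4, 8, 16]
```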

Presets

Conservative (API calls): RetryPolicy(max_retries=3, delay_seconds=1, backoff_factor=2)
  Delays: ~1s, ~2s, ~4s

Aggressive (flaky service): RetryPolicy(max_retries=6, delay_seconds=0.5, backoff_factor=3, max_delay_seconds=60)
  Delays: ~0.5s, ~1.5s, ~4.5s, ~13.5s, ~40.5s, ~60s

Fixed delay: RetryPolicy(max_retries=3, delay_seconds=5, backoff_factor=1, jitter=False)
  Delays: 5s, 5s, 5s

Subgraphs

Call a @graph from within another @graph. The child graph gets its own run ID, retry policy, timeout, and hooks β€” with parent_graph_run_id automatically linked.

@graph(name="sub-etl", retry_policy=RetryPolicy(max_retries=2))
def sub_etl(url: str) -> dict:
    data = extract(url)
    return transform(data)

@graph(name="main-pipeline")
def main_pipeline(urls: list[str]):
    results = []
    for url in urls:
        result = sub_etl(url)  # each creates a child graph run
        results.append(result)
    return results

Run Contexts

Thread-safe (Python) / async-safe (TypeScript) context objects available from anywhere inside a running graph or node.

GraphRunContext

from graphingest import GraphRunContext

@graph(name="my-pipeline", version="2.0", tags=["prod"])
def pipeline(source: str):
    ctx = GraphRunContext.get()

    ctx.graph_run_id         # "uuid-..."
    ctx.graph_name           # "my-pipeline"
    ctx.graph_version        # "2.0"
    ctx.parameters           # {"source": "..."}
    ctx.tags                 # ["prod"]
    ctx.parent_graph_run_id  # None (or parent's ID)

NodeRunContext

from graphingest import NodeRunContext

@node(name="extract")
def extract(url: str):
    ctx = NodeRunContext.get()

    ctx.node_run_id    # "uuid-..."
    ctx.node_key       # "extract"
    ctx.graph_run_id   # parent graph's run ID
    ctx.map_index      # None (or int if .map())
    ctx.retry_count    # 0

Streaming Logger

Real-time log streaming to the dashboard. Logs appear automatically as your pipeline runs.

import logging
logger = logging.getLogger(__name__)

@node(name="extract")
def extract(url: str):
    logger.info("Starting extraction")   # β†’ appears in dashboard
    logger.warning("Rate limit hit")
    logger.error("Connection failed")

# Or use the dedicated logger:
from graphingest import get_run_logger
log = get_run_logger()
log.info("Pipeline started")

LangGraph / AI Agent Orchestration

GraphIngest integrates with LangGraph to orchestrate AI agents at scale. Wrap any LangGraph agent as a @node, then use .map() to fan-out agents in parallel.

from graphingest.langgraph import agent_node, agent_graph, AgentConfig

researcher = agent_node(
    name="researcher",
    graph_builder=build_research_agent,
    config=AgentConfig(
        model="gpt-4o",
        temperature=0.0,
        max_iterations=10,
        system_prompt="You are a research assistant.",
        stream_steps=True,
    ),
    cache_ttl=600,
    retries=2,
)

# Use like any node:
result = researcher("What is quantum computing?")

# Fan-out: run 50 agents in parallel
results = researcher.map(["query1", "query2", ..., "query50"])

Multi-Agent Pattern

[Pattern diagram] A question goes to a Planner agent, which breaks it into N sub-tasks. A .map() call fans those sub-tasks out to N parallel Researcher agents (Agent 1 … Agent N), and a Synthesizer agent combines all findings.

Configuration

Set these environment variables before running your pipeline. You can find your API URL and key in the dashboard settings.

Variable               Required  Description
GRAPHINGEST_API_URL    Yes       Your GraphIngest API endpoint
GRAPHINGEST_API_KEY    Yes       Your API key (found in dashboard)
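
For example, in a shell session (placeholder values; the URL, key, and script name are illustrative):

```shell
# Point the SDK at your GraphIngest instance, then run the pipeline script
export GRAPHINGEST_API_URL="https://graphingest.example.com/api"
export GRAPHINGEST_API_KEY="your-api-key"
python run_pipeline.py
```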

Ready to get started?

Deploy your first pipeline in under 5 minutes.