# GraphIngest SDK Documentation
The GraphIngest SDK provides decorators and wrappers that turn ordinary functions into orchestrated, observable pipeline steps. Each node is a single unit of work that runs in isolation. Each graph coordinates nodes with retries, timeouts, fan-out, and subgraph nesting.
## Architecture Overview

[Diagram: a pipeline of three `@node` functions: `extract` → `transform` → `load`.]
## Quick Start
```python
from graphingest import node, graph, RetryPolicy

@node(name="extract")
def extract(url: str) -> dict:
    return {"url": url, "rows": 100}

@node(name="load")
def load(data: dict) -> str:
    return f"loaded {data['rows']} rows"

@graph(name="my-pipeline", retry_policy=RetryPolicy(max_retries=2, delay_seconds=1))
def pipeline(url: str):
    data = extract(url)
    result = load(data)
    return result

# Run it
pipeline("https://api.example.com/data")
```

## Installation
```shell
# From Git
pip install "graphingest @ git+https://github.com/yourorg/ingest.git#subdirectory=sdk/python"

# Local development (editable)
cd sdk/python && pip install -e .

# With LangGraph support (quoted so the brackets survive shells like zsh)
pip install "graphingest[langgraph]"
```

## @node / node()
A node is a single unit of work that runs in isolation. The SDK handles lifecycle management, logging, result serialization, and automatic retries so you can focus on your business logic.
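Conceptually, `@node` is a decorator that wraps your function with retry and metadata handling. A minimal plain-Python sketch of the idea (names and behavior here are illustrative, not the SDK's actual implementation):

```python
import functools

def node_sketch(name=None, max_retries=3):
    """Illustrative stand-in for @node: retries plus attached metadata."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # One initial attempt plus up to max_retries retries
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
        wrapper.node_name = name or fn.__name__
        return wrapper
    return decorate

calls = {"n": 0}

@node_sketch(name="flaky", max_retries=2)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"

print(flaky())  # succeeds on the third attempt
```

The real decorator additionally handles lifecycle, logging, serialization, and caching, but the retry loop above captures the core control flow.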
### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | string | Function name | Unique key identifying this node |
| `cache_ttl` | int / number | `None` | Cache TTL in seconds |
| `max_retries` | int / number | 3 | Max automatic retries on failure |
| `tags` | list / string[] | `[]` | Metadata tags for the dashboard |
| `version` | string | `None` | Semantic version string |
```python
import requests
import httpx

@node(name="extract-data", cache_ttl=3600, max_retries=5, tags=["etl"])
def extract(url: str) -> dict:
    response = requests.get(url)
    return response.json()

# Async support
@node(name="async-fetch")
async def fetch(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()
```

## @graph / graph()
A graph is a pipeline entrypoint: the top-level function that orchestrates nodes with retries (exponential backoff), timeouts, parameter validation, state hooks, run context, and streaming logs.
### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | string | Function name | Graph name for the dashboard |
| `retry_policy` | RetryPolicy | `None` | Retry configuration |
| `timeout` | int / number | `None` | Max execution time (sec / ms) |
| `on_completion` | hook[] | `[]` | Success callbacks |
| `on_failure` | hook[] | `[]` | Failure callbacks |
| `on_cancellation` | hook[] | `[]` | Timeout/cancel callbacks |
| `tags` | list / string[] | `[]` | Metadata tags |
| `version` | string | `None` | Semantic version |
## .map(): Parallel Fan-Out
Fan a node out across multiple inputs in parallel. Each input runs as a separate, isolated execution, and results are collected in input order. `.map()` must be called from within a `@graph` function.
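The ordering contract is the same as the standard library's `Executor.map`; a self-contained sketch of the semantics, with a stand-in `extract` function:

```python
from concurrent.futures import ThreadPoolExecutor

def extract(url: str) -> dict:
    # Stand-in for a @node function (illustrative only)
    return {"url": url, "rows": len(url)}

urls = ["https://a.example", "https://bb.example", "https://ccc.example"]

with ThreadPoolExecutor(max_workers=3) as pool:
    # Inputs run in parallel; results come back in input order
    results = list(pool.map(extract, urls))

assert [r["url"] for r in results] == urls
```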
```python
@graph(name="parallel-pipeline")
def pipeline(urls: list[str]):
    # Fan-out: dispatches len(urls) parallel workers
    results = extract.map(urls)
    # results[0] corresponds to urls[0], etc.
    return results
```

## .submit(): Async Dispatch
Dispatch a single node asynchronously and get a NodeFuture back. The node runs in the background while your graph continues.
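The pattern mirrors `concurrent.futures`; here is the stdlib equivalent of dispatch-then-join, with a stand-in `slow_transform`:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_transform(data: dict) -> dict:
    time.sleep(0.05)  # simulate slow work
    return {**data, "transformed": True}

with ThreadPoolExecutor() as pool:
    future = pool.submit(slow_transform, {"rows": 100})  # returns immediately
    # ... do other work while it runs ...
    result = future.result(timeout=120)                  # block until ready

print(result)
```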
```python
@graph(name="async-pipeline")
def pipeline(data: dict):
    future = slow_transform.submit(data)  # returns immediately
    do_other_stuff()                      # do work while it runs
    result = future.result(timeout=120)   # block until ready
    return result
```

## NodeFuture
| Method | Returns | Description |
|---|---|---|
| `result()` | Any / Promise | Block until done, then return the result |
## RetryPolicy
Configurable retry strategy with exponential backoff and jitter to avoid thundering herd.
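The schedule this policy produces (initial delay multiplied by the backoff factor each attempt, capped, with optional ±50% jitter) can be checked in plain Python; `retry_delays` here is an illustrative helper, not part of the SDK:

```python
import random

def retry_delays(max_retries, delay, backoff=2.0, max_delay=float("inf"), jitter=False):
    """Wait time before each retry attempt, following the documented formula."""
    delays = []
    for attempt in range(max_retries):
        d = min(delay * backoff ** attempt, max_delay)
        if jitter:
            d *= random.uniform(0.5, 1.5)  # ±50% randomization
        delays.append(d)
    return delays

print(retry_delays(3, 1, backoff=2))
print(retry_delays(6, 0.5, backoff=3, max_delay=60))
```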
```python
RetryPolicy(
    max_retries=4,          # Total retry attempts
    delay_seconds=2,        # Initial delay
    backoff_factor=2.0,     # Multiplier per attempt
    max_delay_seconds=120,  # Upper bound
    jitter=True,            # ±50% randomization
)

# Delay formula:
# min(delay × backoff^attempt, max_delay) × random(0.5, 1.5)
```

### Presets

```python
# Standard: delays ~1s, ~2s, ~4s
RetryPolicy(max_retries=3, delay_seconds=1, backoff_factor=2)

# Aggressive backoff with cap: delays ~0.5s, ~1.5s, ~4.5s, ~13.5s, ~40.5s, ~60s
RetryPolicy(max_retries=6, delay_seconds=0.5, backoff_factor=3, max_delay_seconds=60)

# Fixed interval, no jitter: delays 5s, 5s, 5s
RetryPolicy(max_retries=3, delay_seconds=5, backoff_factor=1, jitter=False)
```

## Subgraphs
Call a `@graph` from within another `@graph`. The child graph gets its own run ID, retry policy, timeout, and hooks, with `parent_graph_run_id` automatically linked to the parent.
```python
@graph(name="sub-etl", retry_policy=RetryPolicy(max_retries=2))
def sub_etl(url: str) -> dict:
    data = extract(url)
    return transform(data)

@graph(name="main-pipeline")
def main_pipeline(urls: list[str]):
    results = []
    for url in urls:
        result = sub_etl(url)  # each call creates a child graph run
        results.append(result)
    return results
```

## Run Contexts
Thread-safe (Python) and async-safe (TypeScript) context objects are available from anywhere inside a running graph or node.
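Context objects like these are commonly built on `contextvars`; a minimal sketch of the pattern (illustrative only, not the SDK's implementation):

```python
import contextvars
import uuid
from dataclasses import dataclass, field

_current_run: contextvars.ContextVar = contextvars.ContextVar("run_ctx", default=None)

@dataclass
class RunCtx:
    graph_name: str
    graph_run_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    @classmethod
    def get(cls) -> "RunCtx":
        ctx = _current_run.get()
        if ctx is None:
            raise RuntimeError("no graph run is active")
        return ctx

def run_graph(name: str, fn):
    # Bind a fresh context for the duration of the run
    token = _current_run.set(RunCtx(graph_name=name))
    try:
        return fn()
    finally:
        _current_run.reset(token)

def step() -> str:
    # Any code inside the run can reach the context without parameter plumbing
    return RunCtx.get().graph_name

print(run_graph("my-pipeline", step))
```

Because `contextvars` is both thread- and async-aware, the same mechanism gives isolation across threads and coroutines.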
### GraphRunContext

```python
@graph(name="my-pipeline", version="2.0", tags=["prod"])
def pipeline(source: str):
    ctx = GraphRunContext.get()
    ctx.graph_run_id         # "uuid-..."
    ctx.graph_name           # "my-pipeline"
    ctx.graph_version        # "2.0"
    ctx.parameters           # {"source": "..."}
    ctx.tags                 # ["prod"]
    ctx.parent_graph_run_id  # None (or the parent's ID)
```

### NodeRunContext
```python
@node(name="extract")
def extract(url: str):
    ctx = NodeRunContext.get()
    ctx.node_run_id   # "uuid-..."
    ctx.node_key      # "extract"
    ctx.graph_run_id  # parent graph's run ID
    ctx.map_index     # None (or an int under .map())
    ctx.retry_count   # 0
```

## Streaming Logger
Real-time log streaming to the dashboard. Logs appear automatically as your pipeline runs.
```python
import logging

logger = logging.getLogger(__name__)

@node(name="extract")
def extract(url: str):
    logger.info("Starting extraction")  # appears in the dashboard
    logger.warning("Rate limit hit")
    logger.error("Connection failed")

# Or use the dedicated logger:
from graphingest import get_run_logger

log = get_run_logger()
log.info("Pipeline started")
```

## LangGraph / AI Agent Orchestration
GraphIngest integrates with LangGraph to orchestrate AI agents at scale. Wrap any LangGraph agent as a `@node`, then use `.map()` to fan agents out in parallel.
```python
from graphingest.langgraph import agent_node, agent_graph, AgentConfig

researcher = agent_node(
    name="researcher",
    graph_builder=build_research_agent,
    config=AgentConfig(
        model="gpt-4o",
        temperature=0.0,
        max_iterations=10,
        system_prompt="You are a research assistant.",
        stream_steps=True,
    ),
    cache_ttl=600,
    retries=2,
)

# Use like any node:
result = researcher("What is quantum computing?")

# Fan-out: run 50 agents in parallel
results = researcher.map(["query1", "query2", ..., "query50"])
```

## Multi-Agent Pattern
[Diagram: a Planner breaks the task into N sub-tasks; Researcher agents 1…N run in parallel; a Synthesizer combines all findings.]

## Configuration
Set these environment variables before running your pipeline. You can find your API URL and key in the dashboard settings.
| Variable | Required | Description |
|---|---|---|
| `GRAPHINGEST_API_URL` | Yes | Your GraphIngest API endpoint |
| `GRAPHINGEST_API_KEY` | Yes | Your API key (found in dashboard) |
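In a shell, these can be exported before launching the pipeline; the values below are placeholders, not real credentials:

```shell
# Placeholder values: substitute the URL and key from your dashboard settings
export GRAPHINGEST_API_URL="https://app.example.com/api"
export GRAPHINGEST_API_KEY="gi_xxxxxxxxxxxx"
# then run your pipeline, e.g.: python pipeline.py
```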