Manthan

Water: The Agent Harness Framework

· Manthan Gupta

Building multi-agent systems with existing frameworks is painful. While OpenAI Swarm, Google ADK, LangChain, CrewAI, AutoGen, Agno, etc have orchestration capabilities, they are complex, verbose, and unintuitive. Simple workflows require dozens of lines of boilerplate code, and adding branching logic, retries, or conditional execution becomes a debugging nightmare.

Water is an agent harness framework — it provides the infrastructure around your AI agents, not the agents themselves. Orchestration, resilience, observability, approval gates, sandboxing, and deployment tooling so you can focus on what your agents actually do.

Instead of wrestling with framework-specific APIs and verbose configurations, you define workflows with clean, intuitive abstractions. Complex branching logic, error handling, and loops that would normally require hours of debugging work seamlessly out of the box.

At the core of Water are these three components:

  1. Tasks: The building blocks of your system. Each task has a typed input schema, output schema, and an execute function.
  2. Flows: The orchestration layer that coordinates tasks into workflows — sequential, parallel, branching, looping, DAG, and more.
  3. Execution Engine: The async runtime that runs tasks in the right order, handles errors, retries, checkpoints, and emits events along the way.

Table of Contents

Quick Start

Install Water:

pip install water-ai

Create a Task

A task is the smallest unit of work in Water. You define a typed input schema, a typed output schema, and a function that transforms one into the other. Schemas use Pydantic models, which means you get automatic validation, serialization, and documentation for free. If the output of one task doesn’t match the input schema of the next, Water catches it before execution.

import requests
from water import create_task
from pydantic import BaseModel

class WeatherRequest(BaseModel):
    city: str

class WeatherResult(BaseModel):
    city: str
    temperature: float
    description: str

def get_weather(params, context):
    city = params["input_data"]["city"]
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=demo&units=metric"
    response = requests.get(url)
    data = response.json()
    return {
        "city": city,
        "temperature": data["main"]["temp"],
        "description": data["weather"][0]["description"],
    }

weather_task = create_task(
    id="weather",
    description="Fetch current weather",
    input_schema=WeatherRequest,
    output_schema=WeatherResult,
    execute=get_weather,
)

Every task receives two arguments:

  • params — contains input_data (the output of the previous task or the initial input)
  • context — the execution context with flow_id, task_id, execution_id, step_number, flow_metadata, and methods like get_task_output(task_id) and get_all_task_outputs()

Create & Register a Flow

A flow is a container that orchestrates tasks into a workflow. You compose tasks using methods like .then(), .parallel(), .branch(), and others. Calling .register() makes the flow available to the execution engine, which handles scheduling, error recovery, and event emission.

from water import Flow

weather_flow = Flow(id="weather_check", description="Get weather for a city")
weather_flow.then(weather_task).register()

Run the Flow

Flows run asynchronously. You pass an input dict that matches the first task’s input schema, and the engine returns the final task’s output. Between those two points, every task in the chain executes in the order you defined, with data flowing automatically from one task to the next.

import asyncio

async def main():
    result = await weather_flow.run({"city": "London"})
    print(f"{result['city']}: {result['temperature']}°C, {result['description']}")

if __name__ == "__main__":
    asyncio.run(main())
python weather_flow.py

Flow Patterns

Water supports composable flow patterns that chain together with a fluent API. If output/input schemas match, data flows automatically between tasks. These patterns can be mixed freely within a single flow — a sequential chain can branch into parallel tasks, which can loop, which can fan out with map, all in the same pipeline.

Sequential

The most basic pattern. Chain tasks in sequence using .then(). Each task receives the output of the previous one as its input. This is how you build linear pipelines where each step depends on the result of the step before it — validate, then process, then save.

from water import Flow, create_task

flow = Flow(id="pipeline", description="Sequential pipeline")
flow.then(validate_task)\
    .then(process_task)\
    .then(save_task)\
    .register()

Parallel

When tasks are independent of each other, run them concurrently using .parallel(). All tasks receive the same input and execute simultaneously. Their outputs are collected into a single dict keyed by task ID, which the next task in the chain receives. This is useful when you need to fetch data from multiple sources, call multiple APIs, or run multiple analyses on the same input without waiting for each to finish before starting the next.

flow = Flow(id="parallel_flow", description="Parallel tasks")
flow.parallel([fetch_users, fetch_orders, fetch_inventory])\
    .then(merge_task)\
    .register()

# merge_task receives:
# {
#     "fetch_users": { ... },
#     "fetch_orders": { ... },
#     "fetch_inventory": { ... }
# }

Conditional Branching

Route execution to different tasks based on the data at runtime using .branch(). You provide a list of condition-task pairs, and the engine evaluates each condition against the current data, executing the first task whose condition returns true. This is how you build flows that behave differently based on user input, feature flags, or intermediate results — like routing notifications to email, SMS, or push based on user preferences.

flow = Flow(id="notifications", description="Send via best channel")
flow.branch([
    (lambda data: data["type"] == "email", send_email_task),
    (lambda data: data["type"] == "sms", send_sms_task),
    (lambda data: data["type"] == "push", send_push_task),
]).register()

Loops

Repeat a task while a condition holds using .loop(). The engine runs the task, checks the condition against the output, and runs it again if the condition is still true. A max_iterations guard prevents infinite loops. This is useful for polling APIs until a result is ready, refining agent outputs iteratively, or retrying operations with modified parameters until they succeed.

flow = Flow(id="retry_flow", description="Retry until success")
flow.loop(
    lambda data: data.get("status") != "success",
    retry_task,
    max_iterations=5,
).register()

Map (Fan-Out / Fan-In)

Process each item in a list in parallel using .map(). You specify which key in the input data contains the list, and the engine spawns one execution of the task per item, runs them all concurrently, and collects the results back into a list. This is the fan-out / fan-in pattern — take a batch of items, process each one independently, then gather the results. Useful for batch image processing, parallel document analysis, or running the same transformation across multiple records.

flow = Flow(id="image_pipeline", description="Process image batch")
flow.map(generate_thumbnail, over="images")\
    .then(summarize_batch)\
    .register()

# If input is {"images": [img1, img2, img3]}, the task runs 3 times
# in parallel, once per image. Results are collected back into a list.

DAG (Directed Acyclic Graph)

For pipelines that don’t fit neatly into sequential or parallel patterns, define tasks with explicit dependencies using .dag(). You list all tasks and specify which tasks depend on which others. The engine topologically sorts the graph, runs tasks as soon as their dependencies are satisfied, and parallelizes where possible. This handles diamond patterns (where two tasks depend on the same upstream, and a downstream depends on both), multi-stage ETL pipelines, and any workflow where the dependency structure is more complex than a straight line.

flow = Flow(id="data_pipeline", description="Diamond DAG")
flow.dag(
    [ingest_task, clean_task, enrich_task, publish_task],
    dependencies={
        "clean": ["ingest"],
        "enrich": ["ingest"],
        "publish": ["clean", "enrich"],
    },
).register()

# ingest runs first, then clean and enrich run in parallel,
# then publish runs after both complete.

SubFlow Composition

As your system grows, you’ll want to reuse entire flows as building blocks inside other flows. SubFlow wraps a registered flow as a task, so you can nest it inside another flow’s chain. This is how you build modular, reusable pipeline components — a validation flow, a transformation flow, and a notification flow that you compose into different combinations for different use cases.

Input and output mappings let you bridge schema differences between flows. If the outer flow uses raw_input but the inner flow expects text, the input mapping handles the translation. The output mapping works the same way in reverse. compose_flows() is a shortcut for chaining multiple flows sequentially without writing the boilerplate yourself.

from water import Flow, SubFlow, compose_flows

# Create reusable inner flows
validation_flow = Flow(id="validation")
validation_flow.then(check_format).then(check_content).register()

transform_flow = Flow(id="transform")
transform_flow.then(normalize_task).then(enrich_task).register()

# Nest a flow as a task with input/output mapping
sub = SubFlow(
    validation_flow,
    input_mapping={"text": "raw_input"},     # map inner "text" ← outer "raw_input"
    output_mapping={"clean_text": "text"},   # map outer "clean_text" ← inner "text"
)

outer = Flow(id="pipeline")
outer.then(sub.as_task()).then(process_task).register()

# Or compose multiple flows sequentially in one line
full_pipeline = compose_flows(validation_flow, transform_flow, id="full_pipeline")
result = await full_pipeline.run({"text": "hello"})

Conditional Execution & Fallbacks

Not every task needs to run every time. The when parameter lets you conditionally skip a task based on the data — if the condition returns false, the data passes through unchanged to the next task in the chain. The fallback parameter provides an alternative task to run if the primary task throws an exception. Together, these give you fine-grained control over which tasks execute without needing to restructure your entire flow.

flow = Flow(id="smart_flow", description="Conditional + fallback")

# Skip task if condition is false (data passes through unchanged)
flow.then(cache_task, when=lambda data: data.get("use_cache", False))

# Route to fallback if primary task fails
flow.then(primary_api, fallback=backup_api)

Try-Catch-Finally

For more complex error handling, Water provides try_catch() — the flow equivalent of Python’s try/except/finally. You define a list of tasks to attempt, a catch task that handles any errors, and an optional finally task that always runs regardless of success or failure. The catch task receives a dict with the error message, error type, the ID of the failed task, and the input data that caused the failure, giving you everything you need to log, alert, or attempt recovery.

This is particularly useful in agent pipelines where an LLM call might fail, an external API might be down, or a downstream system might reject your data. Instead of the entire flow crashing, the catch task can log the error, send a notification, return a default response, or attempt an alternative strategy.

flow = Flow(id="safe_pipeline", description="Error-handled pipeline")

flow.try_catch(
    try_tasks=[risky_api_call, process_response],
    catch_task=error_handler_task,     # receives the error info
    finally_task=cleanup_task,         # always runs
).register()

# If risky_api_call or process_response throws, catch_task runs with:
# {"error": "...", "error_type": "...", "task_id": "...", "input_data": {...}}
# finally_task always runs regardless of success or failure.

You can also use .on_error() for simpler cases where you just want a fallback task:

flow = Flow(id="with_fallback")
flow.then(primary_task).on_error(fallback_task).register()

Agent Harness

This is where Water differentiates from simple workflow engines. The agent harness provides infrastructure around your AI agents — resilience, observability, approval gates, sandboxing — so agents can focus on their job. You bring the LLM logic; Water handles everything else: prompt formatting, provider switching, error recovery, cost tracking, streaming, tool execution, and more.

LLM Tasks

An LLM task wraps any language model behind a unified interface. You provide a prompt template with variables, a system prompt, and a provider instance (OpenAI, Anthropic, or any custom provider). Water handles substituting the variables from the input data, formatting the messages for the specific provider’s API, making the API call, and extracting the response text into a structured output.

This means you can swap providers by changing one line of code, chain multiple agents into pipelines where each agent’s output feeds into the next agent’s prompt, and test everything locally with MockProvider without making real API calls.

from water.agents import create_agent_task, OpenAIProvider, AnthropicProvider, MockProvider

# Simple agent task with prompt template
agent = create_agent_task(
    id="explainer",
    description="Explains a topic",
    prompt_template="Explain {topic} in one sentence.",
    provider_instance=OpenAIProvider(model="gpt-4o"),
)

flow = Flow(id="explain", description="Explain things")
flow.then(agent).register()

result = await flow.run({"topic": "the Water framework"})
# result["response"] = "Water is a Python agent harness framework..."

Chain multiple agents into a pipeline — planner, coder, reviewer:

planner = create_agent_task(
    id="planner",
    description="Plans the implementation",
    prompt_template="Design a plan for: {feature}",
    system_prompt="You are a senior software architect.",
    provider_instance=OpenAIProvider(model="gpt-4o"),
)

coder = create_agent_task(
    id="coder",
    description="Writes the code",
    prompt_template="Implement this plan:\n{plan}",
    system_prompt="You are an expert Python developer.",
    provider_instance=AnthropicProvider(model="claude-sonnet-4-20250514"),
)

reviewer = create_agent_task(
    id="reviewer",
    description="Reviews the code",
    prompt_template="Review this code:\n{code}",
    system_prompt="You are a meticulous code reviewer.",
    provider_instance=OpenAIProvider(model="gpt-4o"),
)

flow = Flow(id="dev_pipeline", description="Plan → Code → Review")
flow.then(planner).then(coder).then(reviewer).register()

Supported providers: OpenAIProvider, AnthropicProvider, MockProvider (for testing), CustomProvider (bring your own).

Streaming LLM Agents

Standard LLM calls return the complete response only after the model finishes generating. For long responses, this means the user stares at a blank screen for seconds. Streaming fixes this by delivering tokens as they’re generated — the user sees the response being written in real time.

Water’s streaming agent tasks work exactly like regular agent tasks, but with an on_chunk callback that fires for every token. Each StreamChunk contains a delta (the new text fragment), a finish_reason (null until the model is done), and a metadata dict. The final result still contains the full accumulated response text, so downstream tasks work the same way whether you stream or not.

There are three streaming providers: OpenAIStreamProvider wraps OpenAI’s streaming API, AnthropicStreamProvider wraps Anthropic’s, and MockStreamProvider yields words one at a time for testing without making real API calls.

from water.agents import create_streaming_agent_task, MockStreamProvider

# Use OpenAI or Anthropic streaming providers in production
from water.agents import OpenAIStreamProvider, AnthropicStreamProvider

provider = OpenAIStreamProvider(model="gpt-4o", temperature=0.7)

# on_chunk fires for every token
agent = create_streaming_agent_task(
    id="streaming_writer",
    description="Writes content with streaming",
    prompt_template="Write a short story about {topic}",
    provider_instance=provider,
    on_chunk=lambda chunk: print(chunk.delta, end="", flush=True),
)

flow = Flow(id="stream_flow")
flow.then(agent).register()

result = await flow.run({"topic": "a robot learning to paint"})
# Tokens stream to stdout as they arrive
# result["response"] contains the full accumulated text

Multi-Agent Orchestration

Some tasks require multiple specialized agents working together — a researcher, a writer, and an editor, or a planner, a coder, and a reviewer. create_agent_team lets you define a team of agents, each with their own role, provider, and system prompt, and coordinates their execution using a strategy.

With "sequential" strategy, agents run one after another, each seeing the previous agent’s output. With "round_robin", the system cycles through agents across multiple rounds of interaction. With "dynamic", the system decides which agent to call next based on the current state. All agents share a SharedContext object, so any agent can read what other agents have produced — the researcher’s notes, the writer’s draft, the editor’s feedback.

from water.agents import create_agent_team, AgentRole, SharedContext

team = create_agent_team(
    team_id="research_team",
    roles=[
        AgentRole(
            id="researcher",
            provider=OpenAIProvider(model="gpt-4o"),
            system_prompt="Research the topic thoroughly.",
        ),
        AgentRole(
            id="writer",
            provider=AnthropicProvider(model="claude-sonnet-4-20250514"),
            system_prompt="Write a clear article based on the research.",
        ),
        AgentRole(
            id="editor",
            provider=OpenAIProvider(model="gpt-4o"),
            system_prompt="Edit the article for clarity and accuracy.",
        ),
    ],
    strategy="sequential",  # also: "round_robin", "dynamic"
)

flow = Flow(id="content_pipeline", description="Research → Write → Edit")
flow.then(team).register()

Tool Use

LLMs are good at reasoning but they can’t browse the web, query databases, or call APIs on their own. Tool use bridges this gap. You define tools as functions with a name, description, and input schema. The ToolExecutor sends the tool definitions to the LLM, the LLM decides which tools to call and with what arguments, the executor runs those tools and feeds the results back to the LLM, and the LLM generates its final response. This loop continues for up to max_rounds or until the LLM responds without requesting any tools.

Tools auto-generate schemas compatible with both OpenAI’s function calling format and Anthropic’s tool use format via tool.to_openai_schema() and tool.to_anthropic_schema(), so you write the tool once and it works with any provider. You can group related tools into a Toolkit for organization.

from water.agents import Tool, Toolkit, ToolExecutor

# Define tools
search_tool = Tool(
    name="web_search",
    description="Search the web for information",
    input_schema={"type": "object", "properties": {"query": {"type": "string"}}},
    execute=lambda args: {"results": search_web(args["query"])},
)

calc_tool = Tool(
    name="calculator",
    description="Evaluate a math expression",
    input_schema={"type": "object", "properties": {"expression": {"type": "string"}}},
    execute=lambda args: {"result": eval(args["expression"])},
)

# Group tools into a toolkit
toolkit = Toolkit(name="research_tools", tools=[search_tool, calc_tool])

# ToolExecutor manages the LLM ↔ tool call loop
executor = ToolExecutor(
    provider=OpenAIProvider(model="gpt-4o"),
    tools=toolkit,
    max_rounds=5,  # max tool-call rounds before stopping
)

result = await executor.run(messages=[
    {"role": "user", "content": "What is the population of France times 3?"}
])
# result["text"] = "The population of France is ~68 million, so 68M × 3 = 204 million"
# result["tool_calls"] = [{...}, {...}]  # history of tool invocations
# result["rounds"] = 2

Context Management

Every LLM has a context window limit — the maximum number of tokens it can process in a single request. As conversations grow longer or you accumulate more context from previous tasks, you’ll hit that limit. The ContextManager automatically truncates or compresses your message history to fit within the window while preserving the most important information.

TokenCounter handles the token counting itself. For OpenAI models it uses tiktoken for exact counts; for other providers it uses a character-based approximation. The reserve_tokens parameter ensures you always leave enough room for the model’s response.

Three truncation strategies are available:

  • SLIDING_WINDOW — drops the oldest messages first, keeping the most recent conversation. Best for chat-like interactions where recent context matters most.
  • SUMMARIZE — compresses older messages into a summary using a function you provide (typically another LLM call). Preserves the gist of the entire conversation at the cost of one extra API call.
  • PRIORITY — keeps system messages (which define the agent’s behavior) and the most recent messages, dropping messages from the middle. A good default when system instructions must always be present.
from water.agents import ContextManager, TokenCounter, TruncationStrategy

# Count tokens accurately (uses tiktoken for OpenAI models)
counter = TokenCounter(provider="openai", model="gpt-4o")
print(counter.count("Hello, world!"))  # 4
print(counter.count_messages(messages))  # total tokens across messages

# Manage context windows with automatic truncation
ctx = ContextManager(
    max_tokens=4096,
    strategy=TruncationStrategy.SLIDING_WINDOW,  # also: SUMMARIZE, PRIORITY
    reserve_tokens=500,  # reserve tokens for the response
)

# Automatically trims old messages to fit the context window
trimmed = ctx.prepare_messages(long_conversation)

Prompt Templates

Hardcoding prompts in your agent tasks makes them brittle and hard to maintain. Prompt templates separate the prompt structure from the data, using {{variable}} placeholders that get filled in at runtime. You can define default values for variables, validate that all required variables are provided before rendering, and compose multiple templates together.

The PromptLibrary acts as a central registry for all your templates. Register templates by name, render them with data, or compose multiple templates into a single combined template. This is especially useful when you have a standard system prompt structure that you want to reuse across many agents — register it once, compose it with task-specific templates, and every agent gets a consistent prompt format.

from water.agents import PromptTemplate, PromptLibrary

# Simple template with {{variable}} syntax
template = PromptTemplate(
    template="You are a {{role}}. Please {{action}} the following: {{content}}",
    defaults={"role": "helpful assistant"},
)

result = template.render(action="summarize", content="This is a long article...")
# "You are a helpful assistant. Please summarize the following: This is a long article..."

# Inspect variables
template.get_variables()      # ["role", "action", "content"]
template.validate({"action"}) # ["role", "content"]  (missing vars)

# Use a library to manage templates
library = PromptLibrary()
library.register("system", "You are a {{role}} specialized in {{domain}}.")
library.register("task", "Please {{action}}: {{input}}")

# Compose multiple templates into one
combined = library.compose("system", "task", separator="\n\n")
result = combined.render(role="expert", domain="Python", action="review", input="def foo(): pass")

Fallback Chains

In production, no single LLM provider is 100% reliable. OpenAI has outages, Anthropic has rate limits, and any provider can return errors under load. A FallbackChain wraps multiple providers and automatically tries the next one when the current one fails.

Three strategies control how providers are selected:

  • first_success — tries providers in the order you listed them. The first provider to return a successful response wins. Use this when you have a preferred provider and cheaper alternatives as backup.
  • round_robin — distributes calls evenly across all providers, cycling through them. Use this to spread load and avoid hitting rate limits on any single provider.
  • lowest_latency — tracks each provider’s average response time and routes new requests to the fastest one. Use this when response speed matters and you want the system to automatically learn which provider is performing best.

You can also attach circuit breakers to individual providers, so a provider that starts failing repeatedly gets temporarily removed from rotation instead of slowing down every request with timeout errors.

from water.agents import FallbackChain
from water.resilience import CircuitBreaker

chain = FallbackChain(
    providers=[
        OpenAIProvider(model="gpt-4o"),
        AnthropicProvider(model="claude-sonnet-4-20250514"),
        OpenAIProvider(model="gpt-4o-mini"),  # cheaper fallback
    ],
    strategy="first_success",  # also: "round_robin", "lowest_latency"
    circuit_breakers={
        0: CircuitBreaker(failure_threshold=3, recovery_timeout=60),
    },
)

# Tries providers in order until one succeeds
result = await chain.complete([{"role": "user", "content": "Hello"}])

# Check per-provider metrics
for m in chain.get_metrics():
    print(f"Provider {m['index']}: {m['calls']} calls, {m['failures']} failures, {m['avg_latency']:.2f}s")

Batch Processing

When you need to process hundreds or thousands of inputs through an LLM — summarizing a document library, classifying support tickets, translating a product catalog — doing them one at a time is painfully slow. BatchProcessor runs multiple inputs through a task concurrently with a configurable concurrency limit, so you can saturate your API rate limit without overwhelming it.

The processor handles retries for individual failures, tracks progress with an optional callback, and returns a BatchResult with detailed per-item status. You can inspect which items succeeded, which failed (and why), and the overall success rate. Wrapping it as a flow task with create_batch_task lets you embed batch processing into larger pipelines.

from water.agents import BatchProcessor, create_batch_task

# Direct batch processing
processor = BatchProcessor(
    max_concurrency=5,      # max parallel requests
    retry_failed=True,      # retry failures
    max_retries=2,
    on_progress=lambda done, total: print(f"{done}/{total}"),
)

result = await processor.run_batch(
    task=summarize_task,
    inputs=[{"text": doc} for doc in documents],
)

print(f"Success rate: {result.success_rate:.0%}")
for item in result.get_results():
    print(item.result)

# Or wrap as a flow task
batch_task = create_batch_task(
    id="batch_summarize",
    task=summarize_task,
    max_concurrency=10,
    input_key="documents",   # reads list from this key
    output_key="summaries",  # writes results to this key
)

flow = Flow(id="batch_flow")
flow.then(batch_task).register()
result = await flow.run({"documents": [{"text": "..."}, {"text": "..."}]})

Dynamic Planning

Sometimes you don’t know the exact sequence of tasks at design time. The user says “find the latest AI news and summarize it in Spanish,” and the system needs to figure out that this means: (1) search the web, (2) summarize the results, (3) translate the summary. The PlannerAgent handles this by using an LLM to decompose a natural-language goal into a sequence of concrete steps, then executing those steps using tasks from a TaskRegistry.

You register your available tasks with descriptions, give the planner a goal, and it generates an ExecutionPlan — a list of steps with task names and input mappings that describe how data flows from one step to the next. You can inspect the plan before executing it, or use plan_and_execute() to do both in one call. The planner also records execution history so you can debug what happened.

from water.agents import PlannerAgent, TaskRegistry, create_planner_task

# Register available tasks
registry = TaskRegistry()
registry.register("search", search_task, "Search the web for information")
registry.register("summarize", summarize_task, "Summarize a piece of text")
registry.register("translate", translate_task, "Translate text to another language")

# The planner uses an LLM to create and execute plans
planner = PlannerAgent(
    provider=OpenAIProvider(model="gpt-4o"),
    task_registry=registry,
    max_steps=10,
)

# Plan and execute in one call
result = await planner.plan_and_execute(
    goal="Find the latest news about AI and summarize it in Spanish",
    initial_data={"language": "Spanish"},
)

# Or plan first, inspect, then execute
plan = await planner.plan("Find and summarize AI news")
print(plan.reasoning)
for step in plan.steps:
    print(f"  {step.task_name}: {step.description}")
result = await planner.execute_plan(plan)

# Use as a flow task
planner_task = create_planner_task(
    id="auto_planner",
    provider=OpenAIProvider(model="gpt-4o"),
    task_registry=registry,
    goal_key="objective",
)

flow = Flow(id="planning_flow")
flow.then(planner_task).register()
result = await flow.run({"objective": "Research and summarize quantum computing"})

Approval Gates

When agents take actions with real-world consequences — deploying code, sending emails, modifying databases — you often want a human to sign off before the action happens. Approval gates pause the flow and wait for a human to approve or deny the pending action.

You define a policy that specifies which risk levels require manual approval and which can be auto-approved. Low-risk actions (reading data, generating reports) sail through automatically. High-risk and critical actions (production deployments, financial transactions) block until a human responds. The gate has a configurable timeout — if no one responds within the window, it either auto-approves or auto-denies based on your policy.

The approval gate exposes methods that a UI, Slack bot, or API endpoint can call: gate.get_pending() lists waiting approvals, and gate.approve(request_id) / gate.deny(request_id) respond to them.

from water.agents import create_approval_task, ApprovalGate, ApprovalPolicy, RiskLevel

gate = ApprovalGate(policy=ApprovalPolicy(
    auto_approve_below=RiskLevel.MEDIUM,  # auto-approve LOW
    timeout=300.0,                         # 5 min timeout
    timeout_action="deny",                 # deny if no response
))

staging_approval = create_approval_task(
    id="staging_gate",
    action_description="Deploy to staging",
    risk_level=RiskLevel.LOW,  # auto-approved
    gate=gate,
)

prod_approval = create_approval_task(
    id="prod_gate",
    action_description="Deploy to PRODUCTION",
    risk_level=RiskLevel.CRITICAL,  # requires manual approval
    gate=gate,
)

flow = Flow(id="deploy_pipeline", description="Staged deployment")
flow.then(test_task)\
    .then(staging_approval)\
    .then(staging_deploy)\
    .then(prod_approval)\
    .then(prod_deploy)\
    .register()

Human-in-the-Loop

While approval gates are binary (approve/deny), human-in-the-loop goes further — it pauses the flow and waits for a human to provide free-form input. This is useful when an agent generates a draft that needs human editing, when a classification is ambiguous and needs human judgment, or when you want a human to guide the next step of the workflow.

The HumanInputManager manages the communication channel. When the flow reaches a human task, it raises a HumanInputRequired signal with a prompt describing what’s needed. An external system (chat interface, web form, CLI) picks up the request, collects the human’s input, and feeds it back through the input manager. The flow then resumes with the human’s response as the task output.

from water.agents import create_human_task, HumanInputManager

input_manager = HumanInputManager()

review_task = create_human_task(
    id="human_review",
    description="Human reviews the generated content",
    input_manager=input_manager,
    prompt="Please review the following content and provide feedback:",
)

flow = Flow(id="content_review", description="Generate → Human Review → Publish")
flow.then(generate_task).then(review_task).then(publish_task).register()

# The flow pauses at review_task until a human responds via input_manager

Sandboxed Execution

Agents that write and execute code need isolation. You don’t want LLM-generated code to access the filesystem, make network calls, or consume unbounded resources on your server. Sandboxed execution runs code in an isolated environment with configurable time limits, memory caps, and output size restrictions.

The sandbox captures everything: stdout, stderr, the return value (via a special __result__ variable), and the exit code. If the code exceeds its time or memory budget, the sandbox kills it and returns the error.

Three backends offer different tradeoffs:

  • InMemorySandbox — uses Python’s exec() in the same process. Fastest, but no real isolation. Good for testing and trusted code.
  • SubprocessSandbox — spawns a separate Python process with timeout enforcement. Moderate isolation — the code runs in its own process but shares the OS.
  • DockerSandbox — runs code inside a Docker container with cgroup resource limits. Full isolation. Requires Docker to be installed.
from water.agents import create_sandboxed_task, SandboxConfig

sandbox_task = create_sandboxed_task(
    id="run_user_code",
    description="Execute code safely",
    sandbox=SubprocessSandbox(),  # also: InMemorySandbox(), DockerSandbox()
    config=SandboxConfig(
        timeout=10.0,          # 10 second timeout
        max_memory_mb=128,     # 128 MB memory limit
        max_output_size=5000,  # 5000 char output limit
    ),
)

flow = Flow(id="code_runner", description="Run code safely")
flow.then(sandbox_task).register()

result = await flow.run({"code": """
import math
values = [math.sqrt(i) for i in range(10)]
print("Square roots:", values)
__result__ = sum(values)
"""})

print(result["stdout"])        # "Square roots: [0.0, 1.0, 1.414...]"
print(result["return_value"])  # 16.68...
print(result["exit_code"])     # 0

Agentic Loop (ReAct)

The model controls the loop. create_agentic_task runs a Think-Act-Observe-Repeat cycle where the LLM decides which tools to call and when to stop — no hardcoded steps.

from water.agents import create_agentic_task, Tool

search = Tool(name="search", description="Search the web",
    input_schema={"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
    execute=lambda query: f"Results for {query}")

agent = create_agentic_task(
    id="researcher",
    provider=OpenAIProvider(model="gpt-4o"),
    tools=[search],
    system_prompt="You are a research assistant.",
    max_iterations=10,
    stop_tool=True,  # Inject a __done__ tool for explicit stop signaling
)

The loop supports callbacks for full observability:

agent = create_agentic_task(
    id="traced-agent",
    provider=provider,
    tools=tools,
    system_prompt="You are a helpful assistant.",
    max_iterations=10,
    # Step trace — fires after each Think-Act-Observe cycle
    on_step=lambda iteration, step: print(f"Step {iteration}: {step['think'][:50]}"),
    # Tool gating — approve, reject, or modify tool calls
    on_tool_call=lambda name, args: False if name == "dangerous_tool" else True,
    # Custom stop — stop early based on accumulated state
    stop_condition=lambda steps, history: len(history) >= 5,
    # Customize how tool results are shown to the LLM
    observation_formatter=lambda name, args, result: f"{name}: {result}",
)

Sub-Agent Isolation

Create child agents that run their own isolated ReAct loops. Each sub-agent has its own context window, tools, and conversation history — they appear as regular tools to the parent agent.

from water.agents import SubAgentConfig, create_sub_agent_tool

researcher = create_sub_agent_tool(SubAgentConfig(
    id="researcher",
    provider=OpenAIProvider(model="gpt-4o"),
    tools=[search_tool, read_file_tool],
    system_prompt="You are a research specialist. Search and summarize findings.",
    max_iterations=5,
))

# The parent agent uses the sub-agent as a regular tool
parent = create_agentic_task(
    id="orchestrator",
    provider=provider,
    tools=[researcher, write_file_tool, run_tests_tool],
    system_prompt="You are a coding assistant. Delegate research to your researcher.",
    max_iterations=10,
)

Layered Memory

Priority-ordered memory hierarchy (ORG > PROJECT > USER > SESSION > AUTO_LEARNED) that agents can read from and write to. Higher layers override lower ones.

from water.agents import MemoryManager, MemoryLayer, InMemoryBackend, create_memory_tools

memory = MemoryManager()

# Pre-load organizational and project context
await memory.add("code_style", "Use type hints everywhere", MemoryLayer.ORG)
await memory.add("stack", "Python 3.12, FastAPI, pytest", MemoryLayer.PROJECT)

# Same key in different layers — ORG wins
await memory.add("timeout", "30s", MemoryLayer.ORG)
await memory.add("timeout", "5s", MemoryLayer.SESSION)
entry = await memory.get("timeout")  # returns "30s" (ORG has priority)

# Inject memory into the system prompt
system_prompt = "You are a coding assistant.\n\n" + memory.to_system_prompt()

# Give agents tools to manage their own memory
memory_tools = create_memory_tools(memory)  # memory_store, memory_recall, memory_list

Supports InMemoryBackend and FileBackend for persistence. Entries can have TTLs for automatic expiration.

When an agent has many tools, sending all schemas to the LLM wastes context and hurts tool selection. SemanticToolSelector uses TF-IDF cosine similarity to pick the most relevant tools per reasoning step.

from water.agents import create_tool_selector

# 20+ tools available
selector = create_tool_selector(
    tools=all_tools,
    top_k=5,                                # Pick 5 most relevant per query
    always_include=["memory_recall", "bash"],  # Always available
)

# Use with the agentic loop — tools are narrowed automatically each iteration
agent = create_agentic_task(
    id="smart-agent",
    provider=provider,
    tools=all_tools,
    tool_selector=selector,  # Per-iteration dynamic tool selection
    system_prompt="You are a coding assistant.",
    max_iterations=10,
)

No external dependencies — pure Python TF-IDF implementation.

Guardrails

LLMs are unpredictable. They can leak PII, hallucinate JSON that doesn’t match your schema, go off-topic, generate toxic content, or blow through your token budget. Guardrails are validation checks that run on agent outputs before they reach users or downstream systems. Each guardrail returns a GuardrailResult with a pass/fail status, a reason, and optional details.

Guardrails support four actions when they fail:

  • block — raises a GuardrailViolation exception, halting the flow
  • warn — logs a warning but allows the output through
  • retry — signals that the agent should try again (pair with RetryWithFeedback)
  • fallback — triggers a fallback handler

Guardrail Chain

In practice you’ll want multiple guardrails running together — check for PII, validate the schema, verify the topic, and enforce cost limits all in one pass. GuardrailChain composes multiple guardrails into a pipeline. Each guardrail runs independently against the same data, and you get back a list of results so you can see exactly which checks passed and which failed.

from water.guardrails import GuardrailChain, ContentFilter, SchemaGuardrail, CostGuardrail

chain = GuardrailChain()
chain.add(ContentFilter(block_pii=True, block_injection=True))
chain.add(SchemaGuardrail(schema=OutputModel))
chain.add(CostGuardrail(max_tokens=4000))

results = chain.check(output_data)
for r in results:
    if not r.passed:
        print(f"[{r.guardrail_name}] FAILED: {r.reason}")

Content Filter

The content filter scans agent output for sensitive patterns. It detects PII including email addresses, phone numbers, Social Security numbers, and credit card numbers using regex patterns. It detects prompt injection attempts — patterns where the user or the model tries to override the system prompt. And it checks against a configurable profanity word list. When any of these are detected, the guardrail fails with a descriptive reason telling you exactly what was found.

from water.guardrails import ContentFilter

guard = ContentFilter(
    block_pii=True,          # detect emails, phones, SSNs, credit cards
    block_injection=True,    # detect prompt injection attempts
    block_profanity=True,    # custom word list
    action="block",          # "block", "warn", "retry", "fallback"
)

result = guard.check({"response": "Contact me at john@example.com"})
# result.passed = False, result.reason = "PII detected: email"

Schema Guardrail

LLMs often need to produce structured output — JSON that conforms to a specific schema. But LLMs don’t always get the structure right. They might omit required fields, use wrong types, or wrap the JSON in markdown code fences. The schema guardrail validates the agent’s output against a Pydantic model. In strict mode, it rejects any extra fields; in non-strict mode, it allows additional fields. It also handles common LLM quirks like wrapping JSON in ```json code blocks.

from water.guardrails import SchemaGuardrail
from pydantic import BaseModel

class ExpectedOutput(BaseModel):
    answer: str
    confidence: float

guard = SchemaGuardrail(schema=ExpectedOutput, strict=True)

result = guard.check({"response": '{"answer": "42", "confidence": 0.95}'})
# result.passed = True — valid JSON matching the schema

Cost Guardrail

LLM calls cost money, and a buggy loop or an unexpectedly verbose response can burn through your budget fast. The cost guardrail tracks cumulative token usage and cost across tasks, failing when either exceeds your threshold. Set a max_tokens limit to catch runaway generation, and a max_cost_usd limit to enforce a dollar budget. The guardrail tracks totals across multiple checks, so it catches gradual budget creep as well as single large responses.

from water.guardrails import CostGuardrail

guard = CostGuardrail(
    max_tokens=8000,
    max_cost_usd=0.50,
    cost_per_1k_tokens=0.03,
)

result = guard.check({"tokens_used": 10000})
# result.passed = False — exceeds token limit

Topic Guardrail

If your agent is supposed to answer questions about Python programming, you don’t want it giving financial advice or discussing politics. The topic guardrail checks the agent’s response for the presence of allowed topics and the absence of blocked topics using keyword matching. You can specify an allowlist (response must mention at least one), a blocklist (response must not mention any), or both.

from water.guardrails import TopicGuardrail

guard = TopicGuardrail(
    allowed_topics=["python", "programming", "data science"],
    blocked_topics=["politics", "religion"],
)

result = guard.check({"response": "Let me explain Python decorators..."})
# result.passed = True

Retry with Feedback

When a guardrail fails, the naive approach is to just retry the same prompt and hope the LLM gets it right the second time. RetryWithFeedback does something smarter — it tells the LLM why its previous response was rejected. On each retry, it formats the guardrail violation reasons into a feedback message using your template and appends it to the agent’s input. This means the LLM knows “your response contained PII” or “the JSON was missing the ‘confidence’ field” and can correct the specific issue.

The retry strategy supports configurable max retries, backoff between attempts, and a callback for custom retry logic. If all retries are exhausted, the result includes a __retry_exhausted flag and the list of violations so you can handle the failure gracefully.

from water.guardrails import RetryWithFeedback, GuardrailChain, SchemaGuardrail

retry = RetryWithFeedback(
    max_retries=3,
    feedback_template="Your response failed validation: {violations}. Please fix and try again.",
    backoff_factor=1.0,
)

result = await retry.execute_with_retry(
    execute_fn=agent_execute,
    check_fn=lambda output: guardrail_chain.check(output),
    params={"input_data": {"question": "What is 2+2?"}},
    context=None,
)

# On each retry, the agent receives feedback about why its previous response was rejected
# result contains the first output that passes all guardrails

Evaluation

You can’t improve what you can’t measure. The eval module lets you define test cases with expected outputs, run them against your flow, and score the results using multiple evaluators. This is how you catch regressions, compare prompt changes, benchmark different models, and build confidence that your agent actually works before shipping it to production.

Eval Suite

An EvalSuite ties together a flow, a list of evaluators, and a list of test cases. When you run it, the suite executes the flow for each test case, passes the output and expected result to every evaluator, and produces an EvalReport with per-case and per-evaluator scores.

Four evaluators are built in:

  • ExactMatch — checks if a specific key in the output exactly matches the expected value. Binary pass/fail.
  • ContainsMatch — checks if the output contains specific substrings or keys. Useful for verifying that the response mentions certain concepts.
  • LLMJudge — uses another LLM to score the output against a rubric you define. Returns a normalized score from 0.0 to 1.0. Use this for subjective quality assessments that can’t be captured by string matching.
  • SemanticSimilarity — computes token-level Jaccard similarity between the output and expected value, with a configurable threshold. Useful when the exact wording doesn’t matter but the meaning should be close.
from water.eval import EvalSuite, EvalCase, ExactMatch, ContainsMatch, LLMJudge, SemanticSimilarity

suite = EvalSuite(
    name="math_agent_eval",
    flow=math_flow,
    evaluators=[
        ExactMatch(key="answer"),                           # exact string match
        ContainsMatch(substrings=["because", "therefore"]), # check for reasoning
        LLMJudge(                                           # LLM-based scoring
            provider=OpenAIProvider(model="gpt-4o"),
            rubric="Is the answer mathematically correct and well-explained?",
            scale=5,
        ),
        SemanticSimilarity(key="answer", threshold=0.7),   # token-level similarity
    ],
    cases=[
        EvalCase(input={"question": "What is 2+2?"}, expected={"answer": "4"}, name="simple_add"),
        EvalCase(input={"question": "What is 15% of 200?"}, expected={"answer": "30"}, name="percentage"),
    ],
)

report = await suite.run()
print(f"Pass rate: {report.pass_rate:.0%}")
for case_result in report.results:
    print(f"  {case_result.case.name}: {case_result.passed}")

Eval CLI

For CI/CD integration and quick iteration, the eval CLI lets you define eval suites in YAML or JSON config files and run them from the command line. The config specifies the flow to evaluate (as a module:variable path), the evaluators to use, and the test cases. The compare command diffs two eval runs to show regressions and improvements, which is useful when you’re testing prompt changes or model upgrades.

# eval_config.yaml
suite_name: math_eval
flow_spec: my_app.flows:math_flow
evaluators:
  - type: exact_match
    key: answer
  - type: llm_judge
    rubric: "Is the answer correct?"
cases:
  - input: {question: "2+2"}
    expected: {answer: "4"}
  - input: {question: "3*5"}
    expected: {answer: "15"}
# Run an eval suite
water eval run eval_config.yaml

# Compare two eval runs
water eval compare run_001.json run_002.json

# List available eval configs
water eval list ./evals/

Resilience

Production agent systems talk to external APIs, LLM providers, databases, and third-party services. Any of these can fail, slow down, or return errors. Water provides battle-tested resilience patterns so your flows degrade gracefully instead of crashing. These patterns work at the task level (retry, timeout, circuit breaker) and at the flow level (checkpointing, dead-letter queues, caching).

Retry & Timeout

The most basic resilience pattern. Any task can be configured with automatic retries and timeouts. If a task fails, the engine waits retry_delay seconds and tries again, up to retry_count times. The retry_backoff multiplier increases the delay exponentially between retries (1s, 2s, 4s, …) to avoid hammering a recovering service. The timeout kills any single attempt that takes too long, preventing a slow API from blocking your entire flow.

api_task = create_task(
    id="call_api",
    description="Call external API",
    input_schema=APIRequest,
    output_schema=APIResponse,
    execute=call_api,
    retry_count=3,         # retry up to 3 times
    retry_delay=1.0,       # 1 second between retries
    retry_backoff=2.0,     # exponential backoff multiplier
    timeout=30.0,          # 30 second timeout per attempt
)

Circuit Breaker

Retries help with transient failures, but if a service is down, retrying just adds load and slows everything down. A circuit breaker tracks consecutive failures and “opens” when the threshold is reached, immediately rejecting subsequent calls without even attempting them. After a recovery timeout, it moves to a “half-open” state and lets one request through to test if the service has recovered. If it succeeds, the breaker closes and traffic resumes normally. If it fails, the breaker opens again.

This prevents cascading failures — if your LLM provider is down, your flow fails fast instead of queueing up hundreds of timeout-bound requests.

from water.resilience import CircuitBreaker, CircuitBreakerOpen

breaker = CircuitBreaker(
    failure_threshold=3,    # open after 3 consecutive failures
    recovery_timeout=30.0,  # try again after 30 seconds
)

# Wrap calls with the circuit breaker
async with breaker:
    result = await call_external_service()

# Or use as a task-level config
task = create_task(
    id="api_call",
    execute=call_api,
    circuit_breaker=breaker,
)

States: Closed (normal) → Open (rejecting calls) → Half-Open (testing recovery).

Rate Limiter

APIs have rate limits. If you exceed them, you get 429 errors, temporary bans, or degraded service. The rate limiter controls how many calls your flow makes within a time window. When the limit is reached, subsequent calls block until a slot opens up. This is simpler and more reliable than handling 429 errors after the fact.

from water.resilience import RateLimiter

limiter = RateLimiter(max_calls=100, period=60)  # 100 calls per minute

async with limiter:
    result = await call_rate_limited_api()

Provider Rate Limiting

LLM providers enforce two kinds of limits: requests per minute (RPM) and tokens per minute (TPM). Different models have different limits — GPT-4o might allow 60 RPM while GPT-4o-mini allows 500. ProviderRateLimiter tracks both dimensions per model and blocks acquire() calls when either limit would be exceeded.

It also supports adaptive throttling: when you get a 429 response with a Retry-After header, call record_retry_after() and the limiter will respect that backoff for the specified model. Metrics tracking tells you how many requests were throttled and the average wait time, helping you tune your limits.

from water.resilience import ProviderRateLimiter, ProviderLimits

limiter = ProviderRateLimiter(
    limits={
        "gpt-4o": {"rpm": 60, "tpm": 150_000},
        "claude-sonnet-4-20250514": {"rpm": 40, "tpm": 100_000},
    },
    default_limits=ProviderLimits(rpm=30, tpm=50_000),
    respect_retry_after=True,  # honor 429 Retry-After headers
)

# Acquire a slot before making an API call (blocks if rate limited)
wait_time = await limiter.acquire("gpt-4o", estimated_tokens=500)

# Record 429 responses for adaptive throttling
limiter.record_retry_after("gpt-4o", seconds=30)

# Check metrics
metrics = limiter.get_metrics("gpt-4o")
print(f"Throttled: {metrics['throttled_requests']} times, avg wait: {metrics['avg_wait_time']:.2f}s")

Flow Caching

If the same input always produces the same output (deterministic flows, or flows where you’re OK with stale results), caching the entire flow result saves you from re-executing all the tasks. FlowCache stores results keyed by a hash of the input data, with configurable TTL and size limits. The next time the same input arrives, it returns the cached result instantly.

This is particularly effective for agent flows that make expensive LLM calls — if a user asks the same question twice within the TTL window, the second response is free. The CacheStats object tracks hit/miss rates so you can monitor cache effectiveness.

from water.resilience import FlowCache, InMemoryFlowCache

cache = FlowCache(
    backend=InMemoryFlowCache(),
    ttl=600,          # cache for 10 minutes
    max_size=1000,    # max cached entries
)

# Check cache before running
cached = cache.get({"query": "What is Python?"})
if cached:
    return cached

result = await flow.run({"query": "What is Python?"})
cache.set({"query": "What is Python?"}, result)

# Cache statistics
stats = cache.get_stats()
print(f"Hit rate: {stats.hit_rate:.0%} ({stats.hits} hits, {stats.misses} misses)")

Checkpointing

Long-running flows (ETL pipelines, multi-step agent workflows) can fail partway through. Without checkpointing, you’d have to re-execute every task from the beginning. Checkpointing saves the output of each completed task to a persistent store. If the flow crashes and restarts, the engine detects the checkpoint, skips already-completed tasks, and resumes from where it left off.

This is essential for flows that take minutes or hours to complete, where re-executing expensive API calls or LLM tasks wastes both time and money.

from water.resilience import InMemoryCheckpoint

checkpoint = InMemoryCheckpoint()

flow = Flow(id="etl_pipeline", description="Recoverable ETL")
flow.checkpoint = checkpoint
flow.then(extract_task).then(transform_task).then(load_task).register()

# If the flow crashes during transform_task, the next run skips extract_task
# and resumes from transform_task using the checkpointed data.
result = await flow.run({"records": 1000})

Dead-Letter Queue

When a task fails after all retries are exhausted, the data that caused the failure is important — you need it to debug the issue, fix the bug, and reprocess the failed items. A dead-letter queue (DLQ) captures these failures with the full context: the task ID, the input data, the error message, and the timestamp. You can inspect the DLQ to understand failure patterns, and replay individual items once the underlying issue is fixed.

from water.resilience import InMemoryDLQ

dlq = InMemoryDLQ()

flow = Flow(id="processing", description="With DLQ")
flow.dlq = dlq
flow.then(risky_task).register()

# After a failure, inspect the DLQ
for letter in dlq.list():
    print(f"Failed: {letter.task_id}, error: {letter.error}")
    # Optionally replay: await dlq.replay(letter.id, flow)

Task Caching

Similar to flow caching but at the individual task level. If a specific task is expensive (an LLM call, a database query, an API request) and produces the same output for the same input, caching its result avoids redundant work. The TTL controls how long cached results are valid. This is useful when the same task appears in multiple flows or when a flow is re-executed with partially overlapping inputs.

from water.resilience import InMemoryCache

cache = InMemoryCache(ttl=300)  # cache for 5 minutes

task = create_task(
    id="expensive_lookup",
    execute=lookup_fn,
    cache=cache,
)

Integrations

MCP (Model Context Protocol)

MCP is an emerging standard for AI tool interop — think of it as the “USB-C of AI agents.” Any agent that speaks MCP can discover and use tools from any MCP server, regardless of the underlying framework. Water supports MCP in both directions: you can expose your flows as MCP tools that other agents can call, and you can consume tools from external MCP servers as tasks in your flows.

When you expose flows via MCPServer, each flow becomes a tool with its input schema automatically derived from the flow’s first task. External agents can discover your tools via the tools/list endpoint and call them via tools/call, both using JSON-RPC 2.0.

When you consume external tools via MCPClient, each tool becomes a Water task that you can chain into flows like any other task.

from water.integrations import MCPServer

# Any flow becomes an MCP-callable tool
server = MCPServer(flows=[text_analysis_flow, weather_flow])
# Serves JSON-RPC 2.0 endpoints: tools/list, tools/call
from water.integrations import MCPClient, create_mcp_task

client = MCPClient(server_url="http://localhost:3000")
search_task = create_mcp_task(
    id="web_search",
    client=client,
    tool_name="search",
)

flow = Flow(id="research", description="Research pipeline")
flow.then(search_task).then(summarize_task).register()

A2A (Agent-to-Agent Protocol)

Google’s Agent-to-Agent protocol is a standard for agents to discover each other and delegate tasks across organizational boundaries. Unlike MCP (which is about tools), A2A is about entire agents — an agent publishes an “agent card” describing its capabilities, and other agents can discover it and send it tasks.

Water’s A2AServer turns any flow into an A2A-compatible agent. It serves the agent card at /.well-known/agent.json (the standard discovery endpoint) and handles task submission, status tracking, and cancellation via JSON-RPC 2.0 at /a2a. The A2AClient lets your flows delegate work to remote A2A agents — discover their capabilities, send tasks, poll for results, or cancel pending work.

from water.integrations import A2AServer, AgentSkill

server = A2AServer(
    flow=research_flow,
    name="Research Agent",
    description="Researches topics and returns summaries",
    url="https://my-agent.example.com",
    skills=[
        AgentSkill(
            id="research",
            name="Web Research",
            description="Research any topic",
            tags=["research", "summarization"],
        ),
    ],
)

# Add to a FastAPI app
server.add_routes(app)
# Serves:
#   GET  /.well-known/agent.json  — agent card (discovery)
#   POST /a2a                     — JSON-RPC 2.0 (tasks/send, tasks/get, tasks/cancel)
from water.integrations import A2AClient, create_a2a_task

client = A2AClient(agent_url="https://remote-agent.example.com")

# Discover the agent's capabilities
card = await client.discover()
print(f"Agent: {card['name']}, Skills: {[s['name'] for s in card['skills']]}")

# Send a task
task = await client.send_task(input_data={"topic": "quantum computing"})
print(f"Status: {task.state}, Result: {task.result}")

# Or use as a flow task
a2a_task = create_a2a_task(
    id="call_research_agent",
    client=client,
    description="Delegate research to remote agent",
)

flow = Flow(id="delegated_research")
flow.then(a2a_task).then(format_task).register()

Chat Adapters

Chat adapters connect your flows to messaging platforms so users can interact with your agents through Slack, Discord, or Telegram. The ChatBot class routes incoming messages to the appropriate flow based on configurable routing rules, executes the flow with the message as input, and sends the response back through the same channel. Each adapter handles the platform-specific authentication, message formatting, and event handling.

from water.integrations import ChatBot, SlackAdapter, DiscordAdapter, TelegramAdapter

# Slack
bot = ChatBot(
    adapter=SlackAdapter(token="xoxb-..."),
    flows=[support_flow, faq_flow],
)

# Discord
bot = ChatBot(
    adapter=DiscordAdapter(token="..."),
    flows=[moderation_flow],
)

# Telegram
bot = ChatBot(
    adapter=TelegramAdapter(token="..."),
    flows=[assistant_flow],
)

SSE Streaming

When a flow has multiple tasks, users want to know what’s happening — which task is running, which completed, and whether there were errors. SSE (Server-Sent Events) streaming pushes these execution events to the client in real time over a persistent HTTP connection. Each event includes the event type (task_started, task_completed, task_failed), the task ID, timestamps, and any relevant data.

You can consume events programmatically via StreamingFlow.run_and_stream(), or add SSE endpoints to your FastAPI app with add_streaming_routes() for browser and frontend consumption.

from water.integrations import StreamManager, StreamingFlow, add_streaming_routes

stream_manager = StreamManager()
streaming_flow = StreamingFlow(flow, stream_manager)

# Get result + events in one call
result, events = await streaming_flow.run_and_stream({"value": 10})

for event in events:
    print(f"[{event.event_type}] task={event.task_id}")

# Or add SSE endpoints to a FastAPI app
from fastapi import FastAPI
app = FastAPI()
add_streaming_routes(app, stream_manager)
# GET /api/stream — stream all events
# GET /api/stream/{execution_id} — stream events for one execution

Event Triggers

Flows don’t always start from a manual flow.run() call. In production, flows are triggered by external events — a GitHub webhook fires when code is pushed, a cron job generates a daily report, or a message arrives in a queue from another service. Event triggers start flows automatically in response to these external events.

Webhook Trigger — listens for incoming HTTP requests. When a request arrives, the trigger optionally verifies an HMAC-SHA256 signature (to ensure the request came from a trusted source), transforms the payload, and fires the flow. Use this for GitHub/GitLab webhooks, Stripe payment notifications, or any system that sends HTTP callbacks.

from water.triggers import WebhookTrigger, TriggerRegistry

webhook = WebhookTrigger(
    flow_name="deploy_flow",
    path="/hooks/deploy",
    secret="my-shared-secret",  # HMAC-SHA256 signature verification
    methods=["POST"],
)

# Verify and handle incoming requests
event = webhook.handle_request(
    payload={"repo": "my-app", "branch": "main"},
    signature=request.headers["X-Signature"],
    raw_body=request.body,
)

Cron Trigger — runs flows on a schedule using standard cron syntax (minute hour day month weekday). The trigger checks every minute whether the current time matches the schedule, and fires the flow with the configured input data. Use this for daily reports, periodic data syncs, or scheduled maintenance tasks.

from water.triggers import CronTrigger

# Run every weekday at 9 AM
cron = CronTrigger(
    flow_name="daily_report",
    schedule="0 9 * * 1-5",
    input_data={"report_type": "daily"},
    timezone="America/New_York",
)

await cron.start(callback=lambda event: flow.run(event.payload))

Queue Trigger — processes messages from an in-memory async queue. Other parts of your system push messages into the queue, and the trigger consumes them one at a time, firing the flow for each message. Use this to decouple producers (API endpoints, background jobs) from consumers (your agent flows).

from water.triggers import QueueTrigger

queue = QueueTrigger(flow_name="process_orders", max_size=1000)

# Push messages (from an API endpoint, background job, etc.)
queue.push({"order_id": "12345", "items": [...]})

# Consume messages
await queue.start(callback=lambda event: order_flow.run(event.payload))
print(f"Pending messages: {queue.pending}")

Trigger Registry — manages all your triggers in one place, with start_all() and stop_all() for clean startup and shutdown:

from water.triggers import TriggerRegistry

registry = TriggerRegistry()
registry.register(webhook)
registry.register(cron)
registry.register(queue)

await registry.start_all()  # start all triggers
await registry.stop_all()   # graceful shutdown

Observability

When something goes wrong in a multi-step agent pipeline, you need to know what happened, where it happened, and why. Water’s observability module gives you distributed tracing, structured logging, cost tracking, and auto-instrumentation — all the tools you need to debug, monitor, and optimize your agent systems.

Telemetry

Water integrates with OpenTelemetry, the industry standard for distributed tracing and metrics. When enabled, every flow execution and task execution generates traces and spans that are exported to your configured backend (Jaeger, Zipkin, Datadog, etc.). This gives you a visual timeline of how your flow executed, how long each task took, and where bottlenecks or errors occurred.

from water.observability import TelemetryManager

telemetry = TelemetryManager(service_name="my-agent-service")
# Automatically instruments flow and task execution with spans and metrics

Dashboard

Every FlowServer includes a built-in observability dashboard at /dashboard. It shows flow execution history, success/failure rates, average execution times, and per-task details. Powered by data from your storage backend, it gives you a quick overview of system health without setting up external monitoring.

from water.observability import FlowDashboard

dashboard = FlowDashboard(storage=my_storage)
# Served at /dashboard when using FlowServer
# Shows flow execution history, success/failure rates, timing, and task details

Cost Tracking

LLM costs add up fast, especially with multiple agents, retries, and batch processing. CostTracker is a middleware that automatically tracks token usage and calculates costs for every task in your flow, using built-in pricing data for popular models (GPT-4o, GPT-4o-mini, Claude Sonnet, Claude Haiku, and more).

You can set a budget limit and choose whether to warn or stop when it’s exceeded. The get_summary() method returns a detailed breakdown of total cost, total tokens (input vs. output), and per-task costs. Use this to monitor spending, set per-flow budgets, and identify which tasks or models are costing the most.

from water.observability import CostTracker, TokenUsage

tracker = CostTracker(
    budget_limit=10.0,              # $10 budget
    on_budget_exceeded="warn",      # "warn" or "stop" (raises BudgetExceededError)
)

# Use as middleware — automatically tracks all tasks in the flow
flow.use(tracker)

result = await flow.run({"question": "Explain quantum computing"})

# Get cost summary
summary = tracker.get_summary()
print(summary.summary())
# Total cost: $0.0234
# Total tokens: 1,520 (input: 420, output: 1,100)
# Tasks: 3

# Record costs manually
tracker.record(
    task_id="my_task",
    model="gpt-4o",
    tokens=TokenUsage(input_tokens=100, output_tokens=500),
)

Structured Logging

Traditional print() debugging doesn’t scale. Structured logging outputs each log entry as a JSON object with consistent fields — timestamp, level, message, flow ID, execution ID, task ID, and any custom fields you add. This makes logs machine-parseable, searchable, and filterable.

The logger automatically injects context (which flow, which execution, which task) into every log entry, so you can trace a single request through the entire pipeline. Sensitive fields like passwords and API keys are automatically redacted. A sample rate parameter lets you reduce log volume in high-throughput environments without losing visibility.

from water.observability import StructuredLogger, LogExporter

logger = StructuredLogger(
    level="info",
    format="json",        # "json" or "text"
    redact_fields=["password", "api_key"],  # auto-redact sensitive fields
    sample_rate=1.0,      # log 100% of events (reduce for high-throughput)
)

logger.set_context(flow_id="my_flow", execution_id="exec_001")
logger.info("Processing started", user_id="u123", step="validation")
# {"timestamp": "...", "level": "info", "message": "Processing started",
#  "flow_id": "my_flow", "execution_id": "exec_001", "user_id": "u123", "step": "validation"}

# Export logs
exporter = LogExporter(destination="file", file_path="logs/app.jsonl")
exporter.export(logger.get_logs())

Auto-Instrumentation

Setting up tracing manually for every flow and task is tedious. Auto-instrumentation does it for you with a single function call. It wraps every task execution with timing spans, optionally captures inputs and outputs, and exports the data to your tracing backend. You get a complete execution trace showing how long each task took, what data flowed through it, and whether it succeeded or failed — without adding any instrumentation code to your tasks.

If OpenTelemetry is installed, spans are automatically exported to your configured backend (Jaeger, Zipkin, etc.). Without OTel, spans are collected in-memory by an InstrumentationCollector that you can query programmatically.

from water.observability import auto_instrument

# One-line setup — instruments all flow and task execution
instrumentor = auto_instrument(
    service_name="my-agent-service",
    capture_input=True,     # log task inputs
    capture_output=True,    # log task outputs
    sample_rate=1.0,
)

# Use as middleware
flow.use(instrumentor)

result = await flow.run({"query": "hello"})

# Inspect collected spans
collector = instrumentor.get_collector()
for span in collector.get_spans():
    print(f"{span.name}: {span.duration_ms:.1f}ms [{span.status}]")
    for child in span.children:
        print(f"  └─ {child.name}: {child.duration_ms:.1f}ms")

Execution Replay

When a production flow fails or produces unexpected output, you need to reproduce the issue. Execution replay loads a previous flow session from storage and re-executes it, using cached outputs for tasks you don’t want to re-run and live execution for the tasks you’re investigating. You can start replay from a specific task (skipping everything before it), override specific task inputs to test different scenarios, and skip tasks entirely.

This is invaluable for debugging — instead of trying to reproduce the exact conditions that caused a failure, you replay the exact execution with surgical modifications.

from water.core.replay import ReplayEngine, ReplayConfig

engine = ReplayEngine(storage=my_storage)

# Replay from a specific task (skip earlier tasks, use cached outputs)
result = await engine.replay(
    flow=my_flow,
    session_id="exec_abc123",
    config=ReplayConfig(
        from_task="transform",                         # start from this task
        override_inputs={"transform": {"mode": "v2"}}, # override specific inputs
        skip_tasks=["notify"],                         # skip tasks entirely
    ),
)

print(f"Replayed from: {result.replayed_from}")
print(f"Cached steps: {result.cached_steps}")
print(f"Re-executed steps: {result.re_executed_steps}")
print(f"Result: {result.result}")

Middleware & Lifecycle

Hooks

Hooks let you inject custom logic at specific points in the flow and task lifecycle without modifying your task code. Register callbacks for events like task start, task completion, task error, flow start, flow completion, and flow error. Callbacks can be sync or async — the hook manager handles both.

This is how you add cross-cutting concerns like logging, metrics, alerting, or audit trails. Instead of adding print() statements or monitoring code inside each task, you register a hook once and it fires for every task in the flow.

from water.middleware import HookManager

hooks = HookManager()

@hooks.on("on_task_start")
async def log_start(task_id, **kwargs):
    print(f"Starting task: {task_id}")

@hooks.on("on_task_error")
async def alert_error(task_id, error, **kwargs):
    send_alert(f"Task {task_id} failed: {error}")

flow.hooks = hooks

Supported events: on_task_start, on_task_complete, on_task_error, on_flow_start, on_flow_complete, on_flow_error.

Events

While hooks are fire-and-forget callbacks, the event system is a pub-sub model for real-time event streams. The EventEmitter pushes FlowEvent objects (with event type, flow ID, task ID, timestamp, and data) to all subscribers. Each subscriber gets its own async queue, so slow consumers don’t block the flow execution.

This is the foundation for SSE streaming, real-time dashboards, and any system that needs to react to flow events as they happen. Subscribe from a background task, a WebSocket handler, or any async context.

from water.middleware import EventEmitter, FlowEvent

emitter = EventEmitter()
flow.events = emitter

# Subscribe to events
subscription = emitter.subscribe()

# Consume events (e.g., in a background task)
async for event in subscription:
    print(f"[{event.event_type}] flow={event.flow_id} task={event.task_id}")

Plugins

As your usage of Water grows, you’ll want to extend it with custom storage backends, LLM providers, middleware, guardrails, and integrations. The plugin system provides a standardized way to package and distribute these extensions.

A plugin is a class that extends WaterPlugin with a name, plugin_type, and register() method. The register method receives the application context and hooks into Water’s registries. Plugins can be registered manually or discovered automatically via Python entry points — add your plugin to pyproject.toml under the water.plugins group, and the PluginRegistry.discover() method finds and loads it.

from water.plugins import PluginRegistry, WaterPlugin, PluginType

class MyStoragePlugin(WaterPlugin):
    name = "redis_storage"
    plugin_type = PluginType.STORAGE
    version = "1.0.0"

    def register(self, app):
        app.register_storage("redis", RedisStorage(...))

# Register manually
registry = PluginRegistry()
registry.register(MyStoragePlugin())

# Or discover plugins automatically via entry points
# In pyproject.toml: [project.entry-points."water.plugins"]
# my_plugin = "my_package:MyPlugin"
plugins = registry.discover()

Flow Versioning

As your flows evolve — new fields added, old fields removed, task schemas changed — you need to manage these changes without breaking existing data or running flows. Flow versioning lets you snapshot the schemas of a flow at a point in time, compare two versions to find breaking changes, and migrate data from one version to another.

snapshot_flow_schemas() captures the input/output schemas of every task in a flow. CompatibilityChecker compares two snapshots and identifies added fields, removed fields, type changes, and which changes are breaking (e.g., a required field was removed). SchemaRegistry stores versions and migration functions, so you can automatically upgrade data from v1 to v2 to v3 by chaining migrations.

from water import FlowVersion, SchemaRegistry, CompatibilityChecker, snapshot_flow_schemas

registry = SchemaRegistry()

# Snapshot current flow schemas
schemas_v1 = snapshot_flow_schemas(my_flow)
registry.register_version("my_flow", "1.0.0", schemas_v1)

# After making changes, check compatibility
schemas_v2 = snapshot_flow_schemas(my_flow_v2)
registry.register_version("my_flow", "2.0.0", schemas_v2)

changes = registry.check_compatibility("my_flow", "1.0.0", "2.0.0")
for change in changes:
    print(f"{'BREAKING' if change.breaking else 'compatible'}: {change.change_type} "
          f"{change.field_name} in {change.task_id} ({change.direction})")

# Register migrations between versions
registry.add_migration("my_flow", "1.0.0", "2.0.0",
    migrate_fn=lambda data: {**data, "new_field": "default"},
    description="Add new_field with default value",
)

# Migrate data
migrated = registry.migrate_data("my_flow", old_data, "1.0.0", "2.0.0")

Server & Playground

Turn your flows into a production REST API with a single class. FlowServer wraps all your registered flows in a FastAPI application with auto-generated endpoints for listing, inspecting, and executing flows. It also serves the observability dashboard and health check endpoint. The interactive Swagger docs at /docs let you test flows directly from the browser.

from water.server import FlowServer

server = FlowServer(flows=[registration_flow, notification_flow, retry_flow])
app = server.get_app()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("playground:app", host="0.0.0.0", port=8000, reload=True)

Endpoints:

Method Path Description
GET /flows List all registered flows
GET /flows/{flow_id} Flow details (tasks, schemas)
POST /flows/{flow_id}/run Execute a flow with input data
GET /health Health check
GET /dashboard Observability dashboard UI

Access the interactive docs at http://localhost:8000/docs to test your flows.

CLI

Water includes a CLI for running, testing, evaluating, and deploying flows without writing any server code. Point it at a Python module and flow variable, and it handles the import, execution, and output formatting.

# Run a flow from the terminal
water run cookbook.core.sequential_flow:registration_flow \
  --input '{"email": "a@b.com", "password": "secret123", "first_name": "Water"}'

# Generate a Mermaid diagram of a flow
water visualize cookbook.core.dag_flow:pipeline_flow

# Validate a flow without executing (checks schemas, dependencies)
water dry-run cookbook.core.sequential_flow:registration_flow \
  --input '{"email": "a@b.com", "password": "secret123", "first_name": "Water"}'

# List all flows in a module
water list cookbook.core.sequential_flow

# Run evaluations
water eval run eval_config.yaml
water eval compare run_001.json run_002.json
water eval list ./evals/

# Deploy to Render
water flow prod:render --app playground

Architecture

Water is organized into 14 subpackages with clean separation of concerns:

water/
├── __init__.py          # Top-level re-exports (~200 public APIs)
├── core/                # Flow, Task, ExecutionEngine, Context, SubFlow, Replay, Versioning
├── agents/              # LLM tasks, streaming, multi-agent, tools, context, prompts,
│                        #   fallback, batch, planner, approval, human-in-the-loop, sandbox
├── guardrails/          # Content filter, schema, cost, topic guardrails, retry-with-feedback
├── eval/                # EvalSuite, evaluators (ExactMatch, LLMJudge, etc.), CLI, config
├── storage/             # InMemory, SQLite, Redis, Postgres backends
├── resilience/          # Circuit breaker, rate limiter, cache, checkpoint, DLQ,
│                        #   flow cache, provider rate limiter
├── middleware/           # Middleware, hooks, events
├── integrations/        # MCP, A2A protocol, chat adapters, SSE streaming
├── triggers/            # Webhook, cron, queue triggers with registry
├── observability/       # Telemetry, dashboard, cost tracking, structured logging,
│                        #   auto-instrumentation
├── plugins/             # Plugin registry with entry-point discovery
├── server/              # FlowServer (FastAPI)
├── tasks/               # Built-in task library (HTTP, JSON transform, file I/O, etc.)
└── utils/               # Testing, scheduler, declarative loader, secrets, CLI

Everything is importable from the top-level water package for convenience:

from water import Flow, create_task, FlowServer, CircuitBreaker, SubFlow, compose_flows

Or import from subpackages for explicit control:

from water.core import Flow, create_task, SubFlow, compose_flows
from water.resilience import CircuitBreaker, FlowCache, ProviderRateLimiter
from water.agents import create_agent_task, OpenAIProvider, FallbackChain, BatchProcessor
from water.guardrails import GuardrailChain, ContentFilter, RetryWithFeedback
from water.eval import EvalSuite, ExactMatch, LLMJudge
from water.triggers import WebhookTrigger, CronTrigger, TriggerRegistry
from water.observability import CostTracker, auto_instrument, StructuredLogger
from water.integrations import MCPServer, A2AServer, StreamingFlow

Examples

The cookbook has 73 runnable examples organized by category:

Core (cookbook/core/)

Example What it shows
sequential_flow.py .then() chaining
parallel_flow.py Concurrent task execution
branched_flow.py Conditional routing
loop_flow.py Repeat-until-done loops
map_flow.py Fan-out / fan-in processing
dag_flow.py Diamond dependency graphs
subflow_flow.py Nested flow composition
subflow_composition_flow.py SubFlow with input/output mapping
try_catch_flow.py Error handling with try-catch-finally
replay_flow.py Execution replay and debugging
versioning_flow.py Flow versioning and migration
validation_flow.py Schema validation
contracts_flow.py Input/output contracts

Agents (cookbook/agents/)

Example What it shows
agent_task_flow.py LLM-powered tasks
streaming_agent_flow.py Token-by-token streaming
multi_agent_flow.py Agent team coordination
tool_use_flow.py Tool calling and execution
fallback_chain_flow.py Provider failover strategies
prompt_template_flow.py Reusable prompt templates
batch_processing_flow.py Batch LLM processing
planner_flow.py Dynamic planning agent
approval_flow.py Human approval gates
human_in_the_loop_flow.py Pause & wait for human input
sandbox_flow.py Sandboxed code execution
agentic_loop_flow.py ReAct loop (Think-Act-Observe-Repeat)
sub_agent_flow.py Sub-agent isolation
memory_flow.py Layered memory hierarchy
tool_search_flow.py Semantic tool search (TF-IDF)
guardrails_flow.py Output guardrails
guardrail_retry_flow.py Auto-retry with feedback
eval_flow.py Evaluation suite
eval_cli_flow.py CLI-based evaluation

Resilience (cookbook/resilience/)

Example What it shows
circuit_breaker_flow.py Circuit breaker pattern
rate_limit_flow.py Rate limiting
provider_rate_limit_flow.py Per-model RPM/TPM limits
flow_cache_flow.py Flow-level result caching
checkpoint_flow.py Crash recovery
dlq_flow.py Dead-letter queue
caching_flow.py Task output caching
retry_timeout_flow.py Retry & timeout config

Observability (cookbook/observability/)

Example What it shows
cost_tracking_flow.py Token cost tracking & budgets
auto_instrument_flow.py Zero-code instrumentation
structured_logging_flow.py JSON structured logging
trace_flow.py Distributed tracing
telemetry_flow.py OpenTelemetry integration
dashboard_flow.py Observability dashboard

Integrations (cookbook/integrations/)

Example What it shows
mcp_flow.py MCP tool interop
a2a_flow.py Agent-to-Agent protocol
chat_flow.py Slack/Discord/Telegram bots
streaming_flow.py Real-time SSE events
trigger_flow.py Webhook, cron, queue triggers

Server & Utils (cookbook/server/, cookbook/utils/)

Example What it shows
playground.py Multi-flow REST server
deploy_flow.py Production deployment
testing_flow.py MockTask & FlowTestRunner
secrets_flow.py Secrets management
plugin_flow.py Plugin system
declarative_flow.py YAML/JSON flow definitions

Real-World (cookbook/real_world/)

Example What it shows
claude_code_agent.py Claude Code-style coding agent with sub-agents, memory, and tool search

You can support the project by starring the repo and sharing it with your friends. I am actively working on the project and would love to hear your feedback. You can reach out to me on X or LinkedIn or email me at guptaamanthan01@gmail.com.