Water: A multi-agent orchestration framework

Manthan Gupta

Building multi-agent systems with existing frameworks is painful. While OpenAI Swarm, Google ADK, LangChain, CrewAI, AutoGen, Agno, and others have orchestration capabilities, they are complex, verbose, and unintuitive. Simple workflows require dozens of lines of boilerplate code, and adding branching logic, retries, or conditional execution becomes a debugging nightmare.

Water makes multi-agent orchestration actually enjoyable. Instead of wrestling with framework-specific APIs and verbose configurations, you define workflows with clean, intuitive abstractions. Complex branching logic, error handling, and loops, which would normally take hours of debugging, work out of the box.

At the core of Water are these three components:

  1. Tasks: The building blocks of your multi-agent system and the smallest unit of work that can be executed.
  2. Flows: The orchestration layer that coordinates the execution of tasks.
  3. Execution Engine: The component that runs the tasks in the right order and handles errors.

Quick Start

To use Water, install the library:

pip install water-ai

Create Task

A task is the smallest unit of work that can be executed and the building block of flows. Create one with the create_task function.

import requests
from water import create_task
from pydantic import BaseModel

# Define data schemas
class WeatherRequest(BaseModel):
    city: str

class WeatherResult(BaseModel):
    city: str
    temperature: float
    description: str

# Task: Fetch weather data
def get_weather(params, context):
    # context is the execution context that each task receives
    # it contains the flow id, task id, execution_id, step number, flow_metadata
    # you can access all the task outputs till now using .get_all_task_outputs()
    city = params["input_data"]["city"]

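    # Replace "demo" with your own OpenWeatherMap API key
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=demo&units=metric"

    response = requests.get(url, timeout=10)
    response.raise_for_status()  # surface HTTP errors instead of failing on a missing key below
    data = response.json()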
    
    return {
        "city": city,
        "temperature": data["main"]["temp"],
        "description": data["weather"][0]["description"]
    }

# Create task
weather_task = create_task(
    id="weather",
    description="Fetch current weather",
    input_schema=WeatherRequest,
    output_schema=WeatherResult,
    execute=get_weather
)

Create & Register Flow

Compose tasks into a flow using the Flow class, then register the flow with the execution engine using the register method.

from water import Flow

# Create and register flow
weather_flow = Flow(id="weather_check", description="Get weather for a city")
weather_flow.then(weather_task).register()

Run the Flow

Flows run asynchronously. Call the run method to start a flow and await it to get the result.

import asyncio

async def main():
    result = await weather_flow.run({"city": "London"})
    print(f"{result['city']}: {result['temperature']}°C, {result['description']}")

if __name__ == "__main__":
    asyncio.run(main())

Run it as a Python script:

python weather_flow.py

Control Flow

When you build a flow, you usually break it down into smaller tasks that can be chained into different flows. Each task gives the flow a structured definition: an input schema, an output schema, and the execution logic.

If the schemas match, the output of each task is automatically passed as the input of the next task. If they don't match, an error is thrown.
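The matching rule can be pictured with a small stand-alone sketch. It does not use Water: plain dataclasses stand in for the Pydantic models, and schemas_match and check_chain are hypothetical helpers. It assumes that "match" means every field the next task expects is produced by the previous task with the same type; Water's actual check may be stricter or looser.

```python
from dataclasses import dataclass, fields


@dataclass
class WeatherResult:
    city: str
    temperature: float
    description: str


@dataclass
class AdviceRequest:
    city: str
    temperature: float


@dataclass
class ForecastRequest:
    city: str
    days: int


def schemas_match(output_schema, input_schema) -> bool:
    """Hypothetical rule: every input field exists in the output with the same type."""
    produced = {f.name: f.type for f in fields(output_schema)}
    return all(produced.get(f.name) == f.type for f in fields(input_schema))


def check_chain(output_schema, input_schema) -> None:
    """Raise before anything runs if two adjacent tasks cannot be chained."""
    if not schemas_match(output_schema, input_schema):
        raise TypeError(
            f"{output_schema.__name__} cannot feed {input_schema.__name__}"
        )


print(schemas_match(WeatherResult, AdviceRequest))    # True
print(schemas_match(WeatherResult, ForecastRequest))  # False: "days" is never produced
```

Checking at the point where tasks are chained, rather than when they run, means a mismatched pair fails immediately instead of halfway through an execution.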

Sequential Flow

Chain your tasks in sequence using .then()

from water import Flow, create_task

# Create tasks (arguments omitted for brevity; see the Create Task section)
task_1 = create_task()
task_2 = create_task()
task_3 = create_task()
task_4 = create_task()

flow = Flow(id="sequential_flow", description="test sequential flow")
flow.then(task_1)\
    .then(task_2)\
    .then(task_3)\
    .then(task_4)\
    .register()
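To see why .then() calls can be chained and how data moves through a sequence, here is a toy model in plain Python. MiniFlow is a hypothetical stand-in written for this illustration, not Water's implementation; it assumes each step is an async function that takes the previous step's output.

```python
import asyncio


class MiniFlow:
    """Toy stand-in for a flow; not Water's implementation."""

    def __init__(self):
        self._steps = []

    def then(self, step):
        # Returning self is what makes .then(a).then(b) chaining possible.
        self._steps.append(step)
        return self

    async def run(self, data):
        # Each step receives the output of the step before it.
        for step in self._steps:
            data = await step(data)
        return data


async def add_one(data):
    return {"value": data["value"] + 1}


async def double(data):
    return {"value": data["value"] * 2}


flow = MiniFlow().then(add_one).then(double)
result = asyncio.run(flow.run({"value": 3}))
print(result)  # {'value': 8}
```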

Parallel Flow

Run your tasks in parallel using .parallel()

from water import Flow, create_task

# Create tasks (arguments omitted for brevity; see the Create Task section)
task_1 = create_task()
task_2 = create_task()
task_3 = create_task()
task_4 = create_task()

flow = Flow(id="parallel_flow", description="test parallel flow")
flow.parallel([task_1, task_2, task_3])\
    .then(task_4)\
    .register()

# The outputs from parallel tasks are passed to the next task
# {
#     "task_1": output from task_1,
#     "task_2": output from task_2,
#     "task_3": output from task_3
# }
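The merged output shape can be reproduced with asyncio.gather from the standard library. This is a sketch of the idea, not Water's code: run_parallel and the two fetch functions are hypothetical, and it assumes every parallel task receives the same input.

```python
import asyncio


async def fetch_weather(data):
    await asyncio.sleep(0.01)  # stands in for a network call
    return {"temperature": 18.5}


async def fetch_news(data):
    await asyncio.sleep(0.01)  # stands in for a network call
    return {"headline": f"News for {data['city']}"}


async def run_parallel(tasks, data):
    """Run all tasks concurrently on the same input and key the results by task id."""
    ids = list(tasks)
    outputs = await asyncio.gather(*(tasks[task_id](data) for task_id in ids))
    return dict(zip(ids, outputs))


async def summarize(merged):
    # The next task sees one dict with an entry per parallel task.
    return f"{merged['news']['headline']}: {merged['weather']['temperature']}°C"


async def main():
    merged = await run_parallel(
        {"weather": fetch_weather, "news": fetch_news}, {"city": "London"}
    )
    return merged, await summarize(merged)


merged, summary = asyncio.run(main())
print(merged)
print(summary)  # News for London: 18.5°C
```

Because asyncio.gather returns results in the order the awaitables were passed, zipping them with the task ids keeps each output attached to the right key regardless of which task finishes first.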

Conditional Flow

Run conditional tasks using .branch()

from water import Flow, create_task

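# Create tasks (arguments omitted for brevity; see the Create Task section)
# condition_1, condition_2, and condition_3 are placeholders for conditions you define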
task_1 = create_task()
task_2 = create_task()
task_3 = create_task()
task_4 = create_task()

flow = Flow(id="conditional_flow", description="test conditional flow")
flow.branch(
    [condition_1, task_1],
    [condition_2, task_2],
    [condition_3, task_3]
    )\
    .then(task_4)\
    .register()

# Only the task paired with the first condition that is true runs,
# and its output is passed to the next task
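First-match semantics are easiest to see when more than one condition holds. The sketch below is plain Python with hypothetical names (run_branch and the condition and task functions), and it assumes that the input passes through unchanged when no condition matches; the source does not say what Water does in that case.

```python
def is_cold(data):
    return data["temperature"] < 10


def is_mild(data):
    return data["temperature"] < 25  # also true for cold readings


def suggest_coat(data):
    return {**data, "advice": "wear a coat"}


def suggest_jacket(data):
    return {**data, "advice": "a light jacket is enough"}


def run_branch(branches, data):
    """Run the task paired with the first condition that returns True."""
    for condition, task in branches:
        if condition(data):
            return task(data)
    # Assumption: with no matching condition, pass the input through unchanged.
    return data


branches = [(is_cold, suggest_coat), (is_mild, suggest_jacket)]
cold = run_branch(branches, {"temperature": 5})
mild = run_branch(branches, {"temperature": 18})
hot = run_branch(branches, {"temperature": 30})

print(cold["advice"])  # wear a coat
print(mild["advice"])  # a light jacket is enough
print(hot)             # {'temperature': 30}
```

A reading of 5 satisfies both conditions, but only the coat task runs, so the order of the branches matters: put the most specific condition first.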

Loop Flow

Run a task in a loop using .loop()

from water import Flow, create_task

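# Create tasks (arguments omitted for brevity; see the Create Task section)
# condition is a placeholder for a loop condition you define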
task_1 = create_task()
task_2 = create_task()

flow = Flow(id="loop_flow", description="test loop flow")
flow.loop(condition, task_1)\
    .then(task_2)\
    .register()
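A loop step can be pictured as repeating a task for as long as a condition on the current data holds. The sketch below is plain Python, not Water's implementation: run_loop is hypothetical, it assumes the task repeats while the condition is true, and the max_iterations guard is an addition for safety rather than a documented Water parameter.

```python
def needs_more_attempts(data):
    return data["attempts"] < 3


def attempt(data):
    return {"attempts": data["attempts"] + 1}


def run_loop(condition, task, data, max_iterations=100):
    """Repeat task while condition(data) is true, feeding each output back in."""
    iterations = 0
    while condition(data):
        if iterations >= max_iterations:
            # Guard against a condition that never becomes false.
            raise RuntimeError("loop did not finish within max_iterations")
        data = task(data)
        iterations += 1
    return data


result = run_loop(needs_more_attempts, attempt, {"attempts": 0})
print(result)  # {'attempts': 3}
```

The final value of the loop is what a following step would receive as its input.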

Playground

You can serve your flow as a lightweight FastAPI async server.

from water import FlowServer
import uvicorn

# Create a server with your flows (flow_1, flow_2, and flow_3 are placeholders)
app = FlowServer(flows=[flow_1, flow_2, flow_3]).get_app()

if __name__ == "__main__":
    uvicorn.run("playground:app", host="0.0.0.0", port=8000, reload=True)
    # "playground" is the name of this file (playground.py)

You can access the playground at http://localhost:8000/docs to see the available endpoints and test your flows quickly.

Examples

You can find examples of flows in the cookbook directory.

Roadmap

  • Storage Layer
  • Human in the Loop
  • Retry Mechanism for individual tasks
  • Streaming response from tasks

You can support the project by starring the repo and sharing it with your friends. I am actively working on the project and would love to hear your feedback. You can reach out to me on X or LinkedIn or email me at guptaamanthan01@gmail.com.