Water: A multi-agent orchestration framework
Building multi-agent systems with existing frameworks is painful. While OpenAI Swarm, Google ADK, LangChain, CrewAI, AutoGen, Agno, and others have orchestration capabilities, they are complex, verbose, and unintuitive. Simple workflows require dozens of lines of boilerplate code, and adding branching logic, retries, or conditional execution becomes a debugging nightmare.
Water makes multi-agent orchestration actually enjoyable. Instead of wrestling with framework-specific APIs and verbose configurations, you define workflows with clean, intuitive abstractions. Complex branching logic, error handling, and loops that would normally require hours of debugging work out of the box.
At the core of Water are these three components:
- Tasks: Tasks are the building blocks of your multi-agent system. They are the smallest unit of work that can be executed.
- Flows: Flows are the orchestration layer that coordinates the execution of tasks.
- Execution Engine: The execution engine runs the tasks. It is responsible for executing them in the right order and handling errors.
Quick Start
To use Water, install the library:
pip install water-ai
Create Task
A task is the smallest unit of work that can be executed and the building block of flows. Create a task using the create_task function.
import requests
from water import create_task
from pydantic import BaseModel

# Define data schemas
class WeatherRequest(BaseModel):
    city: str

class WeatherResult(BaseModel):
    city: str
    temperature: float
    description: str

# Task: Fetch weather data
def get_weather(params, context):
    # context is the execution context that each task receives
    # it contains the flow id, task id, execution_id, step number, flow_metadata
    # you can access all the task outputs till now using .get_all_task_outputs()
    city = params["input_data"]["city"]
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid=demo&units=metric"
    response = requests.get(url)
    data = response.json()
    return {
        "city": city,
        "temperature": data["main"]["temp"],
        "description": data["weather"][0]["description"]
    }

# Create task
weather_task = create_task(
    id="weather",
    description="Fetch current weather",
    input_schema=WeatherRequest,
    output_schema=WeatherResult,
    execute=get_weather
)
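Because a task's execute function is an ordinary Python function, its logic can be checked without the network or the engine. The sketch below is only an illustration and not part of Water: `parse_weather` is a hypothetical helper holding the response-parsing step of `get_weather`, and the sample payload is made up to mimic just the fields the task reads.

```python
def parse_weather(city, data):
    """Hypothetical helper: pick out the fields that get_weather returns."""
    return {
        "city": city,
        "temperature": data["main"]["temp"],
        "description": data["weather"][0]["description"],
    }

# Made-up payload containing only the keys the task reads
sample_payload = {
    "main": {"temp": 12.5},
    "weather": [{"description": "light rain"}],
}

result = parse_weather("London", sample_payload)
print(result)
```

Splitting the parsing from the HTTP call this way is a design choice, not something Water requires; it simply makes the task easier to test in isolation.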
Create & Register Flow
Compose tasks into a flow using the Flow class, then register the flow with the execution engine using the register method.
from water import Flow
# Create and register flow
weather_flow = Flow(id="weather_check", description="Get weather for a city")
weather_flow.then(weather_task).register()
Run the Flow
Flows run asynchronously. Use the run method to start a flow; it returns a Future object, and you can use await to get the result.
import asyncio

async def main():
    result = await weather_flow.run({"city": "London"})
    print(f"{result['city']}: {result['temperature']}°C, {result['description']}")

if __name__ == "__main__":
    asyncio.run(main())
Run it as a Python script:
python weather_flow.py
Control Flow
When you build a flow, you usually break it down into smaller tasks that can be chained into different flows. Tasks provide a structured definition to manage your flow by defining the input schema, output schema, and execution logic.
If the schemas match, the output of each task is automatically passed as the input to the next task. If they don't match, an error is raised.
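To make the idea of matching schemas concrete, here is a minimal sketch of one possible compatibility check. It is not Water's implementation: it uses standard-library dataclasses as stand-ins for the Pydantic models so it runs without dependencies, `schemas_match`, `AdviceRequest`, and `CityOnly` are hypothetical names, and treating "match" as "same field names and types" is an assumption.

```python
from dataclasses import dataclass, fields

@dataclass
class WeatherResult:  # stand-in for the output schema of one task
    city: str
    temperature: float
    description: str

@dataclass
class AdviceRequest:  # hypothetical input schema of a follow-up task
    city: str
    temperature: float
    description: str

@dataclass
class CityOnly:  # hypothetical input schema that does not line up
    city: str

def schemas_match(output_schema, input_schema):
    """Return True when both schemas declare the same field names and types."""
    out = {f.name: f.type for f in fields(output_schema)}
    inp = {f.name: f.type for f in fields(input_schema)}
    return out == inp

print(schemas_match(WeatherResult, AdviceRequest))
print(schemas_match(WeatherResult, CityOnly))
```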
Sequential Flow
Chain your tasks in sequence using .then()
from water import Flow, create_task
# Create tasks (arguments omitted for brevity)
task_1 = create_task()
task_2 = create_task()
task_3 = create_task()
task_4 = create_task()

flow = Flow(id="sequential_flow", description="test sequential flow")
flow.then(task_1)\
    .then(task_2)\
    .then(task_3)\
    .then(task_4)\
    .register()
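The data handoff in a sequential chain can be pictured with plain functions. The following is a sketch under assumptions, not Water's engine: `run_sequential`, `double`, and `add_one` are hypothetical, and the only detail borrowed from the task example is that an execute function receives `params` with an `"input_data"` key plus a `context` argument.

```python
def double(params, context):
    return {"value": params["input_data"]["value"] * 2}

def add_one(params, context):
    return {"value": params["input_data"]["value"] + 1}

def run_sequential(steps, input_data):
    """Hypothetical runner: feed each step's output to the next step."""
    data = input_data
    for step in steps:
        # The previous output becomes the next step's input_data
        data = step({"input_data": data}, None)
    return data

result = run_sequential([double, add_one], {"value": 5})
print(result)
```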
Parallel Flow
Run your tasks in parallel using .parallel()
from water import Flow, create_task
# Create tasks (arguments omitted for brevity)
task_1 = create_task()
task_2 = create_task()
task_3 = create_task()
task_4 = create_task()

flow = Flow(id="parallel_flow", description="test parallel flow")
flow.parallel([task_1, task_2, task_3])\
    .then(task_4)\
    .register()
# The outputs from parallel tasks are passed to the next task
# {
# "task_1": output from task_1,
# "task_2": output from task_2,
# "task_3": output from task_3
# }
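The keyed output shape described in the comment above can be reproduced with the standard library. This sketch uses `asyncio.gather` as a stand-in and makes no claim about how Water schedules tasks internally; `run_parallel` and the three task functions are hypothetical.

```python
import asyncio

async def task_1(data):
    await asyncio.sleep(0.01)  # simulate some I/O
    return {"sum": data["x"] + 1}

async def task_2(data):
    await asyncio.sleep(0.01)
    return {"product": data["x"] * 10}

async def task_3(data):
    await asyncio.sleep(0.01)
    return {"negated": -data["x"]}

async def run_parallel(tasks, data):
    """Run every task concurrently on the same input and key outputs by task id."""
    outputs = await asyncio.gather(*(fn(data) for fn in tasks.values()))
    # gather returns results in the order the awaitables were passed in
    return dict(zip(tasks.keys(), outputs))

merged = asyncio.run(
    run_parallel({"task_1": task_1, "task_2": task_2, "task_3": task_3}, {"x": 2})
)
print(merged)
```

A task that follows a parallel step would then read each result by its task id, for example `merged["task_2"]`.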
Conditional Flow
Run conditional tasks using .branch()
from water import Flow, create_task
# Create tasks (arguments omitted for brevity)
task_1 = create_task()
task_2 = create_task()
task_3 = create_task()
task_4 = create_task()

# condition_1, condition_2, and condition_3 are defined elsewhere
flow = Flow(id="conditional_flow", description="test conditional flow")
flow.branch(
    [condition_1, task_1],
    [condition_2, task_2],
    [condition_3, task_3]
)\
    .then(task_4)\
    .register()
# The branch output comes from the task paired with the first condition that is true
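The conditions are not defined in the snippet above. As a sketch of first-match branching, assume each condition is a callable that takes the incoming data and returns a boolean; that signature, the `pick_branch` helper, and returning `None` when nothing matches are all assumptions for illustration, not Water's documented behavior.

```python
def is_hot(data):
    return data["temperature"] >= 30

def is_cold(data):
    return data["temperature"] <= 5

def always(data):
    return True  # catch-all, placed last

def pick_branch(branches, data):
    """Return the task paired with the first condition that is true, else None."""
    for condition, task_name in branches:
        if condition(data):
            return task_name
    return None

branches = [(is_hot, "task_1"), (is_cold, "task_2"), (always, "task_3")]

print(pick_branch(branches, {"temperature": 35}))
print(pick_branch(branches, {"temperature": 0}))
print(pick_branch(branches, {"temperature": 18}))
```

Because evaluation stops at the first true condition, the order of the pairs matters; a catch-all placed first would shadow every other branch.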
Loop Flow
Run a task in a loop using .loop()
from water import Flow, create_task
# Create tasks (arguments omitted for brevity)
task_1 = create_task()
task_2 = create_task()

# condition is defined elsewhere
flow = Flow(id="loop_flow", description="test loop flow")
flow.loop(condition, task_1)\
    .then(task_2)\
    .register()
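One way to picture a loop step is as a while loop around a task. The sketch below assumes the condition is a callable checked before each iteration and that the task's output becomes the next iteration's input; whether Water evaluates the condition before or after each run is not stated here. `run_loop` and its `max_iterations` guard are hypothetical.

```python
def needs_more_work(data):
    return data["count"] < 3

def increment(data):
    return {"count": data["count"] + 1}

def run_loop(condition, task, data, max_iterations=100):
    """Repeat task while condition(data) holds, feeding each output back in."""
    iterations = 0
    while condition(data) and iterations < max_iterations:
        data = task(data)
        iterations += 1
    return data

final = run_loop(needs_more_work, increment, {"count": 0})
print(final)
```

The iteration cap guards against a condition that never becomes false, which is easy to write by accident when the condition depends on model output.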
Playground
You can serve your flow as a lightweight FastAPI async server.
from water import FlowServer
import uvicorn

# Create server with flows
app = FlowServer(flows=[flow_1, flow_2, flow_3]).get_app()

if __name__ == "__main__":
    # "playground" here is the name of the file
    uvicorn.run("playground:app", host="0.0.0.0", port=8000, reload=True)
You can access the playground at http://localhost:8000/docs to see the available endpoints and test your flows quickly.
Examples
You can find examples of flows in the cookbook directory.
Roadmap
- Storage Layer
- Human in the Loop
- Retry Mechanism for individual tasks
- Streaming response from tasks
You can support the project by starring the repo and sharing it with your friends. I am actively working on the project and would love to hear your feedback. You can reach out to me on X or LinkedIn or email me at guptaamanthan01@gmail.com.