
Loop

LoopTask is a single Flow unit that repeats a small sequence of steps until:

  • the stop condition returns True, or
  • it hits max_loops (default: 5).

This is the simplest way to loop without introducing cycles into the main Flow graph, because the repetition stays inside one unit.

Key parameters

  • max_loops: maximum iterations (default 5)
  • stop_condition(iteration, last_output, last_type, memory): function called after each iteration; returning True ends the loop
  • store_history_memory_key: optional memory key under which the per-iteration outputs are saved
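
To make the interaction between these parameters concrete, here is a minimal plain-Python model of the loop semantics described above. It is a sketch, not the library's implementation: run_loop is a hypothetical helper, memory is modeled as a plain dict, and it assumes that iterations are counted from 1, that each iteration's output feeds the next one, and that last_type is always "text".

```python
from typing import Any, Callable, Dict, List, Optional


def run_loop(
    steps: List[Callable[[Any], Any]],
    initial_input: Any,
    stop_condition: Optional[Callable[[int, Any, str, Dict[str, Any]], bool]] = None,
    max_loops: int = 5,
    memory: Optional[Dict[str, Any]] = None,
    store_history_memory_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical model of a loop unit; not the real LoopTask code."""
    memory = {} if memory is None else memory
    history: List[Any] = []
    current = initial_input
    iterations = 0

    for iteration in range(1, max_loops + 1):  # assumption: 1-based counter
        # Run the whole step sequence once, chaining each output forward.
        for step in steps:
            current = step(current)
        iterations = iteration

        # Optionally record this iteration's output under the given key.
        history.append(current)
        if store_history_memory_key is not None:
            memory[store_history_memory_key] = list(history)

        # The stop rule is checked after each iteration.
        if stop_condition is not None and stop_condition(
            iteration, current, "text", memory
        ):
            break

    return {"output": current, "iterations": iterations}


def grow(text: str) -> str:
    """Stand-in for an agent step: appends 400 characters."""
    return text + "x" * 400


def long_enough(iteration, last_output, last_type, memory) -> bool:
    return len(last_output) > 1000


result = run_loop([grow], "", stop_condition=long_enough)
print(result["iterations"], len(result["output"]))  # 3 1200
```

In this model the stop condition ends the loop on the third pass. With max_loops=2 the same call would stop after two passes even though the condition never returned True.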

Example

import asyncio
from intelli.flow.flow import Flow
from intelli.flow.tasks.task import Task
from intelli.flow.agents.agent import Agent
from intelli.flow.input.task_input import TextTaskInput
from intelli.flow.tasks.loop_task import LoopTask

# 1. Define a regular Agent and Task
agent = Agent(
    agent_type="text",
    provider="openai",
    mission="Summarize and expand the input",
    model_params={"key": "YOUR_API_KEY", "model": "gpt-4o"},
)
task = Task(TextTaskInput("Generate an expanded summary"), agent)

# 2. Define a simple stop condition function
def stop_if_long_enough(iteration, last_output, last_type, memory):
    # Returning True ends the loop before max_loops is reached
    return len(last_output) > 1000

# 3. Create the Loop unit
loop = LoopTask(
    desc="iterative refinement",
    steps=[task],
    max_loops=3,
    stop_condition=stop_if_long_enough,
)

# 4. Execute in a Flow
flow = Flow(tasks={"loop_step": loop}, map_paths={})
result = asyncio.run(flow.start(initial_input="AI is the future of..."))

print(result["loop_step"]["output"])
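
A stop condition does not have to look at length. As a further sketch, the factory below builds a stop rule that ends the loop once two consecutive iterations produce the same output. make_stop_when_stable is a hypothetical helper, not part of the library. It keeps its state in a closure and ignores the memory argument, so it makes no assumption about the memory object's interface.

```python
def make_stop_when_stable():
    """Build a stop condition that fires when the output stops changing."""
    unset = object()           # sentinel: no previous output seen yet
    state = {"previous": unset}

    def stop_condition(iteration, last_output, last_type, memory):
        # Compare with the output of the previous iteration, then remember
        # the current one for the next call.
        unchanged = (
            state["previous"] is not unset
            and last_output == state["previous"]
        )
        state["previous"] = last_output
        return unchanged

    return stop_condition


# Standalone check with hand-written outputs instead of a real agent
stop = make_stop_when_stable()
print(stop(1, "draft A", "text", {}))  # False
print(stop(2, "draft B", "text", {}))  # False
print(stop(3, "draft B", "text", {}))  # True
```

It can be passed as stop_condition=make_stop_when_stable() in place of stop_if_long_enough. Create a fresh one for each loop so that state is not shared between runs.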