Write an Agent¶
This guide walks you through building a Foreman-compatible agent from scratch.
Agents are HTTP services that receive task nudges from the harness, claim tasks from the queue, process them,
and report decisions back — all via ForemanClient.
Prerequisites¶
- Python 3.12+
- A running Foreman harness (see Installation)
- `uv` or `pip` for package management
Install foreman-client¶
foreman-client has two runtime dependencies: httpx and pydantic>=2.
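Assuming the package is published under the same name, install it with your preferred tool:

# Assumption: foreman-client is distributed under this name.
pip install foreman-client
# or, with uv:
uv add foreman-client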
The Three-Method API¶
ForemanClient exposes exactly three methods an agent needs.
ForemanClient(harness_url, agent_url)¶
| Argument | Type | Description |
|---|---|---|
| `harness_url` | `str` | Base URL of the Foreman harness (e.g. `"http://localhost:8000"`). |
| `agent_url` | `str` | This agent's own base URL (e.g. `"http://localhost:9001"`). Sent when claiming tasks so the harness knows which agent holds each claim. |
Use it as a context manager to ensure the HTTP connection pool is closed on exit:
from foremanclient import ForemanClient

with ForemanClient(harness_url="http://localhost:8000", agent_url="http://localhost:9001") as client:
    ...
next_task() → TaskMessage | None¶
Claims and returns the next pending task from the harness queue.
Returns None when the queue is empty (harness responds 204 No Content).
Raises ForemanClientError on any non-2xx response.
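A minimal claim-and-check sketch (the URLs are placeholders):

from foremanclient import ForemanClient

with ForemanClient(harness_url="http://localhost:8000", agent_url="http://localhost:9001") as client:
    task = client.next_task()
    if task is None:
        print("queue empty")              # harness responded 204 No Content
    else:
        print(f"claimed {task.task_id}")  # this agent now holds the claim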
complete_task(task_id, decision)¶
Stores the completed DecisionMessage in the queue and wakes the harness drain loop.
Call this once per task, after all processing is done.
| Argument | Type | Description |
|---|---|---|
| `task_id` | `str` | The `task_id` from the `TaskMessage` returned by `next_task()`. |
| `decision` | `DecisionMessage` | Your agent's decision, rationale, and action list. |
Note: Always pass `decision.task_id` as the `task_id` argument. Passing a different value causes the nudge and the stored decision to reference different tasks; the harness will not raise an error, but the drain loop will not find the intended result.
from foremanclient import DecisionMessage, DecisionType

decision = DecisionMessage(
    task_id=task.task_id,
    decision=DecisionType.label_and_respond,
    rationale="Classified as a bug based on the stack trace.",
    actions=[{"type": "add_label", "label": "bug"}],
)
client.complete_task(task.task_id, decision)
heartbeat(task_id)¶
Extends the claim window for an in-progress task.
The harness defaults to a 300-second claim timeout (claim_timeout_seconds in QueueConfig).
If your agent hasn't called complete_task() within that window, the harness re-queues the task for another attempt.
Call heartbeat() at least once every 30 seconds during long LLM calls or any blocking work.
import threading

def _heartbeat_loop(client, task_id, stop_event):
    # Extend the claim every 25 seconds -- comfortably inside the 30-second budget.
    while not stop_event.wait(timeout=25):
        client.heartbeat(task_id)

stop = threading.Event()
t = threading.Thread(target=_heartbeat_loop, args=(client, task.task_id, stop), daemon=True)
t.start()
try:
    decision = run_llm(task)  # the long-running work
finally:
    stop.set()  # stop heartbeating whether the work succeeded or failed
Idempotency¶
task_id is the idempotency key for every task.
The harness writes each decision to action_log before executing GitHub API calls, keyed on task_id.
If next_task() returns a task your agent has already completed
(for example, after an unclean restart), check your own records before processing again:
task = client.next_task()
if task and not already_processed(task.task_id):
    decision = process(task)
    client.complete_task(task.task_id, decision)
The simplest approach is to keep a short in-memory set of recently completed task_id values.
Across restarts, rely on the harness: if the decision is already in action_log, the executor skips duplicate actions.
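A minimal sketch of that in-process guard (`mark_processed` is an illustrative helper you would call after complete_task(); only `already_processed` appears in the snippet above):

from collections import deque

_recent: deque[str] = deque(maxlen=1000)  # bounded, so memory use stays constant

def already_processed(task_id: str) -> bool:
    return task_id in _recent  # O(n) scan; fine for a small bounded deque

def mark_processed(task_id: str) -> None:
    _recent.append(task_id)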
Minimal Working Example¶
A complete, runnable agent in about 35 lines of code. The lifespan ensures the client is created once and that any tasks queued while the agent was down are claimed immediately on startup (see Startup Poll for why this matters):
import os
from contextlib import asynccontextmanager

from fastapi import BackgroundTasks, FastAPI
from foremanclient import DecisionMessage, DecisionType, ForemanClient
from pydantic import BaseModel

def _decide(task):
    return DecisionMessage(
        task_id=task.task_id, decision=DecisionType.skip, rationale="No action needed."
    )

def _run(client):
    task = client.next_task()
    if task:
        client.complete_task(task.task_id, _decide(task))

@asynccontextmanager
async def lifespan(app):
    client = ForemanClient(os.environ["FOREMAN_HARNESS_URL"], os.environ["AGENT_URL"])
    # Drain any tasks queued while the agent was down
    while True:
        task = client.next_task()
        if task is None:
            break
        client.complete_task(task.task_id, _decide(task))
    app.state.client = client
    yield
    client.close()

app = FastAPI(lifespan=lifespan)

class TaskNudge(BaseModel):
    task_id: str

@app.get("/health")
def health():
    return {"status": "ok"}

@app.post("/task", status_code=202)
async def handle_task(nudge: TaskNudge, background_tasks: BackgroundTasks):
    background_tasks.add_task(_run, app.state.client)
    return {"status": "accepted"}
Run it with:
FOREMAN_HARNESS_URL=http://localhost:8000 AGENT_URL=http://localhost:9001 uvicorn myagent:app --port 9001
Required Endpoints¶
Every agent must expose:
| Method | Path | Description |
|---|---|---|
| `POST` | `/task` | Accept a nudge `{"task_id": "..."}` and return `202 Accepted`. |
| `GET` | `/health` | Health check. Must return `200 OK` with `{"status": "ok"}`. |
The harness sends a POST /task nudge (body: {"task_id": "..."}) when a new task is enqueued.
The agent should return 202 immediately and process the task in a background thread or task.
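During development you can exercise both endpoints by hand with curl, assuming the example agent above is running on port 9001 (the task id is a placeholder):

curl http://localhost:9001/health
curl -X POST http://localhost:9001/task \
  -H "Content-Type: application/json" \
  -d '{"task_id": "example-task-id"}'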
Startup Poll¶
On startup, loop next_task() until it returns None to pick up all tasks that were enqueued
while your agent was down:
from contextlib import asynccontextmanager

from fastapi import FastAPI
from foremanclient import ForemanClient

@asynccontextmanager
async def lifespan(app):
    client = ForemanClient(...)
    # Claim every task that accumulated while the agent was offline.
    while True:
        task = client.next_task()
        if task is None:
            break
        _decide_and_complete(task)
    yield
    client.close()

app = FastAPI(lifespan=lifespan)
A single next_task() call claims only one task. If N tasks accumulated while the agent was offline,
the remaining N-1 stay stuck until the harness requeue cycle fires (up to claim_timeout_seconds later),
so looping until None is the correct pattern.
This is the key mechanism for zero task loss under agent restarts.
The harness re-queues stale claimed tasks after claim_timeout_seconds,
and the startup drain ensures your agent claims them immediately on boot.
Reference¶
See the Agent Protocol Reference for the full TaskMessage, DecisionMessage,
and ActionItem schemas.