How to Design an OpenHarness-Style Agent Runtime with Tools, Memory, Permissions, Skills, and Multi-Agent Coordination
This tutorial builds an OpenHarness-style agent runtime from scratch, covering tools, memory, permissions, skills, and multi-agent coordination with runnable code.
The Big Picture
The article is a step-by-step tutorial on constructing an OpenHarness-style agent runtime. It explains the core components: tool use with typed schemas, permission checking, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. It emphasizes transparency by exposing the full control flow from task reception to completion, and it uses a virtual filesystem and a mock LLM for safe experimentation. The implementation includes a PermissionChecker for sensitive operations, a CostMeter for token and cost tracking, and a ToolRegistry for managing tools. The goal is to demystify agent frameworks by making the architecture accessible without API keys or complex infrastructure.
Why It Matters
This tutorial demystifies agent frameworks by building one from scratch, giving developers full control over tool use, permissions, memory, and multi-agent coordination. Understanding these internals is crucial as AI agents move from experimental demos to production systems that must be secure, auditable, and cost-efficient. By exposing the harness's control flow, the article empowers teams to customize and debug agent behavior rather than relying on opaque black-box solutions.
In this tutorial, we build an OpenHarness-style runtime from scratch to better understand how a practical agent harness works. We recreate the major building blocks that make an agent system useful, including tool use, typed tool schemas, permissions, lifecycle hooks, memory, skills, context compaction, retry logic, cost tracking, and multi-agent coordination. Instead of treating an agent framework as a black box, we expose the full control flow and watch how the harness receives a user task, lets the model decide the next action, validates and executes tool calls, returns observations, and continues the loop until the task is complete. We also keep the implementation runnable so we can experiment with the architecture without needing API keys or complex infrastructure.
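Stripped of tools, permissions, and hooks, the loop described above fits in a few dozen lines. The sketch below is not the tutorial's QueryEngine: `run_loop`, `scripted_model`, and the dict-based action format (`{"tool": ..., "args": ...}` for a tool call, `{"text": ...}` for a final answer) are hypothetical simplifications that show only the receive, decide, execute, observe, repeat cycle.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical action format: {"text": ...} ends the loop,
# {"tool": name, "args": {...}} requests a tool call.
def run_loop(model: Callable[[list], dict], tools: dict, task: str,
             max_steps: int = 8) -> tuple[str, list]:
    """Receive a task, let the model pick an action, execute tool calls,
    feed observations back, and stop when the model answers without a tool."""
    transcript = [{"role": "user", "content": task}]
    for _ in range(max_steps):
        action = model(transcript)
        if "tool" not in action:  # final answer ends the loop
            transcript.append({"role": "assistant", "content": action["text"]})
            return action["text"], transcript
        name, args = action["tool"], action.get("args", {})
        transcript.append({"role": "assistant", "content": f"call {name}"})
        if name not in tools:  # validate before executing
            observation = f"error: unknown tool {name!r}"
        else:
            try:
                observation = str(tools[name](**args))
            except Exception as e:  # tool errors become observations
                observation = f"error: {type(e).__name__}: {e}"
        transcript.append({"role": "tool", "name": name, "content": observation})
    return "(stopped: step limit reached)", transcript

# Usage: a scripted "model" that calls `add` once, then answers from the result.
def scripted_model(transcript: list) -> dict:
    last = transcript[-1]
    if last["role"] == "tool":
        return {"text": f"The sum is {last['content']}."}
    return {"tool": "add", "args": {"a": 2, "b": 3}}

answer, log = run_loop(scripted_model, {"add": lambda a, b: a + b}, "What is 2 + 3?")
print(answer)  # The sum is 5.
```

Two choices here carry over to a full harness: tool failures are returned to the model as observations instead of crashing the loop, and a step limit guarantees termination even if the model never stops calling tools.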
```python
from __future__ import annotations

import asyncio
import contextlib
import dataclasses
import fnmatch
import io
import json
import os
import re
import tempfile
import textwrap
import time
import traceback
import types
import typing
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from enum import Enum

MISSING = dataclasses.MISSING
# types.UnionType (the runtime type of `X | Y`) only exists on Python 3.10+.
UnionType = getattr(types, "UnionType", None)

def run_async(coro):
    """Run a coroutine to completion from sync code, even inside a live loop."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    if loop is None or not loop.is_running():
        return asyncio.run(coro)
    # A loop is already running (e.g. Jupyter/Colab): try to re-enter it.
    try:
        import nest_asyncio
        nest_asyncio.apply()
        patched = True
    except Exception:  # not installed, or this loop type can't be patched
        patched = False
    if patched:
        return loop.run_until_complete(coro)
    # Fallback: run the coroutine on a fresh event loop in a worker thread and
    # re-raise any exception it produced in the calling thread.
    import threading
    box: dict = {}

    def _runner():
        try:
            box["value"] = asyncio.run(coro)
        except BaseException as e:
            box["error"] = e

    t = threading.Thread(target=_runner)
    t.start()
    t.join()
    if "error" in box:
        raise box["error"]
    return box["value"]

BANNER = "=" * 78

def banner(title: str) -> None:
    print("\n" + BANNER)
    print(f" {title}")
    print(BANNER)

def explain(title: str, body: str) -> None:
    banner(title)
    print(textwrap.fill(textwrap.dedent(body).strip(), width=78))
    print("-" * 78)

def short(text: str, n: int = 240) -> str:
    """Collapse whitespace and truncate to at most `n` characters."""
    text = " ".join(str(text).split())
    return text if len(text) <= n else text[: n - 1] + "…"

@dataclass
class Usage:
    input_tokens: int = 0
    output_tokens: int = 0

    def __add__(self, other: "Usage") -> "Usage":
        return Usage(self.input_tokens + other.input_tokens,
                     self.output_tokens + other.output_tokens)

@dataclass
class ToolCall:
    id: str
    name: str
    arguments: dict

@dataclass
class AssistantTurn:
    """One turn produced by the model: some text + zero or more tool calls."""
    text: str = ""
    tool_calls: list = field(default_factory=list)
    stop_reason: str = "end_turn"
    usage: Usage = field(default_factory=Usage)

@dataclass
class Message:
    """A single message in the running conversation transcript."""
    role: str
    content: str = ""
    tool_calls: list = field(default_factory=list)
    tool_call_id: str = ""
    name: str = ""

def count_tokens(text: str) -> int:
    """Cheap, provider-agnostic token estimate (~4 chars/token)."""
    if not text:
        return 0
    return max(1, round(len(text) / 4))

# USD per 1M tokens: (input, output).
PRICE_BOOK = {
    "mock-sonnet": (3.00, 15.00),
    "claude-sonnet-4": (3.00, 15.00),
    "gpt-4.1": (2.00, 8.00),
    "default": (1.00, 3.00),
}

class CostMeter:
    """Accumulates token usage and converts it to an estimated dollar cost."""

    def __init__(self, model: str):
        self.model = model
        self.total = Usage()
        self.calls = 0

    def add(self, usage: Usage) -> None:
        self.total = self.total + usage
        self.calls += 1

    @property
    def dollars(self) -> float:
        pin, pout = PRICE_BOOK.get(self.model, PRICE_BOOK["default"])
        return ((self.total.input_tokens / 1e6) * pin
                + (self.total.output_tokens / 1e6) * pout)

    def summary(self) -> str:
        return (f"{self.calls} model call(s) | "
                f"in={self.total.input_tokens} out={self.total.output_tokens} tok | "
                f"~${self.dollars:.5f} ({self.model})")

def fld(description: str = "", default=MISSING, default_factory=MISSING):
    """Declare a tool-input field with a description (and optional default)."""
    md = {"description": description}
    if default_factory is not MISSING:
        return field(default_factory=default_factory, metadata=md)
    if default is not MISSING:
        return field(default=default, metadata=md)
    return field(metadata=md)

def _is_optional(t) -> bool:
    origin = typing.get_origin(t)
    if origin is typing.Union or (UnionType is not None and origin is UnionType):
        return type(None) in typing.get_args(t)
    return False

def _py_to_json_type(t) -> dict:
    origin = typing.get_origin(t)
    if origin is typing.Union or (UnionType is not None and origin is UnionType):
        args = [a for a in typing.get_args(t) if a is not type(None)]
        return _py_to_json_type(args[0]) if args else {"type": "string"}
    if t is str:
        return {"type": "string"}
    if t is bool:  # check bool before int: bool is a subclass of int
        return {"type": "boolean"}
    if t is int:
        return {"type": "integer"}
    if t is float:
        return {"type": "number"}
    if origin is list or t is list:
        args = typing.get_args(t)
        item = _py_to_json_type(args[0]) if args else {"type": "string"}
        return {"type": "array", "items": item}
    if origin is dict or t is dict:
        return {"type": "object"}
    return {"type": "string"}

def build_json_schema(model_cls) -> dict:
    """Turn a dataclass input model into a JSON Schema (object with properties)."""
    hints = typing.get_type_hints(model_cls)
    props, required = {}, []
    for f in dataclasses.fields(model_cls):
        t = hints.get(f.name, str)
        js = dict(_py_to_json_type(t))
        desc = f.metadata.get("description", "")
        if desc:
            js["description"] = desc
        props[f.name] = js
        has_default = (f.default is not MISSING) or (f.default_factory is not MISSING)
        if not has_default and not _is_optional(t):
            required.append(f.name)
    schema = {"type": "object", "properties": props}
    if required:
        schema["required"] = required
    return schema

def _coerce(v, t):
    origin = typing.get_origin(t)
    if origin is typing.Union or (UnionType is not None and origin is UnionType):
        if v is None:
            return None
        args = [a for a in typing.get_args(t) if a is not type(None)]
        return _coerce(v, args[0]) if args else v
    if t is str:
        return v if isinstance(v, str) else str(v)
    if t is bool:
        if isinstance(v, bool):
            return v
        if isinstance(v, str):
            return v.strip().lower() in ("1", "true", "yes", "y", "on")
        return bool(v)
    if t is int:
        return int(v)
    if t is float:
        return float(v)
    if origin is list or t is list:
        args = typing.get_args(t)
        it = args[0] if args else str
        if not isinstance(v, list):
            v = [v]
        return [_coerce(x, it) for x in v]
    if origin is dict or t is dict:
        return dict(v) if v else {}
    return v

def instantiate(model_cls, raw: dict):
    """Validate + coerce raw JSON args into a typed input instance."""
    hints = typing.get_type_hints(model_cls)
    raw = raw or {}
    kwargs = {}
    for f in dataclasses.fields(model_cls):
        t = hints.get(f.name, str)
        if f.name in raw and raw[f.name] is not None:
            try:
                kwargs[f.name] = _coerce(raw[f.name], t)
            except (TypeError, ValueError) as e:
                raise ValueError(f"Bad value for '{f.name}': {e}") from e
        elif f.default is not MISSING:
            kwargs[f.name] = f.default
        elif f.default_factory is not MISSING:
            kwargs[f.name] = f.default_factory()
        elif _is_optional(t):
            kwargs[f.name] = None
        else:
            raise ValueError(f"Missing required argument '{f.name}'")
    return model_cls(**kwargs)

class PermissionKind(Enum):
    """How dangerous a tool is: this drives the default permission policy."""
    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    META = "meta"

@dataclass
class ToolResult:
    output: str
    is_error: bool = False
    metadata: dict = field(default_factory=dict)

class ToolContext:
    """Everything a tool may need at runtime (services + shared state)."""

    def __init__(self, **services):
        self.__dict__.update(services)

class BaseTool:
    """Base class for all tools. Subclasses set name/description/InputModel/kind
    and implement `execute`. Schema + validation are handled here."""
    name: str = "base"
    description: str = ""
    InputModel = None
    kind: PermissionKind = PermissionKind.READ

    def schema(self) -> dict:
        return {
            "name": self.name,
            "description": self.description,
            "kind": self.kind.value,
            "input_schema": (build_json_schema(self.InputModel)
                             if self.InputModel else
                             {"type": "object", "properties": {}}),
        }

    async def run(self, raw_args: dict, ctx: ToolContext) -> ToolResult:
        args = instantiate(self.InputModel, raw_args) if self.InputModel else None
        return await self.execute(args, ctx)

    async def execute(self, args, ctx: ToolContext) -> ToolResult:
        raise NotImplementedError

class ToolRegistry:
    def __init__(self):
        self._tools: dict = {}

    def register(self, tool: BaseTool) -> "ToolRegistry":
        self._tools[tool.name] = tool
        return self

    def get(self, name: str) -> BaseTool | None:
        return self._tools.get(name)

    def schemas(self) -> list:
        return [t.schema() for t in self._tools.values()]

    def names(self) -> list:
        return list(self._tools)

class VirtualFS:
    """In-memory filesystem. Keeps the tutorial safe & deterministic in Colab."""

    def __init__(self):
        self.files: dict = {}

    @staticmethod
    def norm(path: str) -> str:
        # Drop whitespace, leading "./" prefixes, and leading slashes.
        # (path.lstrip("./") would also eat the dot in names like ".env".)
        path = path.strip()
        while path.startswith("./"):
            path = path[2:]
        return path.lstrip("/")

    def write(self, path: str, content: str) -> None:
        self.files[self.norm(path)] = content

    def read(self, path: str) -> str:
        return self.files[self.norm(path)]

    def exists(self, path: str) -> bool:
        return self.norm(path) in self.files

    def list(self, pattern: str = "*") -> list:
        return sorted(p for p in self.files if fnmatch.fnmatch(p, pattern))

    def tree(self) -> str:
        if not self.files:
            return "(empty)"
        return "\n".join(f"  {p} ({len(c)} bytes)"
                         for p, c in sorted(self.files.items()))

class PermissionMode(Enum):
    DEFAULT = "default"
    AUTO = "auto"
    PLAN = "plan"

@dataclass
class PermissionDecision:
    action: str  # "allow", "deny", or "ask"
    reason: str = ""

SENSITIVE_PATTERNS = [
    "/etc/*", "*/.ssh/*", "*.pem", "*id_rsa*", "*/.aws/*",
    "*credentials*", "*.env", "*/secrets/*",
]

class PermissionChecker:
    def __init__(self, mode: PermissionMode = PermissionMode.DEFAULT,
                 path_rules: list | None = None,
                 denied_commands: list | None = None):
        self.mode = mode
        self.path_rules = path_rules or []
        self.denied_commands = denied_commands or []

    def _check_path(self, path: str) -> PermissionDecision | None:
        for pat in SENSITIVE_PATTERNS:
            if fnmatch.fnmatch(path, pat):
                return PermissionDecision("deny", f"sensitive path '{path}' ({pat})")
        for rule in self.path_rules:
            if fnmatch.fnmatch(path, rule["pattern"]):
                if rule.get("allow", True):
                    return PermissionDecision("allow", f"path rule allows '{rule['pattern']}'")
                return PermissionDecision("deny", f"path rule blocks '{rule['pattern']}'")
        return None

    def _check_command(self, command: str) -> PermissionDecision | None:
        for pat in self.denied_commands:
            if re.search(pat, command):
                return PermissionDecision("deny", f"denied command matched /{pat}/")
        return None

    def check(self, tool: BaseTool, args: dict) -> PermissionDecision:
        # Hard rules first (sensitive paths, denied commands): no mode overrides them.
        if "path" in args and tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
            d = self._check_path(str(args["path"]))
            if d:
                return d
        if "command" in args:
            d = self._check_command(str(args["command"]))
            if d:
                return d
        if self.mode is PermissionMode.AUTO:
            return PermissionDecision("allow", "auto mode")
        if self.mode is PermissionMode.PLAN:
            if tool.kind in (PermissionKind.WRITE, PermissionKind.EXECUTE):
                return PermissionDecision("deny", "plan mode blocks writes/executes")
            return PermissionDecision("allow", "plan mode allows reads")
        if tool.kind in (PermissionKind.READ, PermissionKind.META):
            return PermissionDecision("allow", "safe tool")
        return PermissionDecision("ask", f"{tool.kind.value} requires approval")

async def auto_approve(tool, args, reason) -> bool:
    print(f"  approval needed: {tool.name} ({reason}) -> [auto-approved]")
    return True

async def interactive_approve(tool, args, reason) -> bool:
    ans = input(f"  Allow {tool.name}({short(json.dumps(args), 80)})? [y/N] ")
    return ans.strip().lower().startswith("y")

@dataclass
class HookOutcome:
    blocked: bool = False
    reason: str = ""
    arguments: dict | None = None

class HookManager:
    """Lifecycle events around every tool call (like PreToolUse/PostToolUse)."""

    def __init__(self):
        self.pre: list = []
        self.post: list = []

    def add_pre(self, fn):
        self.pre.append(fn)
        return self

    def add_post(self, fn):
        self.post.append(fn)
        return self

    def run_pre(self, call: ToolCall, tool: BaseTool, ctx: ToolContext) -> HookOutcome:
        """Run pre-hooks in order. A hook may return None (no-op), block the
        call, or rewrite the arguments; each hook sees the arguments as
        rewritten by the hooks before it."""
        args = dict(call.arguments)
        for fn in self.pre:
            out = fn(dataclasses.replace(call, arguments=args), tool, ctx)
            if out is None:
                continue
            if out.blocked:
                return out
            if out.arguments is not None:
                args = out.arguments
        return HookOutcome(arguments=args)

    def run_post(self, call, tool, result: ToolResult, ctx) -> ToolResult:
        """Run post-hooks in order; each may replace the result."""
        for fn in self.post:
            new = fn(call, tool, result, ctx)
            if new is not None:
                result = new
        return result
```
We begin by laying the foundation for the OpenHarness-style tutorial: imports, async execution support, helper functions, and the core data models. We define messages, tool calls, usage tracking, token counting, and cost estimation; the dataclass-driven JSON schemas and argument coercion behind typed tool inputs; the BaseTool and ToolRegistry abstractions; permission modes and the PermissionChecker; lifecycle hooks; and the in-memory virtual filesystem that keeps execution safe. Every subsequent tool, agent loop, and demo builds on this snippet.
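What makes `PermissionChecker.check` safe is the order of its checks: hard denies (sensitive paths, denied commands) are evaluated before the mode, so not even `AUTO` can write to a `.env` file. The compressed sketch below reproduces only that precedence. The `decide` function, the string-valued tool kinds, and the three-entry `SENSITIVE` list are simplifications for illustration, not the tutorial's API.

```python
import fnmatch
from enum import Enum
from typing import Optional

class Mode(Enum):
    DEFAULT = "default"  # reads allowed, writes/executes need approval
    AUTO = "auto"        # everything allowed (after hard denies)
    PLAN = "plan"        # read-only

# Hypothetical, trimmed-down deny list (the tutorial's SENSITIVE_PATTERNS is longer).
SENSITIVE = ["*.pem", "*.env", "*/.ssh/*"]

def decide(kind: str, mode: Mode, path: Optional[str] = None) -> str:
    """Return "allow", "deny", or "ask" for a tool kind
    ("read", "write", "execute", or "meta") under the given mode."""
    mutating = kind in ("write", "execute")
    # 1. Hard denies come first, so no mode can override them.
    if mutating and path is not None and any(
            fnmatch.fnmatch(path, pat) for pat in SENSITIVE):
        return "deny"
    # 2. Mode-level policy.
    if mode is Mode.AUTO:
        return "allow"
    if mode is Mode.PLAN:
        return "deny" if mutating else "allow"
    # 3. Default mode: safe kinds pass, mutations go to a human.
    return "ask" if mutating else "allow"

for mode in Mode:
    print(mode.value, decide("write", mode, "app.py"), decide("write", mode, "prod.env"))
```

This prints `default ask deny`, `auto allow deny`, and `plan deny deny`: the mode changes how an ordinary write is treated, but the sensitive path is denied in every mode.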
```python
@dataclass
class WriteFileInput:
    path: str = fld("File path to write")
    content: str = fld("Full file content")

class WriteFileTool(BaseTool):
    name = "write_file"
    description = "Create or overwrite a file with the given content."
    InputModel = WriteFileInput
    kind = PermissionKind.WRITE

    async def execute(self, args: WriteFileInput, ctx) -> ToolResult:
        ctx.vfs.write(args.path, args.content)
        return ToolResult(f"Wrote {len(args.content)} bytes to {args.path}")

@dataclass
class ReadFileInput:
    path: str = fld("File path to read")

class ReadFileTool(BaseTool):
    name = "read_file"
    description = "Read the full contents of a file."
    InputModel = ReadFileInput
    kind = PermissionKind.READ

    async def execute(self, args: ReadFileInput, ctx) -> ToolResult:
        if not ctx.vfs.exists(args.path):
            return ToolResult(f"No such file: {args.path}", is_error=True)
        return ToolResult(ctx.vfs.read(args.path))

@dataclass
class EditInput:
    path: str = fld("File to edit")
    old: str = fld("Exact substring to replace")
    new: str = fld("Replacement text")

class EditTool(BaseTool):
    name = "edit"
    description = "Replace the first occurrence of `old` with `new` in a file."
    InputModel = EditInput
    kind = PermissionKind.WRITE

    async def execute(self, args: EditInput, ctx) -> ToolResult:
        if not ctx.vfs.exists(args.path):
            return ToolResult(f"No such file: {args.path}", is_error=True)
        text = ctx.vfs.read(args.path)
        if args.old not in text:
            return ToolResult(f"`old` not found in {args.path}", is_error=True)
        ctx.vfs.write(args.path, text.replace(args.old, args.new, 1))
        return ToolResult(f"Edited {args.path}: replaced 1 occurrence.")

@dataclass
class ListFilesInput:
    pattern: str = fld("Glob pattern", default="*")

class ListFilesTool(BaseTool):
    name = "list_files"
    description = "List files matching a glob pattern."
    InputModel = ListFilesInput
    kind = PermissionKind.READ

    async def execute(self, args: ListFilesInput, ctx) -> ToolResult:
        files = ctx.vfs.list(args.pattern)
        return ToolResult("\n".join(files) if files else "(no matches)")

@dataclass
class GrepInput:
    pattern: str = fld("Regex to search for")
    path_glob: str = fld("Which files to search", default="*")

class GrepTool(BaseTool):
    name = "grep"
    description = "Search file contents with a regular expression."
    InputModel = GrepInput
    kind = PermissionKind.READ

    async def execute(self, args: GrepInput, ctx) -> ToolResult:
        # A bad regex from the model becomes an error observation, not a crash.
        try:
            rx = re.compile(args.pattern)
        except re.error as e:
            return ToolResult(f"Invalid regex {args.pattern!r}: {e}", is_error=True)
        hits = []
        for p in ctx.vfs.list(args.path_glob):
            for i, line in enumerate(ctx.vfs.read(p).splitlines(), 1):
                if rx.search(line):
                    hits.append(f"{p}:{i}: {line.strip()}")
        return ToolResult("\n".join(hits) if hits else "(no matches)")

@dataclass
class RunPythonInput:
    files: list = fld("VFS files to exec in order in one namespace",
                      default_factory=list)
    code: str = fld("Extra Python code to run after the files", default="")

class RunPythonTool(BaseTool):
    name = "run_python"
    description = ("Execute Python from the virtual filesystem (and/or an inline "
                   "snippet) and capture stdout. Used to run tests.")
    InputModel = RunPythonInput
    kind = PermissionKind.EXECUTE

    async def execute(self, args: RunPythonInput, ctx) -> ToolResult:
        source_parts = []
        for p in args.files:
            if not ctx.vfs.exists(p):
                return ToolResult(f"No such file: {p}", is_error=True)
            source_parts.append(ctx.vfs.read(p))
        if args.code:
            source_parts.append(args.code)
        source = "\n\n".join(source_parts)
        buf = io.StringIO()
        # A fresh namespace per run. Note: exec() still runs in this process with
        # full builtins, so this isolates names, not privileges.
        sandbox_globals = {"__name__": "__main__", "__builtins__": __builtins__}
        try:
            with contextlib.redirect_stdout(buf):
                exec(compile(source, "<agent_code>", "exec"), sandbox_globals)
        except Exception as e:
            # Point at the failing line of the agent's code, not harness frames.
            frames = [f for f in traceback.extract_tb(e.__traceback__)
                      if f.filename == "<agent_code>"]
            loc = ""
            if frames:
                last = frames[-1]
                src_lines = source.splitlines()
                text = (src_lines[last.lineno - 1].strip()
                        if 0 < last.lineno <= len(src_lines) else "")
                loc = f" (line {last.lineno}: {text})" if text else f" (line {last.lineno})"
            out = buf.getvalue()
            msg = f"{type(e).__name__}: {e}{loc}"
            return ToolResult(f"{out}\n{msg}".strip(), is_error=True)
        return ToolResult(buf.getvalue().strip() or "(ran with no output)")

@dataclass
class ShellInput:
    command: str = fld("Shell command to run")

class ShellTool(BaseTool):
    name = "shell"
    description = "Run a shell command (simulated over the virtual filesystem)."
    InputModel = ShellInput
    kind = PermissionKind.EXECUTE

    async def execute(self, args: ShellInput, ctx) -> ToolResult:
        cmd = args.command.strip()
        if cmd.startswith("ls"):
            return ToolResult("\n".join(ctx.vfs.list("*")) or "(empty)")
        if cmd.startswith("cat "):
            p = cmd[4:].strip()
            if ctx.vfs.exists(p):
                return ToolResult(ctx.vfs.read(p))
            return ToolResult(f"cat: {p}: No such file", is_error=True)
        if cmd.startswith("echo "):
            return ToolResult(cmd[5:])
        return ToolResult(f"(simulated) `{cmd}` executed.")

_FAKE_WEB = {
    "vector database":
        "Vector databases (FAISS, Milvus, pgvector) index embeddings for "
        "approximate nearest-neighbour search powering RAG.",
    "agent harness":
        "An agent harness is the infrastructure around an LLM: tools, memory, "
        "permissions, and a loop that turns model output into real actions.",
    "exponential backoff":
        "Exponential backoff retries failed calls after 2^n growing delays to "
        "avoid hammering a struggling service.",
}

@dataclass
class WebSearchInput:
    query: str = fld("Search query")

class WebSearchTool(BaseTool):
    name = "web_search"
    description = "Search the web for up-to-date information. (Mocked in tutorial.)"
    InputModel = WebSearchInput
    kind = PermissionKind.READ

    async def execute(self, args: WebSearchInput, ctx) -> ToolResult:
        await asyncio.sleep(0.05)  # simulate network latency
        q = args.query.lower()
        for key, val in _FAKE_WEB.items():
            if key in q:
                return ToolResult(f"[mock result] {val}")
        return ToolResult(f"[mock result] No canned answer for '{args.query}'. "
                          f"(Wire a real search API here.)")

@dataclass
class SkillInput:
    name: str = fld("Name of the skill to load")

class SkillTool(BaseTool):
    name = "skill"
    description = ("Load an on-demand skill (markdown playbook) into context. "
                   "Only load a skill right before you need its guidance.")
    InputModel = SkillInput
    kind = PermissionKind.META

    async def execute(self, args: SkillInput, ctx) -> ToolResult:
        body = ctx.skills.load(args.name)
        if body is None:
            return ToolResult(f"Unknown skill: {args.name}", is_error=True)
        return ToolResult(f"Loaded skill '{args.name}':\n\n{body}")

@dataclass
class RememberInput:
    note: str = fld("A durable fact/preference to remember across sessions")

class RememberTool(BaseTool):
    name = "remember"
    description = "Persist a durable fact to long-term memory (MEMORY.md)."
    InputModel = RememberInput
    kind = PermissionKind.META

    async def execute(self, args: RememberInput, ctx) -> ToolResult:
        ctx.memory.append(args.note)
        return ToolResult(f"Remembered: {args.note}")

@dataclass
class AskUserInput:
    question: str = fld("A question to ask the human")

class AskUserTool(BaseTool):
    name = "ask_user"
    description = "Ask the human a clarifying question."
    InputModel = AskUserInput
    kind = PermissionKind.META

    async def execute(self, args: AskUserInput, ctx) -> ToolResult:
        canned = getattr(ctx, "canned_answers", {})
        ans = canned.get(args.question, "(no answer configured)")
        return ToolResult(f"User answered: {ans}")

@dataclass
class SpawnAgentInput:
    role: str = fld("Which agent profile to spawn (e.g. 'researcher')")
    task: str = fld("The task to delegate to that agent")

class SpawnAgentTool(BaseTool):
    name = "spawn_agent"
    description = ("Delegate a subtask to a specialized subagent and get its "
                   "final result. Multiple spawns in one turn run in parallel.")
    InputModel = SpawnAgentInput
    kind = PermissionKind.META

    async def execute(self, args: SpawnAgentInput, ctx) -> ToolResult:
        result = await ctx.spawn(args.role, args.task)
        return ToolResult(f"[{args.role}] {result}")

@dataclass
class Skill:
    name: str
    description: str
    body: str

class SkillLibrary:
    """Holds skills. The *summary* (name + description) is injected into the
    system prompt; the *body* is only pulled in when the model loads it."""

    def __init__(self):
        self._skills: dict = {}
        self.loaded: list = []

    @staticmethod
    def parse_markdown(md: str) -> Skill:
        """Parse a skill .md with YAML-ish frontmatter (name/description)."""
        name, desc, body = "unnamed", "", md
        m = re.match(r"^---\s*\n(.*?)\n---\s*\n(.*)$", md, re.S)
        if m:
            front, body = m.group(1), m.group(2)
            for line in front.splitlines():
                if ":" in line:
                    k, v = line.split(":", 1)
                    if k.strip() == "name":
                        name = v.strip()
                    elif k.strip() == "description":
                        desc = v.strip()
        return Skill(name=name, description=desc, body=body.strip())

    def add_markdown(self, md: str) -> "SkillLibrary":
        s = self.parse_markdown(md)
        self._skills[s.name] = s
        return self

    def summary(self) -> str:
        if not self._skills:
            return "(none)"
        return "\n".join(f"- {s.name}: {s.description}"
                         for s in self._skills.values())

    def load(self, name: str) -> str | None:
        s = self._skills.get(name)
        if s is None:
            return None
        if name not in self.loaded:
            self.loaded.append(name)
        return s.body

class MemoryStore:
    """Append-only long-term memory backed by a real file (MEMORY.md)."""

    def __init__(self, path: str):
        self.path = path

    def read(self) -> str:
        if os.path.exists(self.path):
            with open(self.path, "r", encoding="utf-8") as fh:
                return fh.read().strip()
        return ""

    def append(self, note: str) -> None:
        with open(self.path, "a", encoding="utf-8") as fh:
            fh.write(f"- {note}\n")

    def reset(self) -> None:
        if os.path.exists(self.path):
            os.remove(self.path)
```
We build the practical tool layer that allows the harness to read files, write files, edit content, list files, search text, run Python code, simulate shell commands, search mock web data, load skills, remember notes, ask the user, and spawn subagents. We define each tool with typed inputs, descriptions, permissions, and executable behavior so the agent can interact with its environment in a structured way. We also add the skill library and persistent memory store, which help the agent load knowledge on demand and preserve useful information across sessions.
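The skill library relies on progressive disclosure: only each skill's one-line summary goes into the system prompt, and the full markdown body enters the context only when the model calls the `skill` tool. Below is a minimal sketch of that split, assuming skills use simple `key: value` frontmatter between `---` lines as `SkillLibrary.parse_markdown` expects; the `parse_skill` helper and the `code-review` skill are hypothetical.

```python
import re
from dataclasses import dataclass

@dataclass
class Skill:
    name: str
    description: str
    body: str

# Frontmatter between two "---" lines, then the markdown body.
FRONTMATTER = re.compile(r"^---\s*\n(.*?)\n---\s*\n(.*)$", re.S)

def parse_skill(md: str) -> Skill:
    """Split simple `key: value` frontmatter from the skill's markdown body."""
    meta, body = {}, md
    m = FRONTMATTER.match(md)
    if m:
        front, body = m.group(1), m.group(2)
        for line in front.splitlines():
            key, sep, value = line.partition(":")
            if sep:
                meta[key.strip()] = value.strip()
    return Skill(meta.get("name", "unnamed"), meta.get("description", ""), body.strip())

# Hypothetical skill file for illustration.
REVIEW_MD = """---
name: code-review
description: Checklist for reviewing a diff
---
1. Run the tests.
2. Look for unhandled errors.
"""

skills = {s.name: s for s in [parse_skill(REVIEW_MD)]}
# Only this one-line summary is placed in the system prompt...
prompt_section = "\n".join(f"- {s.name}: {s.description}" for s in skills.values())
# ...while the full body is fetched only when the model loads the skill.
loaded_body = skills["code-review"].body
print(prompt_section)
print(loaded_body)
```

The payoff grows with the number of skills: the prompt cost stays at one line per skill no matter how long each playbook is.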
```python
class TransientLLMError(Exception):
    """A retryable error (timeout / 5xx / rate-limit)."""

class LLMBrain:
    """Interface: given the transcript + tool schemas, produce the next turn."""
    model = "abstract"

    async def stream(self, messages: list, tools: list, on_event=None) -> AssistantTurn:
        raise NotImplementedError

def Say(text: str) -> dict:
    """Scripted step: a final answer with no tool calls."""
    return {"final": True, "text": text, "calls": []}

def Use(thought: str, calls: list) -> dict:
    """Scripted step: some text plus a list of (tool_name, args) calls."""
    return {"final": False, "text": thought, "calls": calls}

def last_tool_results(messages: list) -> list:
    """The tool results produced since the last assistant turn."""
    out = []
    for m in reversed(messages):
        if m.role == "tool":
            out.append({"name": m.name, "content": m.content})
        elif m.role == "assistant":
            break
    return list(reversed(out))

class ScriptedBrain(LLMBrain):
    """Deterministic mock model that replays a list of scripted steps."""
    model = "mock-sonnet"

    def __init__(self, script: list, name: str = "mock-sonnet"):
        self.script = script
        self.i = 0
        self.model = name

    async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
        if self.i >= len(self.script):
            return AssistantTurn(text="(script exhausted)", stop_reason="end_turn")
        step = self.script[self.i]
        self.i += 1
        # A step is either a fixed action or a function of the transcript.
        action = step(messages) if callable(step) else step
        text = action["text"]
        if on_event and text:
            on_event("text", text)
        calls = []
        for j, (tname, targs) in enumerate(action["calls"]):
            tc = ToolCall(id=f"call_{self.i}_{j}", name=tname, arguments=targs)
            calls.append(tc)
            if on_event:
                on_event("tool_call", tc)
        in_tok = sum(count_tokens(m.content) for m in messages) + 200
        out_tok = count_tokens(text) + sum(count_tokens(json.dumps(c.arguments))
                                           for c in calls)
        return AssistantTurn(
            text=text,
            tool_calls=calls,
            stop_reason="tool_use" if calls else "end_turn",
            usage=Usage(in_tok, out_tok),
        )

class FlakyBrain(LLMBrain):
    """Wraps a brain and fails the first `fail_times` calls with a transient
    error, so we can watch the engine retry with exponential backoff."""

    def __init__(self, inner: LLMBrain, fail_times: int = 1):
        self.inner = inner
        self.failures_left = fail_times
        self.model = inner.model

    async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
        if self.failures_left > 0:
            self.failures_left -= 1
            raise TransientLLMError("simulated 503 from provider")
        return await self.inner.stream(messages, tools, on_event)

class RetryingBrain(LLMBrain):
    """Retries transient errors, waiting base_delay * 2**attempt between tries."""

    def __init__(self, inner: LLMBrain, retries: int = 4, base_delay: float = 0.05):
        self.inner = inner
        self.retries = retries
        self.base_delay = base_delay
        self.model = inner.model

    async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
        attempt = 0
        while True:
            try:
                return await self.inner.stream(messages, tools, on_event)
            except TransientLLMError as e:
                if attempt >= self.retries:
                    raise
                delay = self.base_delay * (2 ** attempt)
                print(f"  transient error ({e}); "
                      f"retry {attempt + 1}/{self.retries} in {delay:.2f}s")
                await asyncio.sleep(delay)
                attempt += 1

class RealLLMBrain(LLMBrain):
    """Calls an Anthropic- or OpenAI-compatible HTTP API using only urllib."""

    def __init__(self, api_format: str, model: str, api_key: str,
                 base_url: str, system: str = ""):
        self.api_format = api_format
        self.model = model
        self.api_key = api_key
        self.base_url = base_url.rstrip("/")
        self.system = system

    def _post(self, url: str, headers: dict, payload: dict) -> dict:
        data = json.dumps(payload).encode()
        req = urllib.request.Request(url, data=data, headers=headers, method="POST")
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            if e.code in (429, 500, 502, 503, 504):
                raise TransientLLMError(f"HTTP {e.code}") from e
            raise
        except OSError as e:  # URLError, connection resets, socket timeouts
            raise TransientLLMError(str(e)) from e

    def _tools_anthropic(self, tools: list) -> list:
        return [{"name": t["name"], "description": t["description"],
                 "input_schema": t["input_schema"]} for t in tools]

    def _tools_openai(self, tools: list) -> list:
        return [{"type": "function", "function": {
            "name": t["name"], "description": t["description"],
            "parameters": t["input_schema"]}} for t in tools]

    def _msgs_anthropic(self, messages: list) -> list:
        out = []
        for m in messages:
            if m.role == "user":
                out.append({"role": "user", "content": m.content})
            elif m.role == "assistant":
                blocks = []
                if m.content:
                    blocks.append({"type": "text", "text": m.content})
                for tc in m.tool_calls:
                    blocks.append({"type": "tool_use", "id": tc.id,
                                   "name": tc.name, "input": tc.arguments})
                out.append({"role": "assistant", "content": blocks or m.content})
            elif m.role == "tool":
                result = {"type": "tool_result", "tool_use_id": m.tool_call_id,
                          "content": m.content}
                # All results for one assistant turn share a single user message.
                if (out and out[-1]["role"] == "user"
                        and isinstance(out[-1]["content"], list)):
                    out[-1]["content"].append(result)
                else:
                    out.append({"role": "user", "content": [result]})
        return out

    def _msgs_openai(self, messages: list) -> list:
        out = []
        if self.system:
            out.append({"role": "system", "content": self.system})
        for m in messages:
            if m.role == "user":
                out.append({"role": "user", "content": m.content})
            elif m.role == "assistant":
                msg = {"role": "assistant", "content": m.content or None}
                if m.tool_calls:
                    msg["tool_calls"] = [{
                        "id": tc.id, "type": "function",
                        "function": {"name": tc.name,
                                     "arguments": json.dumps(tc.arguments)}}
                        for tc in m.tool_calls]
                out.append(msg)
            elif m.role == "tool":
                out.append({"role": "tool", "tool_call_id": m.tool_call_id,
                            "content": m.content})
        return out

    async def stream(self, messages, tools, on_event=None) -> AssistantTurn:
        loop = asyncio.get_running_loop()
        if self.api_format == "anthropic":
            payload = {"model": self.model, "max_tokens": 1024,
                       "messages": self._msgs_anthropic(messages)}
            if self.system:
                payload["system"] = self.system
            if tools:
                payload["tools"] = self._tools_anthropic(tools)
            headers = {"x-api-key": self.api_key,
                       "anthropic-version": "2023-06-01",
                       "content-type": "application/json"}
            url = f"{self.base_url}/v1/messages"
            # urllib blocks, so run the HTTP call in a worker thread.
            data = await loop.run_in_executor(None, self._post, url, headers, payload)
            text, calls = "", []
            for block in data.get("content", []):
                if block.get("type") == "text":
                    text += block["text"]
                elif block.get("type") == "tool_use":
                    calls.append(ToolCall(block["id"], block["name"],
                                          block.get("input", {})))
            u = data.get("usage", {})
            usage = Usage(u.get("input_tokens", 0), u.get("output_tokens", 0))
        else:
            payload = {"model": self.model,
                       "messages": self._msgs_openai(messages)}
            if tools:
                payload["tools"] = self._tools_openai(tools)
            headers = {"Authorization": f"Bearer {self.api_key}",
                       "Content-Type": "application/json"}
            url = f"{self.base_url}/chat/completions"
            data = await loop.run_in_executor(None, self._post, url, headers, payload)
            choice = data["choices"][0]["message"]
            text = choice.get("content") or ""
            calls = []
            for tc in (choice.get("tool_calls") or []):
                fn = tc["function"]
                try:
                    cargs = json.loads(fn.get("arguments") or "{}")
                except json.JSONDecodeError:
                    cargs = {}
                calls.append(ToolCall(tc["id"], fn["name"], cargs))
            u = data.get("usage", {})
            usage = Usage(u.get("prompt_tokens", 0), u.get("completion_tokens", 0))
        stop = "tool_use" if calls else "end_turn"
        if on_event and text:
            on_event("text", text)
        for c in calls:
            if on_event:
                on_event("tool_call", c)
        return AssistantTurn(text=text, tool_calls=calls,
                             stop_reason=stop, usage=usage)

def make_real_brain(system: str = "") -> LLMBrain | None:
    """Build a real brain from env vars, or return None to fall back to mock."""
    if os.environ.get("USE_REAL_LLM", "0") not in ("1", "true", "True"):
        return None
    if os.environ.get("ANTHROPIC_API_KEY"):
        return RealLLMBrain(
            api_format="anthropic",
            model=os.environ.get("MODEL", "claude-sonnet-4-6"),
            api_key=os.environ["ANTHROPIC_API_KEY"],
            base_url=os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com"),
            system=system)
    if os.environ.get("OPENAI_API_KEY"):
        return RealLLMBrain(
            api_format="openai",
            model=os.environ.get("MODEL", "gpt-4.1"),
            api_key=os.environ["OPENAI_API_KEY"],
            base_url=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1"),
            system=system)
    return None
```
We define the model brain layer that decides what the agent does next in the loop. We create a scripted mock brain for deterministic execution, a flaky brain to simulate provider errors, a retrying wrapper with exponential backoff, and a real provider brain for Anthropic- or OpenAI-compatible APIs. We use this snippet to show that the harness remains the same while the intelligence layer can switch between mock execution and real LLM calls.
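The retry idea does not depend on any particular provider. Below is a minimal, self-contained sketch of exponential backoff around an async "brain" call. It assumes a hypothetical `TransientError` exception and a hypothetical `FlakyStub` that fails a fixed number of times. It is not the tutorial's `RetryingBrain`; it only illustrates the same pattern in isolation.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical stand-in for a retryable provider error (e.g. HTTP 429/503)."""


class FlakyStub:
    """Hypothetical brain that fails `fail_times` times, then answers."""

    def __init__(self, fail_times: int):
        self.fail_times = fail_times
        self.calls = 0

    async def stream(self, messages):
        self.calls += 1
        if self.calls <= self.fail_times:
            raise TransientError(f"simulated failure #{self.calls}")
        return f"answer after {self.calls} calls"


async def with_backoff(call, *, max_attempts: int = 4, base_delay: float = 0.01,
                       factor: float = 2.0, delays=None):
    """Await `call()`; on TransientError wait base_delay * factor**attempt and retry."""
    for attempt in range(max_attempts):
        try:
            return await call()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay * factor ** attempt
            if delays is not None:
                delays.append(delay)  # record the schedule so it can be inspected
            await asyncio.sleep(delay)


async def demo():
    brain = FlakyStub(fail_times=2)
    waited = []
    text = await with_backoff(lambda: brain.stream([]), delays=waited)
    return text, brain.calls, waited


text, calls, waited = asyncio.run(demo())
print(text, calls, waited)
```

A production wrapper would usually add random jitter and cap the maximum delay; both are left out here so the schedule stays deterministic.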
Assembling the System Prompt and the QueryEngine Agent Loop
def assemble_system_prompt(*, base: str, project_context: str, memory: str,
skills_summary: str, tool_names: list) -> str:
"""Mirror OpenHarness prompt assembly: base + CLAUDE.md + MEMORY.md +
on-demand skill list + available tools."""
parts = [base.strip()]
if project_context:
parts.append(f"## Project context (CLAUDE.md)\n{project_context.strip()}")
if memory:
parts.append(f"## Long-term memory (MEMORY.md)\n{memory.strip()}")
if skills_summary and skills_summary != "(none)":
parts.append("## Available skills (load on demand with the `skill` tool)\n"
+ skills_summary)
parts.append("## Available tools\n" + ", ".join(tool_names))
return "\n\n".join(parts)
def estimate_messages_tokens(messages: list) -> int:
return sum(count_tokens(m.content) +
sum(count_tokens(json.dumps(c.arguments)) for c in m.tool_calls)
for m in messages)
def maybe_compact(messages: list, *, max_tokens: int, keep_last: int = 4,
verbose: bool = True) -> list:
"""Auto-compaction: when the transcript grows past `max_tokens`, summarize
the older middle into one note while preserving the original task and the
most recent turns. Real OpenHarness asks the model to write the summary and
also preserves task state + channel logs; we use a heuristic here."""
tok = estimate_messages_tokens(messages)
if tok <= max_tokens or len(messages) <= keep_last + 1:
return messages
first = messages[0]
tail = messages[-keep_last:]
middle = messages[1:-keep_last]
facts = []
for m in middle:
if m.role == "tool" and not m.content.startswith("ERROR"):
facts.append(f"{m.name}: {short(m.content, 80)}")
elif m.role == "assistant" and m.content:
facts.append(f"assistant: {short(m.content, 80)}")
summary = Message(
role="system",
content=("[Auto-compacted context] Earlier "
f"{len(middle)} messages summarized. Key results:\n - "
+ "\n - ".join(facts[-8:])),
)
compacted = [first, summary] + tail
if verbose:
print(f" π auto-compaction: {len(messages)} msgs "
f"(~{tok} tok) -> {len(compacted)} msgs "
f"(~{estimate_messages_tokens(compacted)} tok)")
return compacted
def console_printer(event: str, data) -> None:
if event == "text" and data:
print(f" π€ {short(data, 400)}")
elif event == "tool_call":
print(f" β³ call {data.name}({short(json.dumps(data.arguments), 120)})")
elif event == "tool_result":
flag = "β" if data.get("is_error") else "β"
print(f" {flag} {short(data['output'], 200)}")
class QueryEngine:
def __init__(self, *, brain: LLMBrain, registry: ToolRegistry,
ctx: ToolContext, perms: PermissionChecker, hooks: HookManager,
system_prompt: str, cost: CostMeter | None = None,
approve=auto_approve, max_turns: int = 12,
compact_at_tokens: int | None = None):
self.brain = brain
self.registry = registry
self.ctx = ctx
self.perms = perms
self.hooks = hooks
self.system_prompt = system_prompt
self.cost = cost or CostMeter(getattr(brain, "model", "default"))
self.approve = approve
self.max_turns = max_turns
self.compact_at_tokens = compact_at_tokens
async def execute_tool(self, call: ToolCall, on_event=None) -> ToolResult:
tool = self.registry.get(call.name)
if tool is None:
return ToolResult(f"Unknown tool: {call.name}", is_error=True)
decision = self.perms.check(tool, call.arguments)
if decision.action == "deny":
r = ToolResult(f"DENIED by permissions: {decision.reason}", is_error=True)
if on_event:
on_event("tool_result", {"output": r.output, "is_error": True})
return r
if decision.action == "ask":
ok = await self.approve(tool, call.arguments, decision.reason)
if not ok:
r = ToolResult("DENIED by user.", is_error=True)
if on_event:
on_event("tool_result", {"output": r.output, "is_error": True})
return r
pre = self.hooks.run_pre(call, tool, self.ctx)
if pre.blocked:
r = ToolResult(f"BLOCKED by hook: {pre.reason}", is_error=True)
if on_event:
on_event("tool_result", {"output": r.output, "is_error": True})
return r
args = pre.arguments
try:
result = await tool.run(args, self.ctx)
except Exception as e:
result = ToolResult(f"Tool raised: {e}", is_error=True)
result = self.hooks.run_post(call, tool, result, self.ctx)
if on_event:
on_event("tool_result", {"output": result.output,
"is_error": result.is_error})
return result
async def run(self, task: str, on_event=console_printer) -> str:
messages: list = [Message(role="user", content=task)]
for turn in range(self.max_turns):
if self.compact_at_tokens:
messages = maybe_compact(messages, max_tokens=self.compact_at_tokens)
assistant = await self.brain.stream(messages, self.registry.schemas(),
on_event)
self.cost.add(assistant.usage)
if assistant.stop_reason != "tool_use" or not assistant.tool_calls:
return assistant.text
messages.append(Message(role="assistant", content=assistant.text,
tool_calls=assistant.tool_calls))
results = await asyncio.gather(
*[self.execute_tool(tc, on_event) for tc in assistant.tool_calls]
)
for tc, res in zip(assistant.tool_calls, results):
# every failed call (tool error, DENIED, or BLOCKED) is prefixed with "ERROR: "
content = ("ERROR: " + res.output) if res.is_error else res.output
messages.append(Message(role="tool", name=tc.name,
tool_call_id=tc.id, content=content))
return "(stopped: reached max_turns)"
DEFAULT_TOOLS = [
WriteFileTool, ReadFileTool, EditTool, ListFilesTool, GrepTool,
RunPythonTool, ShellTool, WebSearchTool, SkillTool, RememberTool,
AskUserTool, SpawnAgentTool,
]
def build_registry(tool_classes=None) -> ToolRegistry:
reg = ToolRegistry()
for cls in (tool_classes or DEFAULT_TOOLS):
reg.register(cls())
return reg
BASE_SYSTEM = (
"You are an autonomous coding/research agent running inside OpenHarness. "
"Use tools to take real actions. Think step by step, verify your work by "
"running it, and only stop when the task is truly complete."
)
We assemble the system prompt, estimate transcript size, compact long conversations, print streaming events, and define the main QueryEngine. We make the engine responsible for asking the brain what to do, checking permissions, running hooks, executing tools, collecting results, and looping until the task is finished. We also register the default tools and create the base system instruction that guides the agent toward verified and tool-based work.
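One subtlety is worth guarding against when compacting. In `maybe_compact`, `tail = messages[-keep_last:]` can start in the middle of a turn when a single assistant turn issues several tool calls. If the preserved tail then begins with a `tool` message whose originating assistant `tool_calls` turn was summarized away, OpenAI- and Anthropic-style APIs typically reject the transcript, because each tool result must follow the assistant turn that requested it. The sketch below shows one way to move the cut point back to a safe boundary. It uses a hypothetical minimal `Msg` dataclass instead of the tutorial's `Message`, and `safe_tail_start` and `compact` are illustrative helpers, not OpenHarness functions.

```python
from dataclasses import dataclass, field


@dataclass
class Msg:
    """Hypothetical minimal message: role is 'user', 'assistant', 'tool', or 'system'."""
    role: str
    content: str = ""
    tool_calls: list = field(default_factory=list)


def safe_tail_start(messages: list, keep_last: int) -> int:
    """Return an index i such that messages[i:] holds at least `keep_last`
    messages (when available) and does not begin with an orphaned tool result."""
    keep = max(keep_last, 1)
    i = max(1, len(messages) - keep)  # index 0 (the original task) is always kept
    # Walk backwards while the tail would start with a tool result, so the
    # assistant turn that issued those tool calls stays in the tail.
    while i > 1 and messages[i].role == "tool":
        i -= 1
    return i


def compact(messages: list, keep_last: int = 4) -> list:
    """Keep the task, one summary note, and a boundary-safe tail."""
    start = safe_tail_start(messages, keep_last)
    middle = messages[1:start]
    if not middle:
        return messages  # nothing to summarize
    note = Msg("system", f"[compacted] {len(middle)} earlier messages summarized")
    return [messages[0], note] + messages[start:]


# Each assistant turn issues two parallel tool calls -> two tool messages.
history = [Msg("user", "task")]
for k in range(3):
    history.append(Msg("assistant", f"step {k}", tool_calls=[f"a{k}", f"b{k}"]))
    history.append(Msg("tool", f"a{k} ok"))
    history.append(Msg("tool", f"b{k} ok"))

out = compact(history, keep_last=4)
print([m.role for m in out])
```

Here a naive `history[-4:]` would begin with the second tool result of turn 1; the adjusted cut keeps that whole turn instead, at the cost of a slightly longer tail.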
Running the Agent Loop, Permission Governance, and On-Demand Skills Demos
async def demo_agent_loop():
explain(
"DEMO 1: The Agent Loop (write code → run → see it fail → fix → pass)",
"""This is the centerpiece. A scripted 'brain' drives the real harness:
each turn it emits tool calls, the engine runs them through permissions +
hooks, feeds results back, and the brain REACTS. It writes a factorial
module with an off-by-one bug, writes a test, runs it, sees the failure
in the tool result, fixes the bug, re-runs to green, and saves a note to
memory. We also wrap the brain in retry/backoff and a one-time transient
failure so you can watch recovery, and we track token cost throughout.""")
buggy = (
"def factorial(n):\n"
" result = 1\n"
" for i in range(1, n):\n"
" result *= i\n"
" return result\n"
)
test_code = (
'assert factorial(0) == 1, "0! should be 1"\n'
'assert factorial(5) == 120, "5! should be 120 but got " + str(factorial(5))\n'
'print("All tests passed: 0!=1 and 5!=120")\n'
)
def reactive_fix(messages):
outs = last_tool_results(messages)
failed = any("ERROR" in o["content"] or "Assert" in o["content"]
for o in outs)
if failed:
return Use("The test failed: classic off-by-one in the loop range. "
"range(1, n) stops at n-1, so I'll make it range(1, n + 1).",
[("edit", {"path": "mathutils.py",
"old": "range(1, n)",
"new": "range(1, n + 1)"})])
return Use("Tests already pass; re-running to confirm.",
[("run_python", {"files": ["mathutils.py", "test_math.py"]})])
script = [
Use("First I'll create the factorial module.",
[("write_file", {"path": "mathutils.py", "content": buggy})]),
Use("Now a test that pins down the expected behavior.",
[("write_file", {"path": "test_math.py", "content": test_code})]),
Use("Let me run the test to see if it works.",
[("run_python", {"files": ["mathutils.py", "test_math.py"]})]),
reactive_fix,
Use("Re-running the test after the fix.",
[("run_python", {"files": ["mathutils.py", "test_math.py"]})]),
Use("I'll remember the lesson for next time.",
[("remember",
{"note": "factorial(n): loop must be range(1, n+1); "
"range(1, n) gives n!/n (off-by-one)."})]),
lambda m: Say("Done. Created mathutils.py with factorial(), wrote a "
"test, caught an off-by-one bug (range(1, n) → "
"range(1, n + 1)), fixed it, and the test now passes. "
"Saved the lesson to memory."),
]
vfs = VirtualFS()
memory = MemoryStore(os.path.join(tempfile.gettempdir(), "oh_demo1_mem.md"))
memory.reset()
skills = SkillLibrary()
registry = build_registry()
ctx = ToolContext(vfs=vfs, memory=memory, skills=skills)
hooks = HookManager()
hooks.add_pre(lambda call, tool, c:
print(f" πͺ pre-hook saw {tool.name}") or None)
brain = RetryingBrain(FlakyBrain(ScriptedBrain(script), fail_times=1))
cost = CostMeter("mock-sonnet")
engine = QueryEngine(
brain=brain, registry=registry, ctx=ctx,
perms=PermissionChecker(PermissionMode.DEFAULT),
hooks=hooks,
system_prompt=assemble_system_prompt(
base=BASE_SYSTEM, project_context="", memory=memory.read(),
skills_summary=skills.summary(), tool_names=registry.names()),
cost=cost, approve=auto_approve, max_turns=10,
)
print("\n[running the agent loop]\n")
final = await engine.run("Implement factorial() with a test; make tests pass.")
print(f"\n FINAL ANSWER:\n {final}")
print(f"\n Virtual filesystem now contains:\n{vfs.tree()}")
print(f"\n Final mathutils.py:\n" +
textwrap.indent(vfs.read("mathutils.py"), " "))
print(f"\n π° cost: {cost.summary()}")
print("\n TAKEAWAY: the model only decides WHAT to do; the harness handles "
"HOW β permissions, hooks, execution, retries, results, and cost.")
async def demo_permissions():
explain(
"DEMO 2: Governance (permission modes, path rules, denied commands)",
"""The harness is the safety boundary. The same write_file tool behaves
differently under default/auto/plan modes; sensitive paths are always
denied; path rules and denied-command patterns add fine-grained control;
and PreToolUse hooks can veto a call outright.""")
write = WriteFileTool()
shell = ShellTool()
print("\n (a) Same write under each mode:")
for mode in (PermissionMode.DEFAULT, PermissionMode.AUTO, PermissionMode.PLAN):
d = PermissionChecker(mode).check(write, {"path": "notes.txt", "content": "x"})
print(f" {mode.value:8s} -> {d.action.upper():5s} ({d.reason})")
print("\n (b) Built-in sensitive-path protection (any mode):")
for p in ["app/main.py", ".env", "~/.ssh/id_rsa", "secrets/key.txt"]:
d = PermissionChecker(PermissionMode.AUTO).check(write, {"path": p, "content": "x"})
print(f" write {p:18s} -> {d.action.upper():5s} ({d.reason})")
print("\n (c) Custom path_rules + denied_commands:")
pc = PermissionChecker(
PermissionMode.AUTO,
path_rules=[{"pattern": "build/*", "allow": False}],
denied_commands=[r"rm\s+-rf\s+/", r"DROP\s+TABLE"])
print(" write build/x ->",
pc.check(write, {"path": "build/x", "content": "x"}).action.upper())
print(" shell 'rm -rf /' ->",
pc.check(shell, {"command": "rm -rf /"}).action.upper())
print(" shell 'ls -la' ->",
pc.check(shell, {"command": "ls -la"}).action.upper())
print("\n (d) A PreToolUse security hook that vetoes writing credentials:")
def security_hook(call, tool, ctx):
if tool.name == "write_file" and "password" in call.arguments.get("content", "").lower():
return HookOutcome(blocked=True, reason="content looks like a secret")
return None
hooks = HookManager().add_pre(security_hook)
ctx = ToolContext(vfs=VirtualFS())
engine = QueryEngine(brain=ScriptedBrain([]), registry=build_registry(),
ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
hooks=hooks, system_prompt="", approve=auto_approve)
r = await engine.execute_tool(
ToolCall("c1", "write_file",
{"path": "config.txt", "content": "password=hunter2"}),
on_event=console_printer)
print(f" result -> {r.output}")
print("\n TAKEAWAY: permissions + hooks form layered defense around every "
"single tool call, independent of how clever the model is.")
async def demo_skills():
explain(
"DEMO 3: Skills (on-demand knowledge loading)",
"""Skills are markdown playbooks. Only their name+description sit in the
system prompt (cheap); the full body is pulled in ONLY when the model
decides it needs it, via the `skill` tool. This keeps context small while
giving the agent deep, swappable expertise. Compatible in spirit with
anthropics/skills (drop a .md file in and it just works).""")
skills = SkillLibrary()
skills.add_markdown(textwrap.dedent("""\
---
name: commit
description: Create clean, conventional git commits
---
# Commit skill
2. Group related changes; write a Conventional Commit subject
(feat:, fix:, docs:, refactor:, test:).
3. Keep the subject <= 50 chars; explain *why* in the body.
"""))
skills.add_markdown(textwrap.dedent("""\
---
name: review
description: Review code for bugs, security, and clarity
---
# Review skill
Check: correctness, edge cases, input validation, error handling,
secrets in code, naming, and test coverage. Be specific and kind.
"""))
registry = build_registry()
ctx = ToolContext(vfs=VirtualFS(), skills=skills,
memory=MemoryStore(os.path.join(tempfile.gettempdir(),
"oh_d3.md")))
sysprompt = assemble_system_prompt(
base=BASE_SYSTEM, project_context="", memory="",
skills_summary=skills.summary(), tool_names=registry.names())
print(" System prompt's skills section (names+descriptions only):")
print(textwrap.indent(skills.summary(), " "))
print(f"\n Loaded skill bodies so far: {skills.loaded} (none yet!)")
script = [
Use("This is a commit task, so I'll load the commit skill first.",
[("skill", {"name": "commit"})]),
lambda m: Say("Following the commit playbook I just loaded, I'd stage the "
"related changes and write: 'fix: correct factorial "
"off-by-one in range bound'."),
]
engine = QueryEngine(brain=ScriptedBrain(script), registry=registry, ctx=ctx,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt=sysprompt)
print("\n[running]\n")
final = await engine.run("Help me commit my changes properly.")
print(f"\n FINAL: {final}")
print(f"\n Loaded skill bodies now: {skills.loaded} (loaded on demand)")
print("\n TAKEAWAY: knowledge is paged in just-in-time, not crammed into "
"every prompt.")
We implement the first three demos to show the harness in action. We first run a complete agent loop where the system writes buggy factorial code, tests it, detects the failure, fixes the bug, reruns the test, and saves the lesson to memory. We then explore permission governance and on-demand skills to see how safety controls and reusable playbooks improve the agent workflow.
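The layering in the governance demo (always-on sensitive paths, then user rules and denied commands, then the mode) can be illustrated with a small standalone decision function. This is a sketch under stated assumptions, not OpenHarness's `PermissionChecker`: the mapping of modes to decisions (DEFAULT asks before mutations, AUTO allows them, PLAN denies them), the `SENSITIVE` glob list, and the `decide` signature are all hypothetical.

```python
import fnmatch
import re
from enum import Enum


class Mode(Enum):
    DEFAULT = "default"  # assumption: ask before mutating actions
    AUTO = "auto"        # assumption: allow mutating actions
    PLAN = "plan"        # assumption: read-only planning, deny mutations


# Hypothetical always-denied patterns, checked before the mode is consulted.
SENSITIVE = [".env", "*/.env", "~/.ssh/*", "secrets/*"]


def decide(mode: Mode, *, mutating: bool, path: str = "", command: str = "",
           path_rules=(), denied_commands=()) -> tuple:
    """Return (action, reason) where action is 'allow', 'ask', or 'deny'."""
    # Layer 1: sensitive paths are denied in every mode.
    if path and any(fnmatch.fnmatch(path, p) for p in SENSITIVE):
        return "deny", f"sensitive path {path!r}"
    # Layer 2: user path rules; the first matching rule wins, and an
    # allow=True match simply falls through to the later layers.
    for rule in path_rules:
        if path and fnmatch.fnmatch(path, rule["pattern"]):
            if not rule["allow"]:
                return "deny", f"path rule {rule['pattern']!r}"
            break
    # Layer 3: denied command regexes for shell-like tools.
    for pat in denied_commands:
        if command and re.search(pat, command):
            return "deny", f"command matches {pat!r}"
    # Layer 4: the mode only matters for actions that change state.
    if not mutating:
        return "allow", "read-only"
    if mode is Mode.PLAN:
        return "deny", "plan mode is read-only"
    if mode is Mode.DEFAULT:
        return "ask", "mutation needs approval"
    return "allow", "auto mode"


for m in Mode:
    print(m.value, decide(m, mutating=True, path="notes.txt"))
print(decide(Mode.AUTO, mutating=True, path=".env"))
print(decide(Mode.AUTO, mutating=True, command="rm -rf /",
             denied_commands=[r"rm\s+-rf\s+/"]))
```

Evaluating the always-deny checks before the mode lookup is what lets them hold "in any mode", which matches the ordering the demo describes.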
Adding Memory, Context Compaction, and Multi-Agent Coordination
async def demo_memory():
explain(
"DEMO 4: Memory (persistent MEMORY.md across sessions)",
"""Long-term memory survives between runs by persisting to MEMORY.md. In
session 1 the agent records a user preference; in a brand-new session 2
(fresh engine, fresh transcript) that memory is injected into the system
prompt, so the agent already 'knows' the user.""")
mem_path = os.path.join(tempfile.gettempdir(), "oh_demo4_MEMORY.md")
memory = MemoryStore(mem_path)
memory.reset()
registry = build_registry()
print(" ── Session 1 ──")
ctx1 = ToolContext(vfs=VirtualFS(), memory=memory, skills=SkillLibrary())
s1 = [
Use("I'll remember the user's stated preferences.",
[("remember", {"note": "User prefers metric units and concise answers."})]),
lambda m: Say("Noted your preferences for next time."),
]
eng1 = QueryEngine(brain=ScriptedBrain(s1), registry=registry, ctx=ctx1,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(),
system_prompt=assemble_system_prompt(
base=BASE_SYSTEM, project_context="",
memory=memory.read(),
skills_summary="(none)", tool_names=registry.names()))
await eng1.run("Remember that I like metric units and short answers.")
print(f" MEMORY.md is now:\n{textwrap.indent(memory.read(), ' ')}")
print("\n ── Session 2 (new session, memory reloaded from disk) ──")
memory2 = MemoryStore(mem_path)
ctx2 = ToolContext(vfs=VirtualFS(), memory=memory2, skills=SkillLibrary())
sysprompt2 = assemble_system_prompt(
base=BASE_SYSTEM, project_context="", memory=memory2.read(),
skills_summary="(none)", tool_names=registry.names())
print(" The new system prompt already contains:")
print(textwrap.indent("## Long-term memory (MEMORY.md)\n" + memory2.read(),
" "))
s2 = [lambda m: Say("Since you prefer metric and brevity: it's about 5 km.")]
eng2 = QueryEngine(brain=ScriptedBrain(s2), registry=registry, ctx=ctx2,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt=sysprompt2)
final = await eng2.run("How far is a 5000 meter run, roughly?")
print(f"\n FINAL: {final}")
print("\n TAKEAWAY: state that should outlive a conversation goes to memory, "
"then is re-injected at the start of future sessions.")
async def demo_compaction():
explain(
"DEMO 5: Context auto-compaction (multi-day sessions without overflow)",
"""As a session grows, the transcript can blow past the context window.
Auto-compaction summarizes the older middle of the conversation into a
compact note while preserving the original task and the most recent
turns, so long-running agents keep going. (We force a tiny threshold to
trigger it; real OpenHarness asks the model to write the summary.)""")
msgs = [Message(role="user", content="Build and verify a data pipeline.")]
for i in range(8):
msgs.append(Message(role="assistant", content=f"Step {i}: doing work...",
tool_calls=[ToolCall(f"c{i}", "shell",
{"command": f"process chunk {i}"})]))
msgs.append(Message(role="tool", name="shell", tool_call_id=f"c{i}",
content=f"chunk {i} processed: 1000 rows ok " * 4))
before = estimate_messages_tokens(msgs)
print(f" Before: {len(msgs)} messages, ~{before} tokens")
compacted = maybe_compact(msgs, max_tokens=300, keep_last=4)
after = estimate_messages_tokens(compacted)
print(f" After: {len(compacted)} messages, ~{after} tokens "
f"({100 * (before - after) // before}% smaller)")
print("\n The injected summary message:")
print(textwrap.indent(compacted[1].content, " "))
print("\n TAKEAWAY: the harness manages the context window so the agent can "
"run far longer than a single window allows.")
async def demo_multi_agent():
explain(
"DEMO 6: Swarm coordination (spawning parallel subagents)",
"""A lead agent decomposes a task and delegates to specialized subagents.
Each subagent is its OWN harness (own loop, own brain, own tools). Two
researchers run IN PARALLEL (issued in the same turn, awaited via asyncio.gather),
then a writer synthesizes their findings. The team registry tracks who
did what.""")
def researcher_profile():
reg = build_registry([WebSearchTool])
script = [
Use("Researching via web search.",
[("web_search", {"query": "PLACEHOLDER"})]),
lambda m: Say("Summary: " +
short(last_tool_results(m)[0]["content"], 160)),
]
return ScriptedBrain(script), reg
def writer_profile():
reg = build_registry([WriteFileTool])
script = [lambda m: Say("Synthesized brief combining both research notes "
"into a coherent paragraph.")]
return ScriptedBrain(script), reg
profiles = {"researcher": researcher_profile, "writer": writer_profile}
vfs = VirtualFS()
memory = MemoryStore(os.path.join(tempfile.gettempdir(), "oh_d6.md"))
skills = SkillLibrary()
team: list = []
def make_spawn():
async def spawn(role: str, task: str) -> str:
factory = profiles.get(role)
if not factory:
return f"(no such role: {role})"
child_brain, child_reg = factory()
if role == "researcher" and child_brain.script:
child_brain.script[0] = Use(f"Researching: {task}",
[("web_search", {"query": task})])
child_ctx = ToolContext(vfs=vfs, memory=memory, skills=skills,
spawn=spawn)
child_engine = QueryEngine(
brain=child_brain, registry=child_reg, ctx=child_ctx,
perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt="(subagent)",
approve=auto_approve, max_turns=6)
print(f" π§βπ§ spawned [{role}] for: {short(task, 60)}")
result = await child_engine.run(task, on_event=None)
team.append({"role": role, "task": task, "result": result})
return result
return spawn
ctx = ToolContext(vfs=vfs, memory=memory, skills=skills, spawn=make_spawn())
registry = build_registry()
lead_script = [
Use("I'll split this: research vector databases AND agent harnesses in "
"parallel, then have a writer combine the findings.",
[("spawn_agent", {"role": "researcher",
"task": "vector database for RAG"}),
("spawn_agent", {"role": "researcher",
"task": "agent harness design"})]),
Use("Both research notes are in; delegating synthesis to the writer.",
[("spawn_agent", {"role": "writer",
"task": "combine the two research notes"})]),
lambda m: Say("Coordination complete: 2 researchers (parallel) + 1 "
"writer produced a combined brief."),
]
engine = QueryEngine(brain=ScriptedBrain(lead_script), registry=registry,
ctx=ctx, perms=PermissionChecker(PermissionMode.AUTO),
hooks=HookManager(), system_prompt="(lead agent)",
max_turns=8)
print("\n[running the lead agent]\n")
t0 = time.time()
final = await engine.run("Produce a short brief on building RAG agents.")
dt = time.time() - t0
print(f"\n FINAL: {final}")
print(f"\n Team registry ({len(team)} subagent runs, total {dt:.3f}s):")
for entry in team:
print(f" - [{entry['role']}] {short(entry['task'], 40)} -> "
f"{short(entry['result'], 80)}")
print("\n TAKEAWAY: the same loop nests, so a 'tool' can be an entire agent, "
"enabling parallel teams and delegation.")
async def demo_real_provider():
explain(
"DEMO 7: Swap in a REAL model (Anthropic / OpenAI-compatible)",
"""Everything above ran on a deterministic mock brain: zero keys, zero
cost. Going live changes exactly ONE thing: the brain. The engine, tools,
permissions, hooks, skills, memory, and coordinator are untouched. This
is the whole point of a harness: the model is pluggable.""")
print(textwrap.dedent("""\
To run the SAME harness on a real model, set environment variables and
re-run (works with any OpenAI- or Anthropic-compatible endpoint that
OpenHarness supports: Claude, GPT, Kimi, GLM, DeepSeek, Qwen, Groq,
Ollama, OpenRouter, ...):
import os
os.environ["USE_REAL_LLM"] = "1"
# --- Anthropic-style ---
os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
os.environ["MODEL"] = "claude-sonnet-4-6"
# --- or OpenAI-style (incl. local Ollama) ---
# os.environ["OPENAI_API_KEY"] = "sk-..."
# os.environ["OPENAI_BASE_URL"] = "http://localhost:11434/v1"
# os.environ["MODEL"] = "llama3.3:70b"
Then build the engine with the real brain instead of the mock:
brain = make_real_brain(system=system_prompt) or ScriptedBrain([...])
engine = QueryEngine(brain=brain, registry=registry, ctx=ctx, ...)
await engine.run("Refactor utils.py and add tests.")
"""))
sysprompt = assemble_system_prompt(
base=BASE_SYSTEM, project_context="", memory="",
skills_summary="(none)", tool_names=build_registry().names())
real = make_real_brain(system=sysprompt)
if real is None:
print(" [USE_REAL_LLM not set; staying on the mock brain. "
"Set the env vars above and re-run to go live.]")
return
print(f" [LIVE] Using real provider: {real.api_format} / {real.model}\n")
vfs = VirtualFS()
ctx = ToolContext(vfs=vfs, memory=MemoryStore(
os.path.join(tempfile.gettempdir(), "oh_real.md")),
skills=SkillLibrary(), canned_answers={})
engine = QueryEngine(
brain=RetryingBrain(real), registry=build_registry(), ctx=ctx,
perms=PermissionChecker(PermissionMode.AUTO), hooks=HookManager(),
system_prompt=sysprompt, cost=CostMeter(real.model), max_turns=12)
final = await engine.run(
"Create greet.py with a function greet(name) that returns "
"'Hello, <name>!', then write and run a quick test to prove it works.")
print(f"\n FINAL: {final}")
print(f"\n Files:\n{vfs.tree()}")
print(f"\n π° {engine.cost.summary()}")
async def main():
banner("OpenHarness From Scratch: guided walkthrough")
print(textwrap.dedent("""
We will build up the harness one subsystem at a time:
1. The agent loop (tools, run/verify/fix, retries, cost)
2. Permissions (modes, sensitive paths, rules, hook veto)
3. Skills (on-demand knowledge)
4. Memory (persistent MEMORY.md across sessions)
5. Compaction (surviving long sessions)
6. Multi-agent (parallel subagent delegation)
7. Real provider (one-line swap to a live model)
Architecture (what each piece is responsible for):
  User prompt
      │
      ▼
  QueryEngine ──► LLM brain (mock or real)   "WHAT to do"
      │    ▲          │ tool_use
      │    └──────────┘
      ▼
  For each tool call:  Permission ─► PreHook ─► Execute ─► PostHook
                           │            │          │          │
                        deny/ask    veto/edit   sandbox    redact
      │
      ▼
  Tool result ──► back into the transcript ──► loop
""").rstrip())
await demo_agent_loop()
await demo_permissions()
await demo_skills()
await demo_memory()
await demo_compaction()
await demo_multi_agent()
await demo_real_provider()
banner("All demos complete")
print(textwrap.dedent("""\
You just built the core of an agent harness:
• a streaming tool-call loop with retries & cost tracking
• type-validated, self-describing tools
• layered governance (permission modes + lifecycle hooks)
• on-demand skills and persistent memory
• context auto-compaction
• nested multi-agent coordination
• a one-line swap to a real LLM provider
To go deeper, study the real project: https://github.com/HKUDS/OpenHarness
(43+ tools, plugin ecosystem, MCP client, React/Ink TUI, the `oh` CLI,
and the `ohmo` personal agent). "The model is the agent; the code is the
harness."
"""))
run_async(main())
We complete the tutorial with memory, context compaction, multi-agent coordination, real-provider switching, and the final guided walkthrough. We demonstrate how memory persists across sessions, how old context gets summarized, and how a lead agent delegates work to parallel subagents before synthesizing results. We finish by running all demos through the main function, yielding a complete, runnable view of an OpenHarness-style agent system.
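The delegation pattern in the multi-agent demo reduces to one idea: a tool call whose implementation awaits another agent loop, with sibling calls from the same turn awaited together. The sketch below isolates that idea with a hypothetical `run_subagent` coroutine that only sleeps and returns a string; in the tutorial, each subagent instead runs its own `QueryEngine`. The names `run_subagent` and `lead` are illustrative, not OpenHarness APIs.

```python
import asyncio
import time


async def run_subagent(role: str, task: str, team: list, delay: float = 0.05) -> str:
    """Hypothetical subagent: pretend to work for `delay` seconds, then report."""
    await asyncio.sleep(delay)  # stands in for a full nested agent loop
    result = f"[{role}] notes on {task}"
    team.append({"role": role, "task": task, "result": result})  # team registry
    return result


async def lead(team: list) -> str:
    # Turn 1: two researcher calls issued together and awaited concurrently;
    # gather returns their results in call order, not completion order.
    notes = await asyncio.gather(
        run_subagent("researcher", "vector database for RAG", team),
        run_subagent("researcher", "agent harness design", team),
    )
    # Turn 2: the writer depends on both results, so it runs afterwards.
    return await run_subagent("writer", " + ".join(notes), team)


team: list = []
t0 = time.perf_counter()
brief = asyncio.run(lead(team))
elapsed = time.perf_counter() - t0
print(brief)
print(f"{len(team)} subagent runs in {elapsed:.2f}s")
```

Because the two researchers overlap, the total wall time is roughly two sleep periods rather than three, which is the same effect the demo's `total {dt:.3f}s` line reports for the nested engines.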
Conclusion
In conclusion, we now have a hands-on understanding of how an agent harness pairs language-model reasoning with real, controlled actions. We saw how each subsystem contributes to reliability: tools make the agent capable, permissions keep execution safe, hooks add governance, memory preserves useful preferences, skills load knowledge only when needed, and compaction keeps longer sessions manageable. We also explored how the same harness supports deterministic mock brains, real LLM providers, and nested subagents that work together on delegated tasks.