Mycelium: runtime guards for AI agents
Runtime guards for AI agents. Prevents predictable failures before they reach the LLM: duplicate tool execution on retry, stale context, invalid tool calls. Framework-agnostic. Experimental (v1.2).
How it fits
Mycelium wraps your agent loop. Guards run on messages and tools before side effects and before the next LLM call. Not a framework. Not observability.
LangGraph · CrewAI · Python
Install
Requires Python 3.10 or later.
pip install mycelium-runtime pip install 'mycelium-runtime[redis]' # optional pip install 'mycelium-runtime[postgres]' mycelium init # quickstart template (action ledger) mycelium init --full # all guards mycelium init --minimal # smaller multi-guard template mycelium demo # langgraph#7417 bug and fix
API
# Context protect, protect_sync, Session MessageValidator, HistoryGuard # Tools bounded, bounded_sync, ToolRegistry, ToolRunner # Actions ledger, ledger_sync, task_ledger, task_ledger_sync ActionLedger, TaskLedger, StateFlush, AuditReceiptEmitter, verify_receipt # Config load_config, load_config_from_string, MyceliumConfig
Context
from mycelium import protect, protect_sync, Session
from mycelium import MessageValidator, HistoryGuard
# async
@protect(entity_param="customer_id", ttl=60)
async def fetch_customer(customer_id: str) -> dict:
return await db.get(customer_id)
async def run(customer_id: str, messages: list):
async with Session():
await fetch_customer(customer_id=customer_id)
messages = MessageValidator().repair(messages)
guard = HistoryGuard(max_tokens=100_000)
messages = guard.validate(messages)
guard.check_for_drops(messages) # after framework trimming
return await llm.ainvoke(messages)
# sync (CrewAI, Smolagents)
@protect_sync(entity_param="customer_id", ttl=60)
def fetch_customer(customer_id: str) -> dict:
return db.get(customer_id)
with Session():
fetch_customer(customer_id="c1")
Tools
from mycelium import bounded, bounded_sync, ToolRegistry, ToolRunner
registry = ToolRegistry(allowed=["fetch_customer", "delete_file"])
@registry.register
@bounded(
schema={"customer_id": {"type": "string", "required": True, "pattern": r"^c\d+$"}},
output_schema={"customer_id": {"type": "string", "required": True}, "name": {"type": "string", "required": True}},
allowed_paths=["/workspace/src/"],
path_param="path",
)
async def fetch_customer(customer_id: str) -> dict:
return await db.get(customer_id)
runner = ToolRunner(registry=registry, max_llm_retries=2, max_tool_retries=3)
result = await runner.call(fetch_customer, customer_id="c1")
# with LLM retry loop
result, messages = await runner.run_with_llm_retry(
fetch_customer,
messages=messages,
tool_call_id="call_1",
kwargs={"customer_id": "c1"},
invoke_llm=llm.ainvoke,
parse_tool_kwargs=extract_tool_args,
)
# sync
@bounded_sync(schema={"customer_id": {"type": "string", "required": True}})
def fetch_customer(customer_id: str) -> dict:
return db.get(customer_id)
Actions
from mycelium import ledger, ledger_sync, task_ledger_sync
# tool-level idempotency
@ledger_sync()
def send_payment(amount: float, recipient: str) -> dict:
return gateway.charge(amount, recipient)
send_payment(amount=100.0, recipient="acct_123", tool_call_id="call_abc")
send_payment(amount=100.0, recipient="acct_123", tool_call_id="call_abc") # cached
# or request_id instead of tool_call_id
send_payment(amount=100.0, recipient="acct_123", request_id="invoice-42")
@ledger()
async def send_payment(amount: float, recipient: str) -> dict:
return await gateway.charge(amount, recipient)
# task-level idempotency
@task_ledger_sync(id_from=["invoice_id"])
def process_invoice(invoice_id: str, amount: float) -> dict:
...
return {"invoice_id": invoice_id, "status": "paid"}
process_invoice(invoice_id="inv-42", amount=100.0)
process_invoice(invoice_id="inv-42", amount=200.0) # returns first result
# correction retry: use a new task_id
process_invoice(invoice_id="inv-42", task_id="inv-42-attempt-2")
YAML
action_ledger:
storage: file # memory | file | redis | postgres
path: ./mycelium-ledger.json
tools: [send_payment, delete_file]
task_ledger:
storage: file
path: ./mycelium-task-ledger.json
tasks: [process_invoice]
state_flush:
storage: file
path: ./mycelium-state.json
flush_on: [cancel, disconnect, error]
audit_receipt:
agent_id: payment-agent
signing_key_env: MYCELIUM_SIGNING_KEY
storage: file
path: ./mycelium-receipts.jsonl
auto: true
tools:
fetch_customer:
protect: {entity_param: customer_id, ttl: 60}
bounded:
schema:
customer_id: {type: string, required: true, pattern: "^c\\d+$"}
output_schema:
customer_id: {type: string, required: true}
name: {type: string, required: true}
delete_file:
bounded:
schema:
path: {type: string, required: true}
allowed_paths: [/workspace/src/]
path_param: path
send_payment:
bounded:
schema:
amount: {type: number, required: true}
recipient: {type: string, required: true}
tasks:
process_invoice:
ledger: true
id_from: [invoice_id]
registry:
auto: true
runner:
max_llm_retries: 2
max_tool_retries: 3
history_guard:
max_tokens: 100000
max_messages: 1000
message_validator:
enabled: true
from mycelium import load_config
import my_tools
config = load_config("mycelium.yaml")
# option A: instrument a module
tools = config.instrument(my_tools)
# option B: decorate one function (name must match tools: key)
@config.apply
def send_payment(amount: float, recipient: str) -> dict:
return gateway.charge(amount, recipient)
@config.apply_task
def process_invoice(invoice_id: str) -> dict:
...
with config.run(thread_id):
messages = config.prepare_messages(messages)
runner = config.build_runner(registry=config.registry)
Common searches
How do I prevent duplicate tool execution in LangGraph?
Claim a durable idempotency key (tool_call_id) before the side
effect runs. LangGraph Cloud can redispatch long tool calls while the first is still
in flight (langgraph#7417).
Mycelium's @ledger_sync() records an in-flight claim in file, Redis, or
Postgres so the second invocation returns the cached result.
pip install mycelium-runtime && mycelium demo
Is Mycelium a replacement for Langfuse or LangSmith?
No. Langfuse and LangSmith are observability and tracing. They show what happened after an agent runs. Mycelium is runtime prevention: idempotency ledgers, tool boundary checks, and context guards during execution. Use both if you want traces and guards.
How do I make AI agent tool calls idempotent in Python?
Wrap side-effect tools with a ledger that atomically claims
request_id or tool_call_id before execution. On framework
retry or worker redispatch, return the stored result instead of re-running the tool.
Mycelium ships @ledger_sync() (sync) and @ledger() (async).
Framework-agnostic; no LangGraph import required.