A practical spectrum from scripted tasks to goal-seeking systems (LoA-1 → LoA-3).
If Agents & Process Automation 101 was the why and Levels of Autonomy (LoA) for Enterprise Automation was the dial, this is the wiring you can ship, now tightened to match how you’ll actually deploy: one generic agent, SOP packaged with each model version, and the caller chooses the version (e.g., @Champion
/@Challenger
or separate endpoints). Tools are Unity Catalog (UC) functions only. Policy lives in YAML. We finish with a standardised Assurance Gate and real telemetry.
This article is meant for a more technical audience compared to the previous ones; architects and engineers who want a blueprint they can ship easily.
TL;DR
-
Agent ≠ SOP. Ship one generic agent; each model version bundles its SOP. Caller picks the version (alias/endpoint).
-
Tools = UC functions. Reads → UC SQL FUNCTION. Actions → Model-as-Function (MLflow pyfunc + Serving). The agent never calls raw APIs or MCP directly.
-
Planner on rails. LLM plans one step at a time using only allow-listed UC tools from the SOP, executes, observes, reflects.
-
Guardrails without extra boxes. Structure/PII via UC FUNCTIONS; policy thresholds as CEL expressions inside the SOP.
-
Assurance Gate. Verify (UC) → policy (CEL) → MLflow judges (groundedness/relevance/safety) → auto-finalize or HITL.
-
Ops that scale. MLflow Tracing → OpenTelemetry; aliases for
Champion
/Challenger
rollout; clean lineage because SOP is versioned with the model.Why this shape works
Two things unlock LoA-3 in the real world: governed tools and explicit policy.
- Putting tools behind UC securables gives you
EXECUTE
/endpoint perms, audit, and lineage by default. - Keeping policy in YAML (CEL expressions) makes change-control boring—in a good way. Thresholds move via PR, not code pushes.
- A single generic agent eliminates drift. Swapping SOPs becomes a version bump, not a new code path.
SOP (YAML): single source of truth
- Putting tools behind UC securables gives you
Store SOPs in a UC Volume and bundle one with each model version. Keep it contract‑first: planner limits, allow‑listed tools, memory, CEL gates, judges.
name: "vendor_onboarding"
goal: "Onboard vendor if compliant"
loa: 3
planner:
model: "databricks-gpt-oss-20b" # or "databricks-gpt-oss-120b"
base_url: "https://<workspace-host>/serving-endpoints/databricks-gpt-oss-20b/invocations"
temperature: 0.2
max_steps: 6
max_reflections: 2
never_guess: true
system_prompt: |
You are a control-plane planner. At each turn propose ONE next step as STRICT JSON:
{"use": <tool_name>, "args": {...}, "stop": true|false, "reason": "<why>"}
Use ONLY the allowed tools and their schemas. If unsafe/unknown, set stop=true.
If the last tool failed:
• Retry only when idempotent and policy allows; otherwise propose an alternative or stop ("HITL").
tools:
- name: LookupCompany
kind: sql_function
uc_fn: "procure.tools.lookup_company"
description: "Resolve a company by legal name; returns {id,legal_name,country}."
input_schema: { type: object, properties: { name: { type: string } }, required: [name] }
- name: SanctionsCheck
kind: model_function
uc_model: "procure.tools.sanctions_check"
uc_endpoint: "https://<workspace>/serving-endpoints/sanctions_check/invocations"
description: "KYC screen; returns {sanctions:'clear'|'flag', evidence:[uri...]}"
input_schema: { type: object, properties: { entity_id: { type: string } }, required: [entity_id] }
- name: CreateVendor
kind: model_function
uc_model: "procure.tools.create_vendor"
uc_endpoint: "https://<workspace>/serving-endpoints/create_vendor/invocations"
description: "Create vendor in ERP; args: {company:{...}}; returns created record."
input_schema: { type: object, properties: { company: { type: object } }, required: [company] }
guardrails:
uc_functions:
- "procure.governance.validate_vendor" # structural/PII check
thresholds:
amount_max: 1000
gates_cel:
- if: "input.amount > thresholds.amount_max" then: "require_hitl"
- if: "size(ctx.SanctionsCheck.evidence) == 0" then: "fail_and_retry"
failure:
default:
max_retries: 1
backoff_ms: [250]
retryable: ["timeout", "rate limit", "429", "temporarily unavailable", "503"]
non_retryable: ["permission", "invalid argument", "validation", "not found"]
per_tool:
CreateVendor:
max_retries: 0
LookupCompany:
max_retries: 2
backoff_ms: [250, 500]
judging:
pass_threshold: 0.82
rubric: ["policy_adherence", "source_citations", "numerical_consistency"]
observability:
traces: "mlflow+otel"
audit: "cloudevents.v1"
Why this works: UC‑governed tools + YAML policy + a single generic agent—you get EXECUTE/endpoint perms, audit/lineage by default; thresholds move via PR, not code; and swapping SOPs is a version bump, not a new code path.
Agent core (LangChain tool‑calling; Assurance Gate at the end)
Here’s the on-rails core the rest hangs on: a single MLflow LangChain model, built from an SOP (YAML) bundled with the version, drives a LangChain tool-calling agent that’s constrained to an allow-list of Unity Catalog tools. Inputs to each tool are typed (JSON Schema → Pydantic), and the loop runs plan → act → observe, with no off-catalog calls. Before anything commits, an Assurance Gate fires: UC structural/PII checks, then CEL policy thresholds from the SOP, then MLflow judges (groundedness/relevance/safety). Pass and it auto-finalizes; miss and it routes to HITL.
# agent/sop_agent_langchain_chain.py
# LangChain chain that:
# 1) builds a tool-calling AgentExecutor from SOP
# 2) runs it
# 3) applies the Assurance Gate (UC guardrail fn → CEL → MLflow judges)
from __future__ import annotations
import os, json, yaml, time, requests
from typing import Any, Dict, List
from collections import defaultdict
from databricks import sql
from jsonschema import validate as js_validate
from celpy import Environment as CELEnv
from pydantic import BaseModel, create_model
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import StructuredTool
from langchain_core.runnables import RunnableLambda
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_openai import ChatOpenAI
from mlflow.genai.judges import is_grounded, is_safe, is_context_relevant
# ───────────── UC helpers ─────────────
def _sql_connect():
return sql.connect(
server_hostname=os.getenv("DB_HOST"),
http_path=os.getenv("DB_WAREHOUSE"),
access_token=os.getenv("DB_TOKEN"),
)
def call_uc_sql_function(fn_path: str, args: Dict[str, Any]) -> Any:
with _sql_connect() as c:
cur = c.cursor()
placeholders = ",".join(["?"] * len(args))
cur.execute(f"SELECT {fn_path}({placeholders})", tuple(args.values()))
return cur.fetchone()[0]
def call_uc_model(endpoint_url: str, inputs: Dict[str, Any]) -> Any:
r = requests.post(
endpoint_url,
headers={"Authorization": f"Bearer {os.getenv('DATABRICKS_TOKEN')}",
"Content-Type": "application/json"},
json={"inputs": inputs}, timeout=60)
r.raise_for_status()
return r.json()
def uc_validate_payload(fn_path: str, payload_json: str) -> bool:
with _sql_connect() as c:
cur = c.cursor()
cur.execute(f"SELECT {fn_path}(?)", (payload_json,))
return bool(cur.fetchone()[0])
# ───────────── CEL + judges ─────────────
def cel_decision(gates: List[dict], cel_ctx: Dict[str, Any]) -> str:
env = CELEnv()
for g in gates:
if env.compile(g["if"]).program().evaluate(cel_ctx):
return g["then"]
return "allow"
def judge_mlflow(proposal: Dict[str, Any], pass_q: float,
request: str, response: str, context: Any) -> Dict[str, Any]:
def yes(x):
v = getattr(getattr(x, "feedback", None), "value", "no")
return 1.0 if str(v).lower() == "yes" else 0.0
parts = {
"groundedness": yes(is_grounded(request=request, response=response, context=context)),
"context_relevance": yes(is_context_relevant(request=request, context=context)),
"safety": yes(is_safe(content=response)),
}
score = sum(parts.values()) / max(1, len(parts))
return {"score": score, "verdict": "pass" if score >= float(pass_q) else "fail"}
# ───────────── JSON Schema → Pydantic for tool signatures ─────────────
_JSON_TO_PYD = {"string": str, "number": float, "integer": int, "boolean": bool, "object": Dict[str, Any], "array": List[Any]}
def _schema_to_pydantic(name: str, schema: Dict[str, Any]) -> BaseModel:
props, req = schema.get("properties", {}) or {}, set(schema.get("required", []) or [])
fields = {k: (_JSON_TO_PYD.get(spec.get("type", "object"), Dict[str, Any]), (... if k in req else None))
for k, spec in props.items()} or {"payload": (Dict[str, Any], ...)}
return create_model(name, **fields)
# ───────────── Tools with SOP-driven retry/backoff + error observation ─────────────
def _make_tool(spec: Dict[str, Any],
failure_cfg: Dict[str, Any],
attempts: defaultdict) -> StructuredTool:
ArgsModel = _schema_to_pydantic(f"{spec['name']}Args", spec["input_schema"])
per_tool = (failure_cfg.get("per_tool", {}) or {}).get(spec["name"], {})
default = failure_cfg.get("default", {}) or {}
max_retries = int(per_tool.get("max_retries", default.get("max_retries", 0)))
backoff_ms = list(per_tool.get("backoff_ms", default.get("backoff_ms", [])))
retryable = [s.lower() for s in default.get("retryable", [])]
nonretry = [s.lower() for s in default.get("non_retryable", [])]
def _run(**kwargs):
js_validate(kwargs, spec["input_schema"])
attempt = 0
while True:
try:
out = call_uc_sql_function(spec["uc_fn"], kwargs) if spec["kind"] == "sql_function" \
else call_uc_model(spec["uc_endpoint"], kwargs)
attempts[(spec["name"], "ok")] += 1
return out if isinstance(out, (dict, list, str, int, float, bool)) \
else json.loads(json.dumps(out, default=str))
except Exception as e:
msg = str(e)
low = msg.lower()
attempts[(spec["name"], "err")] += 1
# 1) hard stop (non-retryable) → return error observation
if any(s in low for s in nonretry):
return {"_error": {
"tool": spec["name"], "message": msg[:500],
"retryable": False, "attempts": attempt, "policy_max_retries": max_retries
}}
# 2) retries exhausted → return error observation
if attempt >= max_retries:
return {"_error": {
"tool": spec["name"], "message": msg[:500],
"retryable": any(s in low for s in retryable), "attempts": attempt,
"policy_max_retries": max_retries
}}
# 3) looks retryable? backoff a bit and try again
if retryable and any(s in low for s in retryable):
delay = backoff_ms[attempt] if attempt < len(backoff_ms) else 250
time.sleep(min(delay, 1000) / 1000.0)
attempt += 1
continue
# 4) not clearly retryable → return error observation
return {"_error": {
"tool": spec["name"], "message": msg[:500],
"retryable": False, "attempts": attempt, "policy_max_retries": max_retries
}}
return StructuredTool.from_function(
name=spec["name"],
description=spec.get("description", f"UC tool {spec['name']}"),
args_schema=ArgsModel,
func=_run,
return_direct=False,
)
def _build_agent_executor(sop: Dict[str, Any]) -> AgentExecutor:
# tools
attempts = defaultdict(int)
failure_cfg = sop.get("failure", {"default": {"max_retries": 0}})
tools = [_make_tool(t, failure_cfg, attempts) for t in sop["tools"]]
# planner LLM (Databricks GPT-OSS via OpenAI-compatible endpoint)
llm = ChatOpenAI(
model=sop["planner"]["model"],
base_url=sop["planner"]["base_url"],
api_key=os.getenv("DATABRICKS_TOKEN") or os.getenv("LLM_API_KEY"),
temperature=float(sop["planner"].get("temperature", 0.2)),
)
prompt = ChatPromptTemplate.from_messages([
("system", sop["planner"]["system_prompt"].strip()),
("human", "{input}"),
MessagesPlaceholder("agent_scratchpad"),
])
agent = create_tool_calling_agent(llm, tools, prompt)
return AgentExecutor(
agent=agent,
tools=tools,
max_iterations=int(sop["planner"].get("max_steps", 6)),
return_intermediate_steps=True,
handle_parsing_errors=True,
verbose=False,
)
# ───────────── Chain factory (Runner) ─────────────
def build_chain_from_sop(sop_path: str):
sop = yaml.safe_load(open(sop_path))
executor = _build_agent_executor(sop)
def _invoke_agent(user_input: Dict[str, Any]):
# 1) invoke agent
block = {
"goal": sop["goal"],
"constraints": {"never_guess": bool(sop["planner"].get("never_guess", True)),
"max_steps": int(sop["planner"].get("max_steps", 6))},
"input": dict(user_input),
"tools": [t["name"] for t in sop["tools"]],
}
res = executor.invoke({"input": json.dumps(block, ensure_ascii=False)})
# 2) collect outputs (latest per tool)
ctx_map: Dict[str, Any] = {}
for action, output in res.get("intermediate_steps", []):
name = getattr(action, "tool", "tool")
try:
ctx_map[name] = json.loads(output) if isinstance(output, str) else output
except Exception:
ctx_map[name] = output
# 3) Assurance Gate
proposal = {"input": dict(user_input), "ctx": ctx_map}
payload = json.dumps(proposal)
for fn in sop["guardrails"]["uc_functions"]:
if not uc_validate_payload(fn, payload):
return {"status": "FAILED_GUARDRAIL", "reason": f"UC guardrail failed: {fn}"}
decision = cel_decision(
sop["guardrails"]["gates_cel"],
{"input": user_input, "ctx": ctx_map, "thresholds": sop["guardrails"]["thresholds"]}
)
judgment = judge_mlflow(
proposal=proposal,
pass_q=float(sop["judging"]["pass_threshold"]),
request=sop["goal"],
response=payload,
context=ctx_map.get("SanctionsCheck", {}).get("evidence", []),
)
if decision in ("fail_and_retry", "require_hitl") or judgment["verdict"] == "fail":
return {"status": "REQUIRES_HITL", "confidence": judgment["score"], "result": ctx_map}
return {"status": "COMPLETED", "confidence": judgment["score"], "result": ctx_map}
# LangChain expects a Runnable; wrap the callable
return RunnableLambda(_invoke_agent)
Wrap tools as UC functions
Keep reads in SQL; wrap actions as Model‑as‑Function (MLflow pyfunc + Serving). Grant EXECUTE/endpoint perms to the agent SP. Return JSON from SQL for predictable Python handling.
Read (JSON out):
CREATE OR REPLACE FUNCTION procure.tools.lookup_company(name STRING)
RETURNS STRING
RETURN to_json(named_struct(
'id', id, 'legal_name', legal_name, 'country', country
))
FROM (
SELECT id, legal_name, country
FROM procure.refdata.companies
WHERE lower(legal_name) = lower(name)
LIMIT 1
);
GRANT EXECUTE ON FUNCTION procure.tools.lookup_company TO `spn:procurement-agent`;
Retrieval memory (policy recall):
CREATE OR REPLACE FUNCTION procure.knowledge.recall_policy(query STRING) -- replace with vector search in prod
RETURNS STRING
RETURN to_json(named_struct('passages',
collect_list(named_struct('id', id, 'text', passage))
))
FROM (
SELECT id, passage
FROM procure.knowledge.policy_index
WHERE contains(passage, query)
LIMIT 5
);
GRANT EXECUTE ON FUNCTION procure.knowledge.recall_policy TO `spn:procurement-agent`;
Guardrail (structural check):
CREATE OR REPLACE FUNCTION procure.governance.validate_vendor(payload STRING)
RETURNS BOOLEAN
RETURN from_json(
payload,
'input STRUCT<amount DOUBLE>, ctx STRUCT<SanctionsCheck STRUCT<evidence ARRAY<STRING>>>>'
) IS NOT NULL;
GRANT EXECUTE ON FUNCTION procure.governance.validate_vendor TO `spn:procurement-agent`;
Package, register, alias (Champion/Challenger)
Each version bundles its SOP; the caller chooses which to invoke.
# register/register_agent_langchain_uc.py
import os, hashlib, mlflow
from mlflow.tracking import MlflowClient
from agent.sop_agent_langchain_chain import build_chain_from_sop
mlflow.set_registry_uri("databricks-uc") # UC-backed registry
def register_agent_version_from_uc_volume(
sop_path: str,
model_name: str = "procure.platform.agent", # catalog.schema.model
alias: str | None = None,
) -> str:
lc_chain = build_chain_from_sop(sop_path)
with mlflow.start_run() as run:
info = mlflow.langchain.log_model(
lc_model=lc_chain,
name="agent",
registered_model_name=model_name,
pip_requirements=[
"mlflow",
"langchain",
"langchain-openai",
"langchain-core",
"requests",
"PyYAML",
"databricks-sql-connector",
"cel-python",
"jsonschema",
],
tags={"sop.path": sop_path},
input_example={"company_name": "ACME Ltd", "amount": 750},
)
mv = info.model_version
if alias:
MlflowClient().set_registered_model_alias(model_name, alias, mv.version)
print(f"Registered {model_name} v{mv.version}" + (f" alias={alias}" if alias else ""))
return mv.version
if __name__ == "__main__":
register_agent_version_from_uc_volume(
sop_path="/Volumes/procure/policies/agent_sops/vendor_onboarding.yml",
alias="Champion"
)
# register_agent_version_from_uc_volume("/Volumes/procure/policies/agent_sops/vendor_onboarding.yml", alias="Challenger")
Serve & invoke (caller picks the version)
Option A — single endpoint that points to @Champion; flip alias during rollout.
Option B — two endpoints:..._champion
and ..._challenger
import mlflow.langchain
lc = mlflow.langchain.load_model("models:/procure.platform.agent@Champion")
out = lc.invoke({"company_name": "ACME Ltd", "amount": 750})
You’ll get:
{
"status": "COMPLETED",
"result": { "...tool outputs..." },
"confidence": 0.86,
"plan": [
{ "executed": { "use": "LookupCompany", "args": {"name": "ACME Ltd"}, "ok": true } },
{ "executed": { "use": "SanctionsCheck", "args": {"entity_id": "..."}, "ok": true } }
]
}
How this maps to LoA (and why it scales)
- LoA‑1 — Scripted flow + AI gate. Keep your workflow; add one governed model decision; log traces.
- LoA‑2 — Assistant with bounded steps. Same agent, more steps; HITL by default.
- LoA‑3 — Plan → Act → Verify → Judge. UC tools only, CEL policy, MLflow judges. This is where most wins land today.
Ops you’ll actually run
- Governance: UC is the source of truth for tools & permissions; SOP travels with the model version (clean lineage).
- Observability: MLflow Tracing → OTel.
- A/B safely: flip aliases (Champion/Challenger) or run two endpoints.
SLOs: STP%, judge‑pass rate, approvals/100, p95 cost & latency, policy blocks, loop kills. Promote autonomy only when the data earns it.
Next step checklist
- Wrap reads/actions as UC functions; grant EXECUTE/endpoint perms to the agent SP.
- Draft the SOP (YAML) with planner limits, allow‑listed tools, memory, CEL gates, and judge threshold.
- Package the generic agent with the SOP as an artifact; register in UC; set @Champion.
- Create a serving endpoint for Champion (and optionally Challenger).
- Turn on MLflow Tracing → OTel; wire CloudEvents; watch SLOs.
- Iterate: update SOP → create new model version → move the alias. No code changes.
Ship it
Put the SOP in UC. Wrap the tools. Package the agent once. Register two versions with their own SOPs (@Champion
, @Challenger
). Point your endpoint at the alias, turn on MLflow→OTel, and watch STP%, judge‑pass, and p95 latency. If the numbers hold, expand scope; if they wobble, flip the alias back. Governed tools, explicit policy, small blast radius. This is production AI. Ship it.