A practical spectrum from scripted tasks to goal-seeking systems (LoA-1 → LoA-3). A practical spectrum from scripted tasks to goal-seeking systems (LoA-1 → LoA-3). If Agents & Process Automation 101 was the why and Levels of Autonomy (LoA) for Enterprise Automation was the dial, this is the wiring you can ship, now tightened to match how you’ll actually deploy: one generic agent, SOP packaged with each model version, and the caller chooses the version (e.g., @Champion/@Challenger or separate endpoints). Tools are Unity Catalog (UC) functions only. Policy lives in YAML. We finish with a standardised Assurance Gate and real telemetry. Agents & Process Automation 101 why Levels of Autonomy (LoA) for Enterprise Automation dial wiring you can ship one generic agent, SOP packaged with each model version caller chooses the version @Champion @Challenger Unity Catalog (UC) YAML Assurance Gate real telemetry This article is meant for a more technical audience compared to the previous ones; architects and engineers who want a blueprint they can ship easily. TL;DR TL;DR Agent ≠ SOP. Ship one generic agent; each model version bundles its SOP. Caller picks the version (alias/endpoint). Tools = UC functions. Reads → UC SQL FUNCTION. Actions → Model-as-Function (MLflow pyfunc + Serving). The agent never calls raw APIs or MCP directly. Planner on rails. LLM plans one step at a time using only allow-listed UC tools from the SOP, executes, observes, reflects. Guardrails without extra boxes. Structure/PII via UC FUNCTIONS; policy thresholds as CEL expressions inside the SOP. Assurance Gate. Verify (UC) → policy (CEL) → MLflow judges (groundedness/relevance/safety) → auto-finalize or HITL. Ops that scale. MLflow Tracing → OpenTelemetry; aliases for Champion/Challenger rollout; clean lineage because SOP is versioned with the model. Why this shape works Two things unlock LoA-3 in the real world: governed tools and explicit policy. Putting tools behind UC securables gives you EXECUTE/endpoint perms, audit, and lineage by default. Keeping policy in YAML (CEL expressions) makes change-control boring—in a good way. Thresholds move via PR, not code pushes. A single generic agent eliminates drift. Swapping SOPs becomes a version bump, not a new code path. SOP (YAML): single source of truth Agent ≠ SOP. Ship one generic agent; each model version bundles its SOP. Caller picks the version (alias/endpoint). Agent ≠ SOP. Ship one generic agent; each model version bundles its SOP. Caller picks the version (alias/endpoint). Agent ≠ SOP. Tools = UC functions. Reads → UC SQL FUNCTION. Actions → Model-as-Function (MLflow pyfunc + Serving). The agent never calls raw APIs or MCP directly. Tools = UC functions. Reads → UC SQL FUNCTION. Actions → Model-as-Function (MLflow pyfunc + Serving). The agent never calls raw APIs or MCP directly. Tools = UC functions Planner on rails. LLM plans one step at a time using only allow-listed UC tools from the SOP, executes, observes, reflects. Planner on rails. LLM plans one step at a time using only allow-listed UC tools from the SOP, executes, observes, reflects. Planner on rails Guardrails without extra boxes. Structure/PII via UC FUNCTIONS; policy thresholds as CEL expressions inside the SOP. Guardrails without extra boxes. Structure/PII via UC FUNCTIONS; policy thresholds as CEL expressions inside the SOP. Guardrails without extra boxes Assurance Gate. Verify (UC) → policy (CEL) → MLflow judges (groundedness/relevance/safety) → auto-finalize or HITL. Assurance Gate. Verify (UC) → policy (CEL) → MLflow judges (groundedness/relevance/safety) → auto-finalize or HITL. Assurance Gate Ops that scale. MLflow Tracing → OpenTelemetry; aliases for Champion/Challenger rollout; clean lineage because SOP is versioned with the model. Why this shape works Two things unlock LoA-3 in the real world: governed tools and explicit policy. Putting tools behind UC securables gives you EXECUTE/endpoint perms, audit, and lineage by default. Keeping policy in YAML (CEL expressions) makes change-control boring—in a good way. Thresholds move via PR, not code pushes. A single generic agent eliminates drift. Swapping SOPs becomes a version bump, not a new code path. SOP (YAML): single source of truth Ops that scale. MLflow Tracing → OpenTelemetry; aliases for Champion/Challenger rollout; clean lineage because SOP is versioned with the model. Ops that scale Champion Challenger Why this shape works Why this shape works Two things unlock LoA-3 in the real world: governed tools and explicit policy. governed tools explicit policy Putting tools behind UC securables gives you EXECUTE/endpoint perms, audit, and lineage by default. Keeping policy in YAML (CEL expressions) makes change-control boring—in a good way. Thresholds move via PR, not code pushes. A single generic agent eliminates drift. Swapping SOPs becomes a version bump, not a new code path. Putting tools behind UC securables gives you EXECUTE/endpoint perms, audit, and lineage by default. UC securables EXECUTE Keeping policy in YAML (CEL expressions) makes change-control boring—in a good way. Thresholds move via PR, not code pushes. YAML (CEL expressions) A single generic agent eliminates drift. Swapping SOPs becomes a version bump, not a new code path. single generic agent SOP (YAML): single source of truth SOP (YAML): single source of truth Store SOPs in a UC Volume and bundle one with each model version. Keep it contract‑first: planner limits, allow‑listed tools, memory, CEL gates, judges. memory name: "vendor_onboarding" goal: "Onboard vendor if compliant" loa: 3 planner: model: "databricks-gpt-oss-20b" # or "databricks-gpt-oss-120b" base_url: "https://<workspace-host>/serving-endpoints/databricks-gpt-oss-20b/invocations" temperature: 0.2 max_steps: 6 max_reflections: 2 never_guess: true system_prompt: | You are a control-plane planner. At each turn propose ONE next step as STRICT JSON: {"use": <tool_name>, "args": {...}, "stop": true|false, "reason": "<why>"} Use ONLY the allowed tools and their schemas. If unsafe/unknown, set stop=true. If the last tool failed: • Retry only when idempotent and policy allows; otherwise propose an alternative or stop ("HITL"). tools: - name: LookupCompany kind: sql_function uc_fn: "procure.tools.lookup_company" description: "Resolve a company by legal name; returns {id,legal_name,country}." input_schema: { type: object, properties: { name: { type: string } }, required: [name] } - name: SanctionsCheck kind: model_function uc_model: "procure.tools.sanctions_check" uc_endpoint: "https://<workspace>/serving-endpoints/sanctions_check/invocations" description: "KYC screen; returns {sanctions:'clear'|'flag', evidence:[uri...]}" input_schema: { type: object, properties: { entity_id: { type: string } }, required: [entity_id] } - name: CreateVendor kind: model_function uc_model: "procure.tools.create_vendor" uc_endpoint: "https://<workspace>/serving-endpoints/create_vendor/invocations" description: "Create vendor in ERP; args: {company:{...}}; returns created record." input_schema: { type: object, properties: { company: { type: object } }, required: [company] } guardrails: uc_functions: - "procure.governance.validate_vendor" # structural/PII check thresholds: amount_max: 1000 gates_cel: - if: "input.amount > thresholds.amount_max" then: "require_hitl" - if: "size(ctx.SanctionsCheck.evidence) == 0" then: "fail_and_retry" failure: default: max_retries: 1 backoff_ms: [250] retryable: ["timeout", "rate limit", "429", "temporarily unavailable", "503"] non_retryable: ["permission", "invalid argument", "validation", "not found"] per_tool: CreateVendor: max_retries: 0 LookupCompany: max_retries: 2 backoff_ms: [250, 500] judging: pass_threshold: 0.82 rubric: ["policy_adherence", "source_citations", "numerical_consistency"] observability: traces: "mlflow+otel" audit: "cloudevents.v1" name: "vendor_onboarding" goal: "Onboard vendor if compliant" loa: 3 planner: model: "databricks-gpt-oss-20b" # or "databricks-gpt-oss-120b" base_url: "https://<workspace-host>/serving-endpoints/databricks-gpt-oss-20b/invocations" temperature: 0.2 max_steps: 6 max_reflections: 2 never_guess: true system_prompt: | You are a control-plane planner. At each turn propose ONE next step as STRICT JSON: {"use": <tool_name>, "args": {...}, "stop": true|false, "reason": "<why>"} Use ONLY the allowed tools and their schemas. If unsafe/unknown, set stop=true. If the last tool failed: • Retry only when idempotent and policy allows; otherwise propose an alternative or stop ("HITL"). tools: - name: LookupCompany kind: sql_function uc_fn: "procure.tools.lookup_company" description: "Resolve a company by legal name; returns {id,legal_name,country}." input_schema: { type: object, properties: { name: { type: string } }, required: [name] } - name: SanctionsCheck kind: model_function uc_model: "procure.tools.sanctions_check" uc_endpoint: "https://<workspace>/serving-endpoints/sanctions_check/invocations" description: "KYC screen; returns {sanctions:'clear'|'flag', evidence:[uri...]}" input_schema: { type: object, properties: { entity_id: { type: string } }, required: [entity_id] } - name: CreateVendor kind: model_function uc_model: "procure.tools.create_vendor" uc_endpoint: "https://<workspace>/serving-endpoints/create_vendor/invocations" description: "Create vendor in ERP; args: {company:{...}}; returns created record." input_schema: { type: object, properties: { company: { type: object } }, required: [company] } guardrails: uc_functions: - "procure.governance.validate_vendor" # structural/PII check thresholds: amount_max: 1000 gates_cel: - if: "input.amount > thresholds.amount_max" then: "require_hitl" - if: "size(ctx.SanctionsCheck.evidence) == 0" then: "fail_and_retry" failure: default: max_retries: 1 backoff_ms: [250] retryable: ["timeout", "rate limit", "429", "temporarily unavailable", "503"] non_retryable: ["permission", "invalid argument", "validation", "not found"] per_tool: CreateVendor: max_retries: 0 LookupCompany: max_retries: 2 backoff_ms: [250, 500] judging: pass_threshold: 0.82 rubric: ["policy_adherence", "source_citations", "numerical_consistency"] observability: traces: "mlflow+otel" audit: "cloudevents.v1" Why this works: UC‑governed tools + YAML policy + a single generic agent—you get EXECUTE/endpoint perms, audit/lineage by default; thresholds move via PR, not code; and swapping SOPs is a version bump, not a new code path. UC‑governed tools + YAML policy + a single generic agent Agent core (LangChain tool‑calling; Assurance Gate at the end) Here’s the on-rails core the rest hangs on: a single MLflow LangChain model, built from an SOP (YAML) bundled with the version, drives a LangChain tool-calling agent that’s constrained to an allow-list of Unity Catalog tools. Inputs to each tool are typed (JSON Schema → Pydantic), and the loop runs plan → act → observe, with no off-catalog calls. Before anything commits, an Assurance Gate fires: UC structural/PII checks, then CEL policy thresholds from the SOP, then MLflow judges (groundedness/relevance/safety). Pass and it auto-finalizes; miss and it routes to HITL. MLflow LangChain model LangChain tool-calling agent allow-list of Unity Catalog tools typed plan → act → observe Assurance Gate UC structural/PII checks CEL policy thresholds MLflow judges HITL # agent/sop_agent_langchain_chain.py # LangChain chain that: # 1) builds a tool-calling AgentExecutor from SOP # 2) runs it # 3) applies the Assurance Gate (UC guardrail fn → CEL → MLflow judges) from __future__ import annotations import os, json, yaml, time, requests from typing import Any, Dict, List from collections import defaultdict from databricks import sql from jsonschema import validate as js_validate from celpy import Environment as CELEnv from pydantic import BaseModel, create_model from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.tools import StructuredTool from langchain_core.runnables import RunnableLambda from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_openai import ChatOpenAI from mlflow.genai.judges import is_grounded, is_safe, is_context_relevant # ───────────── UC helpers ───────────── def _sql_connect(): return sql.connect( server_hostname=os.getenv("DB_HOST"), http_path=os.getenv("DB_WAREHOUSE"), access_token=os.getenv("DB_TOKEN"), ) def call_uc_sql_function(fn_path: str, args: Dict[str, Any]) -> Any: with _sql_connect() as c: cur = c.cursor() placeholders = ",".join(["?"] * len(args)) cur.execute(f"SELECT {fn_path}({placeholders})", tuple(args.values())) return cur.fetchone()[0] def call_uc_model(endpoint_url: str, inputs: Dict[str, Any]) -> Any: r = requests.post( endpoint_url, headers={"Authorization": f"Bearer {os.getenv('DATABRICKS_TOKEN')}", "Content-Type": "application/json"}, json={"inputs": inputs}, timeout=60) r.raise_for_status() return r.json() def uc_validate_payload(fn_path: str, payload_json: str) -> bool: with _sql_connect() as c: cur = c.cursor() cur.execute(f"SELECT {fn_path}(?)", (payload_json,)) return bool(cur.fetchone()[0]) # ───────────── CEL + judges ───────────── def cel_decision(gates: List[dict], cel_ctx: Dict[str, Any]) -> str: env = CELEnv() for g in gates: if env.compile(g["if"]).program().evaluate(cel_ctx): return g["then"] return "allow" def judge_mlflow(proposal: Dict[str, Any], pass_q: float, request: str, response: str, context: Any) -> Dict[str, Any]: def yes(x): v = getattr(getattr(x, "feedback", None), "value", "no") return 1.0 if str(v).lower() == "yes" else 0.0 parts = { "groundedness": yes(is_grounded(request=request, response=response, context=context)), "context_relevance": yes(is_context_relevant(request=request, context=context)), "safety": yes(is_safe(content=response)), } score = sum(parts.values()) / max(1, len(parts)) return {"score": score, "verdict": "pass" if score >= float(pass_q) else "fail"} # ───────────── JSON Schema → Pydantic for tool signatures ───────────── _JSON_TO_PYD = {"string": str, "number": float, "integer": int, "boolean": bool, "object": Dict[str, Any], "array": List[Any]} def _schema_to_pydantic(name: str, schema: Dict[str, Any]) -> BaseModel: props, req = schema.get("properties", {}) or {}, set(schema.get("required", []) or []) fields = {k: (_JSON_TO_PYD.get(spec.get("type", "object"), Dict[str, Any]), (... if k in req else None)) for k, spec in props.items()} or {"payload": (Dict[str, Any], ...)} return create_model(name, **fields) # ───────────── Tools with SOP-driven retry/backoff + error observation ───────────── def _make_tool(spec: Dict[str, Any], failure_cfg: Dict[str, Any], attempts: defaultdict) -> StructuredTool: ArgsModel = _schema_to_pydantic(f"{spec['name']}Args", spec["input_schema"]) per_tool = (failure_cfg.get("per_tool", {}) or {}).get(spec["name"], {}) default = failure_cfg.get("default", {}) or {} max_retries = int(per_tool.get("max_retries", default.get("max_retries", 0))) backoff_ms = list(per_tool.get("backoff_ms", default.get("backoff_ms", []))) retryable = [s.lower() for s in default.get("retryable", [])] nonretry = [s.lower() for s in default.get("non_retryable", [])] def _run(**kwargs): js_validate(kwargs, spec["input_schema"]) attempt = 0 while True: try: out = call_uc_sql_function(spec["uc_fn"], kwargs) if spec["kind"] == "sql_function" \ else call_uc_model(spec["uc_endpoint"], kwargs) attempts[(spec["name"], "ok")] += 1 return out if isinstance(out, (dict, list, str, int, float, bool)) \ else json.loads(json.dumps(out, default=str)) except Exception as e: msg = str(e) low = msg.lower() attempts[(spec["name"], "err")] += 1 # 1) hard stop (non-retryable) → return error observation if any(s in low for s in nonretry): return {"_error": { "tool": spec["name"], "message": msg[:500], "retryable": False, "attempts": attempt, "policy_max_retries": max_retries }} # 2) retries exhausted → return error observation if attempt >= max_retries: return {"_error": { "tool": spec["name"], "message": msg[:500], "retryable": any(s in low for s in retryable), "attempts": attempt, "policy_max_retries": max_retries }} # 3) looks retryable? backoff a bit and try again if retryable and any(s in low for s in retryable): delay = backoff_ms[attempt] if attempt < len(backoff_ms) else 250 time.sleep(min(delay, 1000) / 1000.0) attempt += 1 continue # 4) not clearly retryable → return error observation return {"_error": { "tool": spec["name"], "message": msg[:500], "retryable": False, "attempts": attempt, "policy_max_retries": max_retries }} return StructuredTool.from_function( name=spec["name"], description=spec.get("description", f"UC tool {spec['name']}"), args_schema=ArgsModel, func=_run, return_direct=False, ) def _build_agent_executor(sop: Dict[str, Any]) -> AgentExecutor: # tools attempts = defaultdict(int) failure_cfg = sop.get("failure", {"default": {"max_retries": 0}}) tools = [_make_tool(t, failure_cfg, attempts) for t in sop["tools"]] # planner LLM (Databricks GPT-OSS via OpenAI-compatible endpoint) llm = ChatOpenAI( model=sop["planner"]["model"], base_url=sop["planner"]["base_url"], api_key=os.getenv("DATABRICKS_TOKEN") or os.getenv("LLM_API_KEY"), temperature=float(sop["planner"].get("temperature", 0.2)), ) prompt = ChatPromptTemplate.from_messages([ ("system", sop["planner"]["system_prompt"].strip()), ("human", "{input}"), MessagesPlaceholder("agent_scratchpad"), ]) agent = create_tool_calling_agent(llm, tools, prompt) return AgentExecutor( agent=agent, tools=tools, max_iterations=int(sop["planner"].get("max_steps", 6)), return_intermediate_steps=True, handle_parsing_errors=True, verbose=False, ) # ───────────── Chain factory (Runner) ───────────── def build_chain_from_sop(sop_path: str): sop = yaml.safe_load(open(sop_path)) executor = _build_agent_executor(sop) def _invoke_agent(user_input: Dict[str, Any]): # 1) invoke agent block = { "goal": sop["goal"], "constraints": {"never_guess": bool(sop["planner"].get("never_guess", True)), "max_steps": int(sop["planner"].get("max_steps", 6))}, "input": dict(user_input), "tools": [t["name"] for t in sop["tools"]], } res = executor.invoke({"input": json.dumps(block, ensure_ascii=False)}) # 2) collect outputs (latest per tool) ctx_map: Dict[str, Any] = {} for action, output in res.get("intermediate_steps", []): name = getattr(action, "tool", "tool") try: ctx_map[name] = json.loads(output) if isinstance(output, str) else output except Exception: ctx_map[name] = output # 3) Assurance Gate proposal = {"input": dict(user_input), "ctx": ctx_map} payload = json.dumps(proposal) for fn in sop["guardrails"]["uc_functions"]: if not uc_validate_payload(fn, payload): return {"status": "FAILED_GUARDRAIL", "reason": f"UC guardrail failed: {fn}"} decision = cel_decision( sop["guardrails"]["gates_cel"], {"input": user_input, "ctx": ctx_map, "thresholds": sop["guardrails"]["thresholds"]} ) judgment = judge_mlflow( proposal=proposal, pass_q=float(sop["judging"]["pass_threshold"]), request=sop["goal"], response=payload, context=ctx_map.get("SanctionsCheck", {}).get("evidence", []), ) if decision in ("fail_and_retry", "require_hitl") or judgment["verdict"] == "fail": return {"status": "REQUIRES_HITL", "confidence": judgment["score"], "result": ctx_map} return {"status": "COMPLETED", "confidence": judgment["score"], "result": ctx_map} # LangChain expects a Runnable; wrap the callable return RunnableLambda(_invoke_agent) # agent/sop_agent_langchain_chain.py # LangChain chain that: # 1) builds a tool-calling AgentExecutor from SOP # 2) runs it # 3) applies the Assurance Gate (UC guardrail fn → CEL → MLflow judges) from __future__ import annotations import os, json, yaml, time, requests from typing import Any, Dict, List from collections import defaultdict from databricks import sql from jsonschema import validate as js_validate from celpy import Environment as CELEnv from pydantic import BaseModel, create_model from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_core.tools import StructuredTool from langchain_core.runnables import RunnableLambda from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_openai import ChatOpenAI from mlflow.genai.judges import is_grounded, is_safe, is_context_relevant # ───────────── UC helpers ───────────── def _sql_connect(): return sql.connect( server_hostname=os.getenv("DB_HOST"), http_path=os.getenv("DB_WAREHOUSE"), access_token=os.getenv("DB_TOKEN"), ) def call_uc_sql_function(fn_path: str, args: Dict[str, Any]) -> Any: with _sql_connect() as c: cur = c.cursor() placeholders = ",".join(["?"] * len(args)) cur.execute(f"SELECT {fn_path}({placeholders})", tuple(args.values())) return cur.fetchone()[0] def call_uc_model(endpoint_url: str, inputs: Dict[str, Any]) -> Any: r = requests.post( endpoint_url, headers={"Authorization": f"Bearer {os.getenv('DATABRICKS_TOKEN')}", "Content-Type": "application/json"}, json={"inputs": inputs}, timeout=60) r.raise_for_status() return r.json() def uc_validate_payload(fn_path: str, payload_json: str) -> bool: with _sql_connect() as c: cur = c.cursor() cur.execute(f"SELECT {fn_path}(?)", (payload_json,)) return bool(cur.fetchone()[0]) # ───────────── CEL + judges ───────────── def cel_decision(gates: List[dict], cel_ctx: Dict[str, Any]) -> str: env = CELEnv() for g in gates: if env.compile(g["if"]).program().evaluate(cel_ctx): return g["then"] return "allow" def judge_mlflow(proposal: Dict[str, Any], pass_q: float, request: str, response: str, context: Any) -> Dict[str, Any]: def yes(x): v = getattr(getattr(x, "feedback", None), "value", "no") return 1.0 if str(v).lower() == "yes" else 0.0 parts = { "groundedness": yes(is_grounded(request=request, response=response, context=context)), "context_relevance": yes(is_context_relevant(request=request, context=context)), "safety": yes(is_safe(content=response)), } score = sum(parts.values()) / max(1, len(parts)) return {"score": score, "verdict": "pass" if score >= float(pass_q) else "fail"} # ───────────── JSON Schema → Pydantic for tool signatures ───────────── _JSON_TO_PYD = {"string": str, "number": float, "integer": int, "boolean": bool, "object": Dict[str, Any], "array": List[Any]} def _schema_to_pydantic(name: str, schema: Dict[str, Any]) -> BaseModel: props, req = schema.get("properties", {}) or {}, set(schema.get("required", []) or []) fields = {k: (_JSON_TO_PYD.get(spec.get("type", "object"), Dict[str, Any]), (... if k in req else None)) for k, spec in props.items()} or {"payload": (Dict[str, Any], ...)} return create_model(name, **fields) # ───────────── Tools with SOP-driven retry/backoff + error observation ───────────── def _make_tool(spec: Dict[str, Any], failure_cfg: Dict[str, Any], attempts: defaultdict) -> StructuredTool: ArgsModel = _schema_to_pydantic(f"{spec['name']}Args", spec["input_schema"]) per_tool = (failure_cfg.get("per_tool", {}) or {}).get(spec["name"], {}) default = failure_cfg.get("default", {}) or {} max_retries = int(per_tool.get("max_retries", default.get("max_retries", 0))) backoff_ms = list(per_tool.get("backoff_ms", default.get("backoff_ms", []))) retryable = [s.lower() for s in default.get("retryable", [])] nonretry = [s.lower() for s in default.get("non_retryable", [])] def _run(**kwargs): js_validate(kwargs, spec["input_schema"]) attempt = 0 while True: try: out = call_uc_sql_function(spec["uc_fn"], kwargs) if spec["kind"] == "sql_function" \ else call_uc_model(spec["uc_endpoint"], kwargs) attempts[(spec["name"], "ok")] += 1 return out if isinstance(out, (dict, list, str, int, float, bool)) \ else json.loads(json.dumps(out, default=str)) except Exception as e: msg = str(e) low = msg.lower() attempts[(spec["name"], "err")] += 1 # 1) hard stop (non-retryable) → return error observation if any(s in low for s in nonretry): return {"_error": { "tool": spec["name"], "message": msg[:500], "retryable": False, "attempts": attempt, "policy_max_retries": max_retries }} # 2) retries exhausted → return error observation if attempt >= max_retries: return {"_error": { "tool": spec["name"], "message": msg[:500], "retryable": any(s in low for s in retryable), "attempts": attempt, "policy_max_retries": max_retries }} # 3) looks retryable? backoff a bit and try again if retryable and any(s in low for s in retryable): delay = backoff_ms[attempt] if attempt < len(backoff_ms) else 250 time.sleep(min(delay, 1000) / 1000.0) attempt += 1 continue # 4) not clearly retryable → return error observation return {"_error": { "tool": spec["name"], "message": msg[:500], "retryable": False, "attempts": attempt, "policy_max_retries": max_retries }} return StructuredTool.from_function( name=spec["name"], description=spec.get("description", f"UC tool {spec['name']}"), args_schema=ArgsModel, func=_run, return_direct=False, ) def _build_agent_executor(sop: Dict[str, Any]) -> AgentExecutor: # tools attempts = defaultdict(int) failure_cfg = sop.get("failure", {"default": {"max_retries": 0}}) tools = [_make_tool(t, failure_cfg, attempts) for t in sop["tools"]] # planner LLM (Databricks GPT-OSS via OpenAI-compatible endpoint) llm = ChatOpenAI( model=sop["planner"]["model"], base_url=sop["planner"]["base_url"], api_key=os.getenv("DATABRICKS_TOKEN") or os.getenv("LLM_API_KEY"), temperature=float(sop["planner"].get("temperature", 0.2)), ) prompt = ChatPromptTemplate.from_messages([ ("system", sop["planner"]["system_prompt"].strip()), ("human", "{input}"), MessagesPlaceholder("agent_scratchpad"), ]) agent = create_tool_calling_agent(llm, tools, prompt) return AgentExecutor( agent=agent, tools=tools, max_iterations=int(sop["planner"].get("max_steps", 6)), return_intermediate_steps=True, handle_parsing_errors=True, verbose=False, ) # ───────────── Chain factory (Runner) ───────────── def build_chain_from_sop(sop_path: str): sop = yaml.safe_load(open(sop_path)) executor = _build_agent_executor(sop) def _invoke_agent(user_input: Dict[str, Any]): # 1) invoke agent block = { "goal": sop["goal"], "constraints": {"never_guess": bool(sop["planner"].get("never_guess", True)), "max_steps": int(sop["planner"].get("max_steps", 6))}, "input": dict(user_input), "tools": [t["name"] for t in sop["tools"]], } res = executor.invoke({"input": json.dumps(block, ensure_ascii=False)}) # 2) collect outputs (latest per tool) ctx_map: Dict[str, Any] = {} for action, output in res.get("intermediate_steps", []): name = getattr(action, "tool", "tool") try: ctx_map[name] = json.loads(output) if isinstance(output, str) else output except Exception: ctx_map[name] = output # 3) Assurance Gate proposal = {"input": dict(user_input), "ctx": ctx_map} payload = json.dumps(proposal) for fn in sop["guardrails"]["uc_functions"]: if not uc_validate_payload(fn, payload): return {"status": "FAILED_GUARDRAIL", "reason": f"UC guardrail failed: {fn}"} decision = cel_decision( sop["guardrails"]["gates_cel"], {"input": user_input, "ctx": ctx_map, "thresholds": sop["guardrails"]["thresholds"]} ) judgment = judge_mlflow( proposal=proposal, pass_q=float(sop["judging"]["pass_threshold"]), request=sop["goal"], response=payload, context=ctx_map.get("SanctionsCheck", {}).get("evidence", []), ) if decision in ("fail_and_retry", "require_hitl") or judgment["verdict"] == "fail": return {"status": "REQUIRES_HITL", "confidence": judgment["score"], "result": ctx_map} return {"status": "COMPLETED", "confidence": judgment["score"], "result": ctx_map} # LangChain expects a Runnable; wrap the callable return RunnableLambda(_invoke_agent) Wrap tools as UC functions Keep reads in SQL; wrap actions as Model‑as‑Function (MLflow pyfunc + Serving). Grant EXECUTE/endpoint perms to the agent SP. Return JSON from SQL for predictable Python handling. Return JSON from SQL Read (JSON out): Read (JSON out): CREATE OR REPLACE FUNCTION procure.tools.lookup_company(name STRING) RETURNS STRING RETURN to_json(named_struct( 'id', id, 'legal_name', legal_name, 'country', country )) FROM ( SELECT id, legal_name, country FROM procure.refdata.companies WHERE lower(legal_name) = lower(name) LIMIT 1 ); GRANT EXECUTE ON FUNCTION procure.tools.lookup_company TO `spn:procurement-agent`; CREATE OR REPLACE FUNCTION procure.tools.lookup_company(name STRING) RETURNS STRING RETURN to_json(named_struct( 'id', id, 'legal_name', legal_name, 'country', country )) FROM ( SELECT id, legal_name, country FROM procure.refdata.companies WHERE lower(legal_name) = lower(name) LIMIT 1 ); GRANT EXECUTE ON FUNCTION procure.tools.lookup_company TO `spn:procurement-agent`; Retrieval memory (policy recall): Retrieval memory (policy recall): CREATE OR REPLACE FUNCTION procure.knowledge.recall_policy(query STRING) -- replace with vector search in prod RETURNS STRING RETURN to_json(named_struct('passages', collect_list(named_struct('id', id, 'text', passage)) )) FROM ( SELECT id, passage FROM procure.knowledge.policy_index WHERE contains(passage, query) LIMIT 5 ); GRANT EXECUTE ON FUNCTION procure.knowledge.recall_policy TO `spn:procurement-agent`; CREATE OR REPLACE FUNCTION procure.knowledge.recall_policy(query STRING) -- replace with vector search in prod RETURNS STRING RETURN to_json(named_struct('passages', collect_list(named_struct('id', id, 'text', passage)) )) FROM ( SELECT id, passage FROM procure.knowledge.policy_index WHERE contains(passage, query) LIMIT 5 ); GRANT EXECUTE ON FUNCTION procure.knowledge.recall_policy TO `spn:procurement-agent`; Guardrail (structural check): Guardrail (structural check): CREATE OR REPLACE FUNCTION procure.governance.validate_vendor(payload STRING) RETURNS BOOLEAN RETURN from_json( payload, 'input STRUCT<amount DOUBLE>, ctx STRUCT<SanctionsCheck STRUCT<evidence ARRAY<STRING>>>>' ) IS NOT NULL; GRANT EXECUTE ON FUNCTION procure.governance.validate_vendor TO `spn:procurement-agent`; CREATE OR REPLACE FUNCTION procure.governance.validate_vendor(payload STRING) RETURNS BOOLEAN RETURN from_json( payload, 'input STRUCT<amount DOUBLE>, ctx STRUCT<SanctionsCheck STRUCT<evidence ARRAY<STRING>>>>' ) IS NOT NULL; GRANT EXECUTE ON FUNCTION procure.governance.validate_vendor TO `spn:procurement-agent`; Package, register, alias (Champion/Challenger) Each version bundles its SOP; the caller chooses which to invoke. # register/register_agent_langchain_uc.py import os, hashlib, mlflow from mlflow.tracking import MlflowClient from agent.sop_agent_langchain_chain import build_chain_from_sop mlflow.set_registry_uri("databricks-uc") # UC-backed registry def register_agent_version_from_uc_volume( sop_path: str, model_name: str = "procure.platform.agent", # catalog.schema.model alias: str | None = None, ) -> str: lc_chain = build_chain_from_sop(sop_path) with mlflow.start_run() as run: info = mlflow.langchain.log_model( lc_model=lc_chain, name="agent", registered_model_name=model_name, pip_requirements=[ "mlflow", "langchain", "langchain-openai", "langchain-core", "requests", "PyYAML", "databricks-sql-connector", "cel-python", "jsonschema", ], tags={"sop.path": sop_path}, input_example={"company_name": "ACME Ltd", "amount": 750}, ) mv = info.model_version if alias: MlflowClient().set_registered_model_alias(model_name, alias, mv.version) print(f"Registered {model_name} v{mv.version}" + (f" alias={alias}" if alias else "")) return mv.version if __name__ == "__main__": register_agent_version_from_uc_volume( sop_path="/Volumes/procure/policies/agent_sops/vendor_onboarding.yml", alias="Champion" ) # register_agent_version_from_uc_volume("/Volumes/procure/policies/agent_sops/vendor_onboarding.yml", alias="Challenger") # register/register_agent_langchain_uc.py import os, hashlib, mlflow from mlflow.tracking import MlflowClient from agent.sop_agent_langchain_chain import build_chain_from_sop mlflow.set_registry_uri("databricks-uc") # UC-backed registry def register_agent_version_from_uc_volume( sop_path: str, model_name: str = "procure.platform.agent", # catalog.schema.model alias: str | None = None, ) -> str: lc_chain = build_chain_from_sop(sop_path) with mlflow.start_run() as run: info = mlflow.langchain.log_model( lc_model=lc_chain, name="agent", registered_model_name=model_name, pip_requirements=[ "mlflow", "langchain", "langchain-openai", "langchain-core", "requests", "PyYAML", "databricks-sql-connector", "cel-python", "jsonschema", ], tags={"sop.path": sop_path}, input_example={"company_name": "ACME Ltd", "amount": 750}, ) mv = info.model_version if alias: MlflowClient().set_registered_model_alias(model_name, alias, mv.version) print(f"Registered {model_name} v{mv.version}" + (f" alias={alias}" if alias else "")) return mv.version if __name__ == "__main__": register_agent_version_from_uc_volume( sop_path="/Volumes/procure/policies/agent_sops/vendor_onboarding.yml", alias="Champion" ) # register_agent_version_from_uc_volume("/Volumes/procure/policies/agent_sops/vendor_onboarding.yml", alias="Challenger") Serve & invoke (caller picks the version) Option A — single endpoint that points to @Champion; flip alias during rollout.Option B — two endpoints:..._champion and ..._challenger ..._champion ..._challenger import mlflow.langchain lc = mlflow.langchain.load_model("models:/procure.platform.agent@Champion") out = lc.invoke({"company_name": "ACME Ltd", "amount": 750}) import mlflow.langchain lc = mlflow.langchain.load_model("models:/procure.platform.agent@Champion") out = lc.invoke({"company_name": "ACME Ltd", "amount": 750}) You’ll get: You’ll get: { "status": "COMPLETED", "result": { "...tool outputs..." }, "confidence": 0.86, "plan": [ { "executed": { "use": "LookupCompany", "args": {"name": "ACME Ltd"}, "ok": true } }, { "executed": { "use": "SanctionsCheck", "args": {"entity_id": "..."}, "ok": true } } ] } { "status": "COMPLETED", "result": { "...tool outputs..." }, "confidence": 0.86, "plan": [ { "executed": { "use": "LookupCompany", "args": {"name": "ACME Ltd"}, "ok": true } }, { "executed": { "use": "SanctionsCheck", "args": {"entity_id": "..."}, "ok": true } } ] } How this maps to LoA (and why it scales) LoA‑1 — Scripted flow + AI gate. Keep your workflow; add one governed model decision; log traces. LoA‑2 — Assistant with bounded steps. Same agent, more steps; HITL by default. LoA‑3 — Plan → Act → Verify → Judge. UC tools only, CEL policy, MLflow judges. This is where most wins land today. LoA‑1 — Scripted flow + AI gate. Keep your workflow; add one governed model decision; log traces. LoA‑1 — Scripted flow + AI gate. LoA‑2 — Assistant with bounded steps. Same agent, more steps; HITL by default. LoA‑2 — Assistant with bounded steps. LoA‑3 — Plan → Act → Verify → Judge. UC tools only, CEL policy, MLflow judges. This is where most wins land today. LoA‑3 — Plan → Act → Verify → Judge. Ops you’ll actually run Governance: UC is the source of truth for tools & permissions; SOP travels with the model version (clean lineage). Observability: MLflow Tracing → OTel. A/B safely: flip aliases (Champion/Challenger) or run two endpoints. Governance: UC is the source of truth for tools & permissions; SOP travels with the model version (clean lineage). Governance: Observability: MLflow Tracing → OTel. Observability: A/B safely: flip aliases (Champion/Challenger) or run two endpoints. A/B safely: SLOs: STP%, judge‑pass rate, approvals/100, p95 cost & latency, policy blocks, loop kills. Promote autonomy only when the data earns it. SLOs: Next step checklist Wrap reads/actions as UC functions; grant EXECUTE/endpoint perms to the agent SP. Draft the SOP (YAML) with planner limits, allow‑listed tools, memory, CEL gates, and judge threshold. Package the generic agent with the SOP as an artifact; register in UC; set @Champion. Create a serving endpoint for Champion (and optionally Challenger). Turn on MLflow Tracing → OTel; wire CloudEvents; watch SLOs. Iterate: update SOP → create new model version → move the alias. No code changes. Wrap reads/actions as UC functions; grant EXECUTE/endpoint perms to the agent SP. Draft the SOP (YAML) with planner limits, allow‑listed tools, memory, CEL gates, and judge threshold. memory Package the generic agent with the SOP as an artifact; register in UC; set @Champion. Create a serving endpoint for Champion (and optionally Challenger). Turn on MLflow Tracing → OTel; wire CloudEvents; watch SLOs. Iterate: update SOP → create new model version → move the alias. No code changes. Ship it Ship it Put the SOP in UC. Wrap the tools. Package the agent once. Register two versions with their own SOPs (@Champion, @Challenger). Point your endpoint at the alias, turn on MLflow→OTel, and watch STP%, judge‑pass, and p95 latency. If the numbers hold, expand scope; if they wobble, flip the alias back. Governed tools, explicit policy, small blast radius. This is production AI. Ship it. @Champion @Challenger This is production AI. Ship it.