Amazon Bedrock 代理人就像为您的 AWS 基础设施提供智能助理 - 他们可以推理,决定下一步要做什么,并使用 Lambda 函数启动操作。 在本文中,我将展示如何构建一个主管代理,该代理将多个 AWS Lambdas 编排为: List EC2 instances, Fetch their CPU metrics from CloudWatch, Combine both results intelligently — all without the agent ever calling AWS APIs directly. By the end, you’ll understand how Bedrock Agents work, how to use action groups, and how to chain Lambdas through a supervisor function — a clean, scalable pattern for multi-step automation. 让我们检查图表和其他例子,什么是代理,以获得更好的可视性和理解: 用户给Bedrock代理人(1)打电话,做一些任务,比如说“你存储了多少电视机?”代理人通过定义的提示知道,如果问题与检查库存状态有关,他们需要拨打(2)“数据库”行动组(3,AG)。 让我们来看看另一个例子: 每个代理人可以有多个行动组,例如,我们想要获取有关某些AWS资源的信息,比如列出所有ECS任务,逻辑与前一个相同。 更多例子: 我们添加了另一个 AG 与 EKS 操作组. 正如您在这里所看到的,每个操作组可以有多个 lambda 函数来提出请求. 在本示例中,它正在列出和删除某些现有的 K8S 集群中的资源。 行动组和 lambda 功能可以具有任何功能,即使您需要从第三方 API 获取数据,以获取天气数据或机票可用性。 我希望它现在有点清晰,让我们回到我们的监督代理设置: 在 AWS 控制台中,打开 Bedrock → 代理 → 创建代理 给它一个名字,并创建 一旦创建,你可以更改模型,如果你想,或保持克劳德的默认。 添加代理人的描述和指示.我们将在下一步创建的行动组 您是主要的 AWS 监督代理人。 目标:帮助分析 AWS 基础设施。 行动小组: ec2: list_instances → returns instance list + instanceIds 规则: Never call AWS APIs directly. For EC2: Call ec2__list_instances 永远在行动之前使用“思考”。 您是主要的 AWS 监督代理人。 目标:帮助分析 AWS 基础设施。 行动小组: ec2: list_instances → 返回实例列表 + instanceIds 规则: 永远不要直接拨打 AWS API。 For EC2: Call ec2__list_instances 永远在行动之前使用“思考”。 注: ec2 - 行动组名称 list_instances - 函数名称,正如我之前提到的 - 您可以每个操作组有多个函数 然后点击“保存” 顶部的“准备”按钮,一旦保存,准备将激活。 滚动到行动组 → 添加 名称 - EC2. 行动组. 召唤 - 创建一个新的 lambda 函数,其中 必须与我们在代理指示中定义的相同 list_instances 添加行动组名称和描述,单击创建,然后再次“保存”和“准备”。 转到 lambda 函数,Bedrock 创建了该函数名称中的 EC2 前缀,并添加了以下代码: import logging from typing import Dict, Any from http import HTTPStatus import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) ec2_client = boto3.client('ec2') def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: """ AWS Lambda handler for processing Bedrock agent requests related to EC2 instances. Supports: - Listing all EC2 instances - Describing a specific instance by ID """ try: action_group = event['actionGroup'] function = event['function'] message_version = event.get('messageVersion', 1) parameters = event.get('parameters', []) response_text = "" if function == "list_instances": # List all EC2 instances instances = ec2_client.describe_instances() instance_list = [] for reservation in instances['Reservations']: for instance in reservation['Instances']: instance_list.append({ 'InstanceId': instance.get('InstanceId'), 'State': instance.get('State', {}).get('Name'), 'InstanceType': instance.get('InstanceType'), 'PrivateIpAddress': instance.get('PrivateIpAddress', 'N/A'), 'PublicIpAddress': instance.get('PublicIpAddress', 'N/A') }) response_text = f"Found {len(instance_list)} EC2 instance(s): {instance_list}" elif function == "describe_instance": # Expect a parameter with the instance ID instance_id_param = next((p for p in parameters if p['name'] == 'instanceId'), None) if not instance_id_param: raise KeyError("Missing required parameter: instanceId") instance_id = instance_id_param['value'] result = ec2_client.describe_instances(InstanceIds=[instance_id]) instance = result['Reservations'][0]['Instances'][0] response_text = ( f"Instance {instance_id} details: " f"State={instance['State']['Name']}, " f"Type={instance['InstanceType']}, " f"Private IP={instance.get('PrivateIpAddress', 'N/A')}, " f"Public IP={instance.get('PublicIpAddress', 'N/A')}" ) else: response_text = f"Unknown function '{function}' requested." # Format Bedrock agent response response_body = { 'TEXT': { 'body': response_text } } action_response = { 'actionGroup': action_group, 'function': function, 'functionResponse': { 'responseBody': response_body } } response = { 'response': action_response, 'messageVersion': message_version } logger.info('Response: %s', response) return response except KeyError as e: logger.error('Missing required field: %s', str(e)) return { 'statusCode': HTTPStatus.BAD_REQUEST, 'body': f'Error: {str(e)}' } except Exception as e: logger.error('Unexpected error: %s', str(e)) return { 'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR, 'body': f'Internal server error: {str(e)}' } 注意:函数的响应必须以Bedrock特定的格式,详细信息可以在文档中找到: https://docs.aws.amazon.com/bedrock/latest/userguide/agents-lambda.html 更新函数代码后,转到 Configuration → permissions → role name 函数创建新内线策略: 如JSON: { "Version": "2012-10-17", "Statement": [ { "Sid": "Statement1", "Effect": "Allow", "Action": [ "ec2:DescribeInstances" ], "Resource": [ "*" ] } ] } 现在我们可以回到我们的代理,点击“测试”,输入文本来检查它是否确实起作用: 酷!第一个行动组按预期工作,允许添加另一个行动组列出 cloudwatch 指标: 行动组的名称 - cloudwatch 函数的名称是 getMetrics,添加描述和参数,因为这个 lambda 必须知道该实例或 intances 来检查指标。 更新代理提示,解释我们想要如何使用新的行动组,然后再单击“保存”和“准备” 您是主要的 AWS 监督代理人。 目标:帮助分析 AWS 基础设施。 行动小组: ec2: describeInstances → 返回实例列表 + instanceIds cloudwatch: getMetrics → 需要 instance_ids 规则: Never call AWS APIs directly. For EC2 + CPU: Call ec2__describeInstances Extract instanceIds Call cloudwatch__getMetrics 组合结果。 永远在行动之前使用“思考”。 您是主要的 AWS 监督代理人。 目标:帮助分析 AWS 基础设施。 行动小组: ec2: describeInstances → 返回实例列表 + instanceIds cloudwatch: getMetrics → 需要 instance_ids 规则: 永远不要直接拨打 AWS API。 For EC2 + CPU: Call ec2__describeInstances Extract instanceIds Call cloudwatch__getMetrics 组合结果。 永远在行动之前使用“思考”。 现在可以更新我们的 cloudwatch 函数代码: import boto3 import datetime import logging import json from typing import Dict, Any from http import HTTPStatus logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: try: action_group = event["actionGroup"] function = event["function"] message_version = event.get("messageVersion", 1) parameters = event.get("parameters", []) region = "us-east-1" instance_ids = [] # --- Parse parameters --- for param in parameters: if param.get("name") == "region": region = param.get("value") elif param.get("name") == "instance_ids": raw_value = param.get("value") if isinstance(raw_value, str): # Clean up stringified list from Bedrock agent raw_value = raw_value.strip().replace("[", "").replace("]", "").replace("'", "") instance_ids = [x.strip() for x in raw_value.split(",") if x.strip()] elif isinstance(raw_value, list): instance_ids = raw_value logger.info(f"Parsed instance IDs: {instance_ids}") if not instance_ids: response_text = f"No instance IDs provided for CloudWatch metrics in {region}." else: cloudwatch = boto3.client("cloudwatch", region_name=region) now = datetime.datetime.utcnow() start_time = now - datetime.timedelta(hours=1) metrics_output = [] for instance_id in instance_ids: try: metric = cloudwatch.get_metric_statistics( Namespace="AWS/EC2", MetricName="CPUUtilization", Dimensions=[{"Name": "InstanceId", "Value": instance_id}], StartTime=start_time, EndTime=now, Period=300, Statistics=["Average"] ) datapoints = metric.get("Datapoints", []) if datapoints: datapoints.sort(key=lambda x: x["Timestamp"]) avg_cpu = round(datapoints[-1]["Average"], 2) metrics_output.append(f"{instance_id}: {avg_cpu}% CPU (avg last hour)") else: metrics_output.append(f"{instance_id}: No recent CPU data") except Exception as e: logger.error(f"Error fetching metrics for {instance_id}: {e}") metrics_output.append(f"{instance_id}: Error fetching metrics") response_text = ( f"CPU Utilization (last hour) in {region}:\n" + "\n".join(metrics_output) ) # --- Bedrock Agent response format --- response_body = { "TEXT": { "body": response_text } } action_response = { "actionGroup": action_group, "function": function, "functionResponse": { "responseBody": response_body } } response = { "response": action_response, "messageVersion": message_version } logger.info("Response: %s", response) return response except Exception as e: logger.error(f"Unexpected error: {e}") return { "statusCode": HTTPStatus.INTERNAL_SERVER_ERROR, "body": f"Internal server error: {str(e)}" } 更新云时钟的 lambda 权限,就像我们对 ec2 lambda 所做的: { "Version": "2012-10-17", "Statement": [ { "Sid": "Statement1", "Effect": "Allow", "Action": [ "cloudwatch:GetMetricStatistics" ], "Resource": [ "*" ] } ] } 然后再试一次 我们有 EC2 和 CloudWatch 操作组,可以通过代理来调用它们,以获取 EC2 实例列表及其 CPU 指标。 代替代理单独调用 EC2 和 CloudWatch 来调用, Supervisor 会照顾这个逻辑,首先调用 EC2 函数以获取所有实例,然后将这些实例 ID 传输到 CloudWatch 函数以获取指标,最后将所有内容合并成一个清晰的结果。 这样,代理只需要调用一个操作 - 监督 - 而监督协调了背景中的所有步骤,更清洁,更快,更容易维护。 给它一个名字和描述 输入函数名称和描述 更新代理指示以避免直接呼叫 ec2 和 CloudWatch 行动组: 然后点击“保存”和“准备”。 更新管理员 lambda 函数代码, NOTE: need to update your EC2 and Cloudwatch functions name in the code below: import boto3 import json import logging import re import ast logger = logging.getLogger() logger.setLevel(logging.INFO) lambda_client = boto3.client("lambda") def lambda_handler(event, context): try: action_group = event["actionGroup"] function = event["function"] parameters = event.get("parameters", []) message_version = event.get("messageVersion", "1.0") # Parse parameters region = "us-east-1" for param in parameters: if param.get("name") == "region": region = param.get("value") # Decide routing if function == "analyzeInfrastructure": logger.info("Supervisor: calling EC2 and CloudWatch") # Step 1: call EC2 Lambda ec2_payload = { "actionGroup": "ec2", "function": "list_instances", "parameters": [{"name": "region", "value": region}], "messageVersion": "1.0" } ec2_response = invoke_lambda("ec2-yeikw", ec2_payload) #### CHANGE TO YOUR EC2 FUNCTION NAME instances = extract_instance_ids(ec2_response) # Step 2: call CloudWatch Lambda (if instances found) if instances: cw_payload = { "actionGroup": "cloudwatch", "function": "getMetrics", "parameters": [ {"name": "region", "value": region}, {"name": "instance_ids", "value": instances} ], "messageVersion": "1.0" } cw_response = invoke_lambda("cloudwatch-ef6ty", cw_payload) #### CHANGE TO YOUR CLOUDWATCH FUNCTION NAME final_text = merge_responses(ec2_response, cw_response) else: final_text = "No instances found to analyze." else: final_text = f"Unknown function: {function}" # Construct Bedrock-style response response = { "messageVersion": message_version, "response": { "actionGroup": action_group, "function": function, "functionResponse": { "responseBody": { "TEXT": {"body": final_text} } } } } logger.info("Supervisor response: %s", response) return response except Exception as e: logger.exception("Error in supervisor") return { "statusCode": 500, "body": f"Supervisor error: {str(e)}" } def invoke_lambda(name, payload): """Helper to call another Lambda and parse response""" response = lambda_client.invoke( FunctionName=name, InvocationType="RequestResponse", Payload=json.dumps(payload), ) result = json.loads(response["Payload"].read()) return result def extract_instance_ids(ec2_response): """Extract instance IDs from EC2 Lambda response""" try: body = ec2_response["response"]["functionResponse"]["responseBody"]["TEXT"]["body"] # Try to extract JSON-like data after "Found X EC2 instance(s):" if "Found" in body and "[" in body and "]" in body: data_part = body.split(":", 1)[1].strip() try: instances = ast.literal_eval(data_part) # safely parse the list return [i["InstanceId"] for i in instances if "InstanceId" in i] except Exception: pass # fallback regex in case of plain text return re.findall(r"i-[0-9a-f]+", body) except Exception as e: logger.error("extract_instance_ids error: %s", e) return [] def merge_responses(ec2_resp, cw_resp): """Combine EC2 and CloudWatch outputs""" ec2_text = ec2_resp["response"]["functionResponse"]["responseBody"]["TEXT"]["body"] cw_text = cw_resp["response"]["functionResponse"]["responseBody"]["TEXT"]["body"] return f"{ec2_text}\n\n{cw_text}" 再次,添加监督 lambda 权限来调用我们的 EC2 和 Cloudwatch 函数,例如: { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": [ "arn:aws:lambda:us-east-1:<account_id>:function:ec2-<id>", "arn:aws:lambda:us-east-1:<account_id>:function:cloudwatch-<id>" ] } ] } 让我们再次测试该功能,令人惊讶的是它失败了 我检查了我的 supervusor supervisor 函数日志,并查看此 一个它似乎没有显示任何有用的,但没有 - 暗示它 3000.00ms. 它的默认 lambda 函数时间,允许调整它. 去监督函数 - 配置 - 一般和编辑时间的参数,我改变到 10 秒 它帮助了! 您可以通过添加 AWS 账单分析来继续扩展此功能,以便找到最昂贵的资源或最昂贵的资源。 ,等等,您不必仅限于 AWS 资源,您可以自由获得一些外部功能。 您正在运行的昂贵的 ec2 实例