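Date: 2025-11-02

Amazon Bedrock Agents are like intelligent assistants for your AWS infrastructure: they can reason, decide what to do next, and trigger actions through Lambda functions. In this article, I will show how to build a supervisor agent that orchestrates multiple AWS Lambda functions to:

- List EC2 instances,
- Fetch their CPU metrics from CloudWatch,
- Combine both results intelligently, all without the agent ever calling AWS APIs directly.

By the end, you'll understand how Bedrock Agents work, how to use action groups, and how to chain Lambdas through a supervisor function, which is a clean, scalable pattern for multi-step automation.

Let's look at a diagram and a few examples of what an agent is, for better visibility and understanding:

The user calls the Bedrock agent (1) with a task, for example "How many TVs do you have in stock?" From its defined prompt, the agent knows that a question about inventory status means it needs to call (2) the "database" action group (3, AG).

Let's look at another example: each agent can have multiple action groups. Say we want to get information about some AWS resources, such as listing all ECS tasks. The logic is the same as in the previous example.

One more example: we added another action group, this time for EKS. As you can see here, each action group can handle multiple functions. In this example, it lists and deletes resources in an existing K8s cluster.

Action groups and Lambda functions can do anything you need, including fetching data from a third-party API, such as weather data or flight ticket availability.

I hope it's a bit clearer now. Let's get back to our supervisor agent setup:

In the AWS console, open Bedrock → Agents → Create agent. Give it a name and create it.

Once it's created, you can change the model if you want, or keep the default Claude model.

Add the agent's description and instructions. We will create the action group in the next step.

    You are the main AWS supervisor agent.
    Goal: help analyze AWS infrastructure.
    Action groups:
    - ec2: list_instances → returns instance list + instanceIds
    Rules:
    - Never call AWS APIs directly.
    - For EC2:
      - Call ec2__list_instances
    - Always use "think" before acting.

Note: ec2 is the action group name and list_instances is the function name. As I mentioned before, you can have multiple functions per action group.

Then click "Save", followed by the "Prepare" button at the top. Prepare becomes active once the agent is saved.

Scroll down to Action groups → Add. Action group name: ec2.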
Invocation: create a new Lambda function. The function name must be the same one we defined in the agent instructions: list_instances. Add the name and description, click Create, and then "Save" and "Prepare" again.

Go to the Lambda function that Bedrock created (its name has the ec2 prefix) and add the following code:

    import logging
    from typing import Dict, Any
    from http import HTTPStatus
    import boto3

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    ec2_client = boto3.client('ec2')

    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        """
        AWS Lambda handler for processing Bedrock agent requests related to EC2 instances.

        Supports:
        - Listing all EC2 instances
        - Describing a specific instance by ID
        """
        try:
            action_group = event['actionGroup']
            function = event['function']
            message_version = event.get('messageVersion', '1.0')
            parameters = event.get('parameters', [])

            response_text = ""

            if function == "list_instances":
                # List all EC2 instances
                instances = ec2_client.describe_instances()
                instance_list = []
                for reservation in instances['Reservations']:
                    for instance in reservation['Instances']:
                        instance_list.append({
                            'InstanceId': instance.get('InstanceId'),
                            'State': instance.get('State', {}).get('Name'),
                            'InstanceType': instance.get('InstanceType'),
                            'PrivateIpAddress': instance.get('PrivateIpAddress', 'N/A'),
                            'PublicIpAddress': instance.get('PublicIpAddress', 'N/A')
                        })
                response_text = f"Found {len(instance_list)} EC2 instance(s): {instance_list}"

            elif function == "describe_instance":
                # Expect a parameter with the instance ID
                instance_id_param = next((p for p in parameters if p['name'] == 'instanceId'), None)
                if not instance_id_param:
                    raise KeyError("Missing required parameter: instanceId")

                instance_id = instance_id_param['value']
                result = ec2_client.describe_instances(InstanceIds=[instance_id])
                instance = result['Reservations'][0]['Instances'][0]
                response_text = (
                    f"Instance {instance_id} details: "
                    f"State={instance['State']['Name']}, "
                    f"Type={instance['InstanceType']}, "
                    f"Private IP={instance.get('PrivateIpAddress', 'N/A')}, "
                    f"Public IP={instance.get('PublicIpAddress', 'N/A')}"
                )

            else:
                response_text = f"Unknown function '{function}' requested."

            # Format Bedrock agent response
            response_body = {
                'TEXT': {
                    'body': response_text
                }
            }

            action_response = {
                'actionGroup': action_group,
                'function': function,
                'functionResponse': {
                    'responseBody': response_body
                }
            }

            response = {
                'response': action_response,
                'messageVersion': message_version
            }

            logger.info('Response: %s', response)
            return response

        except KeyError as e:
            logger.error('Missing required field: %s', str(e))
            return {
                'statusCode': HTTPStatus.BAD_REQUEST,
                'body': f'Error: {str(e)}'
            }

        except Exception as e:
            logger.error('Unexpected error: %s', str(e))
            return {
                'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR,
                'body': f'Internal server error: {str(e)}'
            }

Note: the function's response must be in the Bedrock-specific format. Details can be found in the documentation: https://docs.aws.amazon.com/bedrock/latest/userguide/agents-lambda.html

After updating the function code, go to Configuration → Permissions → Role name and create a new inline policy for the function's role, as JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Statement1",
          "Effect": "Allow",
          "Action": [
            "ec2:DescribeInstances"
          ],
          "Resource": [
            "*"
          ]
        }
      ]
    }

Now we can go back to our agent, click "Test", and enter a prompt to check that it actually works.

Cool! The first action group works as expected. Let's add another action group to fetch CloudWatch metrics:

The action group name is cloudwatch and the function name is getMetrics. Add a description and parameters, because this Lambda must know which instance or instances to check metrics for (the code below reads instance_ids and an optional region).

Update the agent prompt to explain how we want to use the new action group, then click "Save" and "Prepare" again.

    You are the main AWS supervisor agent.
    Goal: help analyze AWS infrastructure.
    Action groups:
    - ec2: list_instances → returns instance list + instanceIds
    - cloudwatch: getMetrics → needs instance_ids
    Rules:
    - Never call AWS APIs directly.
    - For EC2 + CPU:
      1. Call ec2__list_instances
      2. Extract instanceIds
      3. Call cloudwatch__getMetrics
      4. Combine the results.
    - Always use "think" before acting.

Now we can update our cloudwatch function code:

    import boto3
    import datetime
    import logging
    from typing import Dict, Any
    from http import HTTPStatus

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
        try:
            action_group = event["actionGroup"]
            function = event["function"]
            message_version = event.get("messageVersion", "1.0")
            parameters = event.get("parameters", [])

            region = "us-east-1"
            instance_ids = []

            # --- Parse parameters ---
            for param in parameters:
                if param.get("name") == "region":
                    region = param.get("value")
                elif param.get("name") == "instance_ids":
                    raw_value = param.get("value")
                    if isinstance(raw_value, str):
                        # Clean up stringified list from Bedrock agent
                        # (strip brackets and both kinds of quotes)
                        for ch in "[]'\"":
                            raw_value = raw_value.replace(ch, "")
                        instance_ids = [x.strip() for x in raw_value.split(",") if x.strip()]
                    elif isinstance(raw_value, list):
                        instance_ids = raw_value

            logger.info(f"Parsed instance IDs: {instance_ids}")

            if not instance_ids:
                response_text = f"No instance IDs provided for CloudWatch metrics in {region}."
            else:
                cloudwatch = boto3.client("cloudwatch", region_name=region)
                # Timezone-aware replacement for the deprecated datetime.utcnow()
                now = datetime.datetime.now(datetime.timezone.utc)
                start_time = now - datetime.timedelta(hours=1)
                metrics_output = []

                for instance_id in instance_ids:
                    try:
                        metric = cloudwatch.get_metric_statistics(
                            Namespace="AWS/EC2",
                            MetricName="CPUUtilization",
                            Dimensions=[{"Name": "InstanceId", "Value": instance_id}],
                            StartTime=start_time,
                            EndTime=now,
                            Period=300,
                            Statistics=["Average"]
                        )
                        datapoints = metric.get("Datapoints", [])
                        if datapoints:
                            # Report the most recent 5-minute datapoint in the window
                            datapoints.sort(key=lambda x: x["Timestamp"])
                            avg_cpu = round(datapoints[-1]["Average"], 2)
                            metrics_output.append(f"{instance_id}: {avg_cpu}% CPU (latest 5-min average)")
                        else:
                            metrics_output.append(f"{instance_id}: No recent CPU data")
                    except Exception as e:
                        logger.error(f"Error fetching metrics for {instance_id}: {e}")
                        metrics_output.append(f"{instance_id}: Error fetching metrics")

                response_text = (
                    f"CPU Utilization (last hour) in {region}:\n" +
                    "\n".join(metrics_output)
                )

            # --- Bedrock Agent response format ---
            response_body = {
                "TEXT": {
                    "body": response_text
                }
            }

            action_response = {
                "actionGroup": action_group,
                "function": function,
                "functionResponse": {
                    "responseBody": response_body
                }
            }

            response = {
                "response": action_response,
                "messageVersion": message_version
            }

            logger.info("Response: %s", response)
            return response

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            return {
                "statusCode": HTTPStatus.INTERNAL_SERVER_ERROR,
                "body": f"Internal server error: {str(e)}"
            }

Update the CloudWatch Lambda permissions, just as we did for the ec2 Lambda:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Statement1",
          "Effect": "Allow",
          "Action": [
            "cloudwatch:GetMetricStatistics"
          ],
          "Resource": [
            "*"
          ]
        }
      ]
    }

Then test again.

We now have EC2 and CloudWatch action groups, and the agent can call them to get the list of EC2 instances and their CPU metrics.

Instead of the agent calling EC2 and CloudWatch separately, a supervisor takes care of this logic: it first calls the EC2 function to get all instances, then passes those instance IDs to the CloudWatch function to get the metrics, and finally merges everything into one clear result. This way the agent only needs to call a single action, the supervisor, while the supervisor coordinates all the steps in the background. It's cleaner, faster, and easier to maintain.

Create an action group for the supervisor: give it a name and description, then enter the function name and description (the code below expects the function to be named analyzeInfrastructure).

Update the agent instructions so that the agent no longer calls the ec2 and cloudwatch action groups directly. Then click "Save" and "Prepare".

Update the supervisor Lambda function code. NOTE: you need to replace the EC2 and CloudWatch function names in the code below with your own:

    import boto3
    import json
    import logging
    import re
    import ast

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    lambda_client = boto3.client("lambda")

    def lambda_handler(event, context):
        try:
            action_group = event["actionGroup"]
            function = event["function"]
            parameters = event.get("parameters", [])
            message_version = event.get("messageVersion", "1.0")

            # Parse parameters
            region = "us-east-1"
            for param in parameters:
                if param.get("name") == "region":
                    region = param.get("value")

            # Decide routing
            if function == "analyzeInfrastructure":
                logger.info("Supervisor: calling EC2 and CloudWatch")

                # Step 1: call EC2 Lambda
                ec2_payload = {
                    "actionGroup": "ec2",
                    "function": "list_instances",
                    "parameters": [{"name": "region", "value": region}],
                    "messageVersion": "1.0"
                }

                ec2_response = invoke_lambda("ec2-yeikw", ec2_payload)  #### CHANGE TO YOUR EC2 FUNCTION NAME
                instances = extract_instance_ids(ec2_response)

                # Step 2: call CloudWatch Lambda (if instances found)
                if instances:
                    cw_payload = {
                        "actionGroup": "cloudwatch",
                        "function": "getMetrics",
                        "parameters": [
                            {"name": "region", "value": region},
                            {"name": "instance_ids", "value": instances}
                        ],
                        "messageVersion": "1.0"
                    }
                    cw_response = invoke_lambda("cloudwatch-ef6ty", cw_payload)  #### CHANGE TO YOUR CLOUDWATCH FUNCTION NAME
                    final_text = merge_responses(ec2_response, cw_response)
                else:
                    final_text = "No instances found to analyze."

            else:
                final_text = f"Unknown function: {function}"

            # Construct Bedrock-style response
            response = {
                "messageVersion": message_version,
                "response": {
                    "actionGroup": action_group,
                    "function": function,
                    "functionResponse": {
                        "responseBody": {
                            "TEXT": {"body": final_text}
                        }
                    }
                }
            }

            logger.info("Supervisor response: %s", response)
            return response

        except Exception as e:
            logger.exception("Error in supervisor")
            return {
                "statusCode": 500,
                "body": f"Supervisor error: {str(e)}"
            }


    def invoke_lambda(name, payload):
        """Helper to call another Lambda and parse response"""
        response = lambda_client.invoke(
            FunctionName=name,
            InvocationType="RequestResponse",
            Payload=json.dumps(payload),
        )
        result = json.loads(response["Payload"].read())
        return result


    def extract_instance_ids(ec2_response):
        """Extract instance IDs from EC2 Lambda response"""
        try:
            body = ec2_response["response"]["functionResponse"]["responseBody"]["TEXT"]["body"]

            # Try to extract the list printed after "Found X EC2 instance(s):"
            if "Found" in body and "[" in body and "]" in body:
                data_part = body.split(":", 1)[1].strip()
                try:
                    instances = ast.literal_eval(data_part)  # safely parse the list
                    return [i["InstanceId"] for i in instances if "InstanceId" in i]
                except Exception:
                    pass

            # fallback regex in case of plain text
            return re.findall(r"i-[0-9a-f]+", body)
        except Exception as e:
            logger.error("extract_instance_ids error: %s", e)
            return []


    def merge_responses(ec2_resp, cw_resp):
        """Combine EC2 and CloudWatch outputs"""
        ec2_text = ec2_resp["response"]["functionResponse"]["responseBody"]["TEXT"]["body"]
        cw_text = cw_resp["response"]["functionResponse"]["responseBody"]["TEXT"]["body"]
        return f"{ec2_text}\n\n{cw_text}"

Again, add permissions so that the supervisor Lambda can invoke our EC2 and CloudWatch functions, for example:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "VisualEditor0",
          "Effect": "Allow",
          "Action": "lambda:InvokeFunction",
          "Resource": [
            "arn:aws:lambda:us-east-1:<account_id>:function:ec2-<id>",
            "arn:aws:lambda:us-east-1:<account_id>:function:cloudwatch-<id>"
          ]
        }
      ]
    }

Let's test the function again. Surprisingly, it failed.

I checked my supervisor function logs. At first glance they don't seem to show anything useful, but there is a hint: 3000.00 ms. That is the default Lambda function timeout, so let's adjust it. Go to the supervisor function → Configuration → General configuration and edit the Timeout parameter. I changed it to 10 seconds.

It helped!

You can keep extending this setup, for example by adding AWS billing analysis to find your most expensive resources or the most expensive EC2 instances you are running, and so on. You don't have to limit yourself to AWS resources either: feel free to call external functions.
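
Before deploying changes like these, the supervisor chain described above can be dry-run locally with stub functions in place of the real Lambdas. The following is only a minimal sketch under stated assumptions: `run_supervisor`, `fake_invoke`, `fake_ec2_lambda`, `fake_cloudwatch_lambda`, and `bedrock_text_response` are hypothetical names introduced for illustration, and the instance IDs and CPU values are made up. It uses the same event and response shape as the functions in this article and makes no AWS calls.

```python
import json
import re
from typing import Any, Callable, Dict


def bedrock_text_response(event: Dict[str, Any], text: str) -> Dict[str, Any]:
    """Wrap plain text in the response shape used by the Lambdas in this article."""
    return {
        "messageVersion": event.get("messageVersion", "1.0"),
        "response": {
            "actionGroup": event["actionGroup"],
            "function": event["function"],
            "functionResponse": {"responseBody": {"TEXT": {"body": text}}},
        },
    }


def body_of(response: Dict[str, Any]) -> str:
    """Pull the TEXT body back out of a Bedrock-style response."""
    return response["response"]["functionResponse"]["responseBody"]["TEXT"]["body"]


def fake_ec2_lambda(event: Dict[str, Any]) -> Dict[str, Any]:
    """Stub for the EC2 Lambda: returns two made-up instances."""
    instances = [{"InstanceId": "i-0abc123"}, {"InstanceId": "i-0def456"}]
    text = f"Found {len(instances)} EC2 instance(s): {instances}"
    return bedrock_text_response(event, text)


def fake_cloudwatch_lambda(event: Dict[str, Any]) -> Dict[str, Any]:
    """Stub for the CloudWatch Lambda: reports a fixed, made-up CPU value."""
    params = {p["name"]: p["value"] for p in event.get("parameters", [])}
    lines = [f"{iid}: 1.0% CPU" for iid in params.get("instance_ids", [])]
    return bedrock_text_response(event, "\n".join(lines))


def fake_invoke(name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for lambda_client.invoke: JSON round-trip, then dispatch by name."""
    handlers = {"ec2": fake_ec2_lambda, "cloudwatch": fake_cloudwatch_lambda}
    return handlers[name](json.loads(json.dumps(payload)))


def run_supervisor(invoke: Callable[[str, Dict[str, Any]], Dict[str, Any]],
                   region: str = "us-east-1") -> str:
    """List instances, pass their IDs to the metrics function, merge both texts."""
    ec2_resp = invoke("ec2", {
        "actionGroup": "ec2",
        "function": "list_instances",
        "parameters": [{"name": "region", "value": region}],
        "messageVersion": "1.0",
    })
    instance_ids = re.findall(r"i-[0-9a-f]+", body_of(ec2_resp))
    if not instance_ids:
        return "No instances found to analyze."
    cw_resp = invoke("cloudwatch", {
        "actionGroup": "cloudwatch",
        "function": "getMetrics",
        "parameters": [
            {"name": "region", "value": region},
            {"name": "instance_ids", "value": instance_ids},
        ],
        "messageVersion": "1.0",
    })
    return f"{body_of(ec2_resp)}\n\n{body_of(cw_resp)}"


if __name__ == "__main__":
    print(run_supervisor(fake_invoke))
```

Passing the invoke function in as an argument keeps the orchestration logic separate from boto3, so the same flow can be checked with stubs here and pointed at a real Lambda client later.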