Amazon announced the general availability of Amazon Kendra a few weeks ago. Kendra is a highly accurate and easy-to-use enterprise search service powered by machine learning. In this post I will build a question-and-answer chatbot solution using React with Amplify, a WebSocket API in AWS API Gateway, AWS Fargate, and Amazon Kendra. The solution provides a conversational interface for questions and answers: it allows users to ask their questions and get relevant answers quickly.

What we’ll cover in this post:

1. Create an Amazon Kendra index and load questions and answers extracted from a semi-structured document into a Kendra FAQ.
2. Deploy a WebSocket API in API Gateway to process the question and answer messages.
3. Create a React application and use AWS Amplify to connect to and interact with the chatbot through WebSocket.
4. Create a service in AWS Fargate that lets our bot call Kendra’s API to fetch an answer and send it back to the user.

The following diagram shows the architecture of the above steps:

Why AWS Fargate matters

An easier approach would be to query Amazon Kendra from a Lambda function, without Fargate at all. However, with AWS Fargate or EC2 we are able to extend the chatbot with custom AI models to make our bot more human; for example, we can build a chatbot based on the Hugging Face State-of-the-Art Conversational AI model and query Kendra only for specific data. If your program is a long-running compute job that requires more memory and higher performance, Fargate is probably the better option.

Prerequisites

- Set up an AWS account
- Install the latest aws-cli
- Install the Amplify CLI
- Basic understanding of React
- Basic understanding of Docker
- Basic understanding of CloudFormation
- Install or update jq to the latest version
- Serverless Framework installed (optional)

Now, let’s get started!

Creating an Amazon Kendra index

Let’s create a Kendra index. Kendra supports unstructured and semi-structured documents, like FAQs stored in S3; we will use FAQs in our case.
First, let’s download a QnA dataset and upload it to S3. We can use the Microsoft Research WikiQA Corpus for our chatbot. Once the dataset is downloaded, let’s transform it into the csv format Kendra supports, with Question and Answer columns. Use the following script to transform the dataset and upload the transformed csv file to the existing S3 bucket my-kendra-index:

```python
import json
import boto3
import pandas as pd
import os


def create_faq_format(input_path):
    faq_list = []
    with open(input_path) as f:
        lines = [line.strip('\n') for line in f]
    for i in range(2, len(lines)):
        l = lines[i].split('\t')
        if l[2] == "1":
            faq_list.append({"Question": l[0], "Answer": l[1]})
    return faq_list


qa_list = create_faq_format("WikiQACodePackage/data/wiki/WikiQASent-train.txt")
df = pd.DataFrame(qa_list, columns=["Question", "Answer"])
df = df.drop_duplicates(subset='Question', keep="first")
df.to_csv('faq.csv', index=False)
s3 = boto3.resource('s3')
s3.meta.client.upload_file("faq.csv", 'my-kendra-index', 'faq/faq.csv')
```

Now we are ready to create a Kendra index. To create the index, complete the following steps:

1. On the Amazon Kendra console, choose Launch Amazon Kendra.
2. Choose Create index and enter an Index name, such as my-aq-index.
3. For IAM role, choose Create a new role to create a role that allows Amazon Kendra to access CloudWatch Logs.
4. Choose Create Index.

After the Kendra index has been created, we can add our FAQ document:

1. Choose Add FAQ on the Amazon Kendra console.
2. For S3, browse S3 to find your bucket and select the FAQ csv file; here we use s3://my-kendra-index/faq/faq.csv.
3. For IAM role, select Create a new role to allow Amazon Kendra to access the FAQ content object in the S3 bucket.
4. Choose Add FAQ.

Now that we have a working Kendra index, let’s move to the next step.

Deploying a WebSocket API in API Gateway to process the QnA messages

In this section we will 1) build a WebSocket API in AWS API Gateway, 2) create Lambda functions to manage the WebSocket routes ($connect, $disconnect, sendMessage), and 3) create a DynamoDB table to store WebSocket connection ids and user names.
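Before diving into the infrastructure, it helps to pin down the message shape travelling over the socket. Each message is a small JSON envelope whose action field drives API Gateway's route selection ($request.body.action), with the chat payload itself nested as a JSON string under data. The sketch below mirrors the reply envelope the bot builds later in this post (the exact payload shape the React client sends may differ slightly):

```python
import json


def make_envelope(text: str, author: str, to: str) -> str:
    """Build the envelope sent over the WebSocket; 'action' selects the API Gateway route."""
    payload = {"data": text, "type": "text", "author": author, "to": to}
    return json.dumps({"action": "sendMessage", "data": json.dumps(payload)})


def unpack_envelope(raw: str) -> dict:
    """Recover the nested chat payload that the sendMessage handler forwards."""
    return json.loads(json.loads(raw)["data"])


raw = make_envelope("how are glacier caves formed?", author="ROBOT", to="alice")
msg = unpack_envelope(raw)
print(msg["to"])    # alice
print(msg["data"])  # how are glacier caves formed?
```

The double json.dumps is deliberate: API Gateway only inspects the outer object for routing, while the inner string passes through to the recipient untouched.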
We will use the Serverless Framework to build and deploy all the required resources. Let’s create a new Serverless project and add the following config to serverless.yml:

```yaml
service: serverless-chat

provider:
  name: aws
  runtime: nodejs12.x
  stackName: ${self:service}-${self:provider.stage}
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'ap-southeast-2'}
  tags:
    project: chatbot
  iamRoleStatements:
    - Effect: Allow
      Action:
        - "execute-api:ManageConnections"
      Resource:
        - "arn:aws:execute-api:*:*:**/@connections/*"
    - Effect: Allow
      Action:
        - "dynamodb:PutItem"
        - "dynamodb:GetItem"
        - "dynamodb:UpdateItem"
        - "dynamodb:DeleteItem"
        - "dynamodb:Query"
        - "dynamodb:Scan"
      Resource:
        - Fn::GetAtt: [ChatConnectionsTable, Arn]
        - Fn::Join:
            - "/"
            - - Fn::GetAtt: [ChatConnectionsTable, Arn]
              - "*"
  environment:
    CHATCONNECTION_TABLE:
      Ref: ChatConnectionsTable
  websocketApiName: websocket-chat-${self:provider.stage}
  websocketApiRouteSelectionExpression: $request.body.action

functions:
  connectionManager:
    handler: handler.connectionManager
    events:
      - websocket:
          route: $connect
          authorizer:
            name: "authFunc"
            identitySource:
              - "route.request.querystring.token"
      - websocket:
          route: $disconnect
  authFunc:
    handler: handler.authFunc
    environment:
      APP_CLIENT_ID: ${ssm:/chatbot/${self:provider.stage}/app_client_id}
      USER_POOL_ID: ${ssm:/chatbot/${self:provider.stage}/user_pool_id}
  defaultMessages:
    handler: handler.defaultMessage
    events:
      - websocket:
          route: $default
  sendMessage:
    handler: handler.sendMessage
    events:
      - websocket:
          route: sendMessage

resources:
  Resources:
    ChatConnectionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        AttributeDefinitions:
          - AttributeName: connectionId
            AttributeType: S
          - AttributeName: userid
            AttributeType: S
        KeySchema:
          - AttributeName: connectionId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 5
          WriteCapacityUnits: 5
        GlobalSecondaryIndexes:
          - IndexName: userid_index
            KeySchema:
              - AttributeName: userid
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: "5"
              WriteCapacityUnits: "5"
```

Note that the Cognito App Client Id (/chatbot/dev/app_client_id) and Cognito User Pool Id (/chatbot/dev/user_pool_id) referenced in serverless.yml have not been created yet; we only reference the Cognito details as SSM parameters here. In the next step we will create the Cognito User Pool using the Amplify CLI, and then we can update the related SSM parameters from the Systems Manager console.

Once serverless.yml has been modified, update handler.js to create the Lambda functions for the WebSocket routes: $connect (with a custom authorizer), $disconnect, and sendMessage:

```javascript
"use strict";
const AWS = require("aws-sdk");
const DDB = new AWS.DynamoDB({ apiVersion: "2012-10-08" });
const jose = require("node-jose");
const fetch = require("node-fetch");
const KEYS_URL = `https://cognito-idp.${process.env.AWS_REGION}.amazonaws.com/${process.env.USER_POOL_ID}/.well-known/jwks.json`;
const successfullResponse = {
  statusCode: 200,
  body: "Connected",
};

module.exports.connectionManager = async (event, context, callback) => {
  if (event.requestContext.eventType === "CONNECT") {
    try {
      await addConnection(
        event.requestContext.connectionId,
        event.queryStringParameters.username
      );
      callback(null, successfullResponse);
    } catch (error) {
      callback(null, JSON.stringify(error));
    }
  } else if (event.requestContext.eventType === "DISCONNECT") {
    try {
      await deleteConnection(event.requestContext.connectionId);
      callback(null, successfullResponse);
    } catch (error) {
      callback(null, {
        statusCode: 500,
        body: "Failed to connect: " + JSON.stringify(error),
      });
    }
  }
};

module.exports.defaultMessage = (event, context, callback) => {
  callback(null);
};

module.exports.sendMessage = async (event, context, callback) => {
  let connectionData;
  try {
    const { body } = event;
    const messageBodyObj = JSON.parse(body);
    const params = {
      IndexName: "userid_index",
      KeyConditionExpression: "userid = :u",
      ExpressionAttributeValues: {
        ":u": {
          S: JSON.parse(messageBodyObj.data).to || "ROBOT",
        },
      },
      TableName: process.env.CHATCONNECTION_TABLE,
    };
    connectionData = await DDB.query(params).promise();
  } catch (err) {
    console.log(err);
    return { statusCode: 500 };
  }

  const postCalls = connectionData.Items.map(async ({ connectionId }) => {
    try {
      return await send(event, connectionId.S);
    } catch (err) {
      if (err.statusCode === 410) {
        return await deleteConnection(connectionId.S);
      }
      console.log(JSON.stringify(err));
      throw err;
    }
  });

  try {
    await Promise.all(postCalls);
  } catch (err) {
    console.log(err);
    callback(null, JSON.stringify(err));
  }
  callback(null, successfullResponse);
};

const send = (event, connectionId) => {
  const postData = JSON.parse(event.body).data;
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: "2018-11-29",
    endpoint:
      event.requestContext.domainName + "/" + event.requestContext.stage,
  });
  return apigwManagementApi
    .postToConnection({ ConnectionId: connectionId, Data: postData })
    .promise();
};

const addConnection = (connectionId, userid) => {
  const putParams = {
    TableName: process.env.CHATCONNECTION_TABLE,
    Item: {
      connectionId: { S: connectionId },
      userid: { S: userid },
    },
  };
  return DDB.putItem(putParams).promise();
};

const deleteConnection = (connectionId) => {
  const deleteParams = {
    TableName: process.env.CHATCONNECTION_TABLE,
    Key: {
      connectionId: { S: connectionId },
    },
  };
  return DDB.deleteItem(deleteParams).promise();
};

module.exports.authFunc = async (event, context, callback) => {
  const {
    queryStringParameters: { token },
    methodArn,
  } = event;
  let policy;
  try {
    policy = await authCognitoToken(token, methodArn);
    callback(null, policy);
  } catch (error) {
    console.log(error);
    callback("Signature verification failed");
  }
};

const authCognitoToken = async (token, methodArn) => {
  if (!token) throw new Error("Unauthorized");
  const app_client_id = process.env.APP_CLIENT_ID;
  const sections = token.split(".");
  let authHeader = jose.util.base64url.decode(sections[0]);
  authHeader = JSON.parse(authHeader);
  const kid = authHeader.kid;
  const rawRes = await fetch(KEYS_URL);
  const response = await rawRes.json();
  if (rawRes.ok) {
    const keys = response["keys"];
    let key_index = -1;
    keys.some((key, index) => {
      if (kid == key.kid) {
        key_index = index;
      }
    });
    const foundKey = keys.find((key) => {
      return kid === key.kid;
    });
    if (!foundKey) {
      callback("Public key not found in jwks.json");
    }
    const jwkRes = await jose.JWK.asKey(foundKey);
    const verifyRes = await jose.JWS.createVerify(jwkRes).verify(token);
    const claims = JSON.parse(verifyRes.payload);
    const current_ts = Math.floor(new Date() / 1000);
    if (current_ts > claims.exp) {
      throw new Error("Token is expired");
    }
    if (claims.client_id != app_client_id) {
      throw new Error("Token was not issued for this audience");
    } else {
      return generatePolicy("me", "Allow", methodArn);
    }
  }
  throw new Error("Keys url is invalid");
};

const generatePolicy = function (principalId, effect, resource) {
  var authResponse = {};
  authResponse.principalId = principalId;
  if (effect && resource) {
    var policyDocument = {};
    policyDocument.Version = "2012-10-17";
    policyDocument.Statement = [];
    var statementOne = {};
    statementOne.Action = "execute-api:Invoke";
    statementOne.Effect = effect;
    statementOne.Resource = resource;
    policyDocument.Statement[0] = statementOne;
    authResponse.policyDocument = policyDocument;
  }
  return authResponse;
};

const generateAllow = function (principalId, resource) {
  return generatePolicy(principalId, "Allow", resource);
};

const generateDeny = function (principalId, resource) {
  return generatePolicy(principalId, "Deny", resource);
};
```
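The custom authorizer above locates the Cognito signing key by reading the kid from the token's header segment before verifying the signature against the jwks.json keys. That first step is plain base64url and JSON decoding, so it can be sketched and checked without any AWS dependency (the token below is a made-up, unsigned example for illustration only):

```python
import base64
import json


def jwt_kid(token: str) -> str:
    """Decode the JWT header (first dot-separated segment) and return its 'kid'."""
    header_b64 = token.split(".")[0]
    header_b64 += "=" * (-len(header_b64) % 4)  # restore stripped base64 padding
    header = json.loads(base64.urlsafe_b64decode(header_b64))
    return header["kid"]


# Made-up header and payload, not a real Cognito token
header = (
    base64.urlsafe_b64encode(json.dumps({"kid": "demo-key-1", "alg": "RS256"}).encode())
    .decode()
    .rstrip("=")
)
fake_token = header + ".e30.signature"
print(jwt_kid(fake_token))  # demo-key-1
```

In the real handler this lookup is only the routing step; the token is still cryptographically verified with node-jose, and the exp and client_id claims are checked afterwards.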
Run the following command to deploy the WebSocket API:

$ sls deploy --stage dev --region YOUR_REGION

Building a React application with AWS Amplify

In this section we will build a web app with an authentication feature using React and AWS Amplify. The complete project is in the GitHub repo; you can find the following in the project directory:

- amplify/.config/ and amplify/backend/ folders.
- project-config.json in the .config/ folder.
- backend-config.json in the backend/ folder.
- CloudFormation files in the backend/ folder.

Let’s download the source code and re-initialise the existing Amplify project by running:

$ amplify init

then push the changes:

$ amplify push

and deploy:

$ amplify publish

We will get the web application URL after the project has been deployed.

Now, log in to the AWS Cognito console and you will see that an AWS Cognito User Pool has been created. Copy the User Pool Id and App Client Id into the SSM parameters we referenced in the previous step. Now we’re ready for the final step!

Creating a chatbot service in AWS Fargate

In this section we will create a bot task running in a chatbot service in AWS Fargate. The chatbot task connects to the WebSocket API; when a user asks a question, the bot queries the Kendra index, Kendra surfaces a relevant answer, and the bot sends it back to the user who asked the question. To deploy the Fargate service, perform the following steps:

1. Download the chatbot script and Dockerfile here.
2. Build the Docker image, tag it, create an Amazon ECR repository, and push the image to ECR; for more details, please refer to the AWS official tutorial.
3. Download the CloudFormation templates and bash scripts here.
4. If using the Fargate launch type, the awsvpc network mode is required, so we need to deploy the VPC and Security Groups:

$ bash create-infra.sh -d dev

5. Create the task definition:

$ bash create-task.sh -d dev

6. Deploy the chatbot service in AWS Fargate:

$ bash deploy-service.sh -d dev

The main logic can be found here:

```python
import json
import logging
import boto3
import datetime
import websocket
import ssl
import os

logger = logging.getLogger()
logger.setLevel(logging.INFO)

ssm = boto3.client('ssm', region_name="ap-southeast-2")
KENDRA_INDEX_SSM = ssm.get_parameter(Name=os.environ["KENDRA_INDEX_KEY"])
kendra_index_id = KENDRA_INDEX_SSM["Parameter"]["Value"]
kendra = boto3.client('kendra', region_name='us-east-1')
ROBOT_USER_SSM = ssm.get_parameter(Name=os.environ["ROBOT_USER_SSM"])
user_name = ROBOT_USER_SSM["Parameter"]["Value"]
ROBOT_PASS_SSM = ssm.get_parameter(
    Name=os.environ["ROBOT_PASS_SSM"], WithDecryption=True)
password = ROBOT_PASS_SSM["Parameter"]["Value"]
USER_POOL_SSM = ssm.get_parameter(Name=os.environ["USER_POOL_SSM"])
user_pool = USER_POOL_SSM["Parameter"]["Value"]
APP_CLIENT_SSM = ssm.get_parameter(Name=os.environ["APP_CLIENT_SSM"])
app_client = APP_CLIENT_SSM["Parameter"]["Value"]
credentials = boto3.Session().get_credentials()
WS_URL_SSM = ssm.get_parameter(Name=os.environ["WS_URL_KEY"])


def on_message(ws, message):
    message_obj = json.loads(message)
    result = get_answer(message_obj["data"]["text"])
    if len(result["ResultItems"]) > 0:
        logger.debug(result["ResultItems"][0]["DocumentExcerpt"]["Text"])
        answer_text = result["ResultItems"][0]["DocumentExcerpt"]["Text"]
    else:
        answer_text = "Sorry, I could not find an answer."
    ws.send(json.dumps({
        "action": "sendMessage",
        "data": json.dumps({
            "data": answer_text,
            "type": "text",
            "author": "ROBOT",
            "to": message_obj["author"]})
    }))


def authenticate_and_get_token(username, password, user_pool_id, app_client_id):
    client = boto3.client('cognito-idp')
    resp = client.admin_initiate_auth(
        UserPoolId=user_pool_id,
        ClientId=app_client_id,
        AuthFlow='ADMIN_NO_SRP_AUTH',
        AuthParameters={
            "USERNAME": username,
            "PASSWORD": password
        }
    )
    return resp['AuthenticationResult']['AccessToken']


def on_error(ws, error):
    logger.error(error)


def on_close(ws):
    logger.info("### closed ###")


def on_open(ws):
    logger.info("connected")


def get_answer(text):
    response = kendra.query(
        IndexId=kendra_index_id,
        QueryText=text,
        QueryResultTypeFilter='QUESTION_ANSWER',
    )
    return response


if __name__ == '__main__':
    access_token = authenticate_and_get_token(
        user_name, password, user_pool, app_client)
    ws_url = "{}?token={}&username=ROBOT".format(
        WS_URL_SSM["Parameter"]["Value"], access_token)
    websocket.enableTrace(False)
    ws = websocket.WebSocketApp(ws_url,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
```

(Optional) Extending the chatbot with a ConvAI model

To extend the chatbot with the ConvAI model, you can try the sample script below; note that you will need to put more effort into training the model and placing it in the Docker image or EFS.
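One detail of that script worth calling out before reading it: after every exchange it trims the stored conversation to the last 2 * max_history + 1 token lists (the recent user and bot turns plus the newest input), which keeps both the DynamoDB item and the model input bounded. The rule is easy to check in isolation (the token lists below are invented):

```python
def trim_history(history, max_history: int):
    """Keep only the most recent 2*max_history + 1 turns, as the ConvAI script does."""
    return history[-(2 * max_history + 1):]


# Six invented turns of encoded token ids
history = [[1], [2], [3], [4], [5], [6]]
print(trim_history(history, max_history=2))  # [[2], [3], [4], [5], [6]]
```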
```python
import json
import logging
import random
import warnings
import boto3
import flask
import torch
import datetime
import torch.nn.functional as F
# from requests_aws4auth import AWS4Auth
from simpletransformers.conv_ai import ConvAIModel
from simpletransformers.conv_ai.conv_ai_utils import get_dataset
from flask import request, Response

app = flask.Flask(__name__)
region = 'ap-southeast-2'
# ssm = boto3.client('ssm', region_name=region)
# credentials = boto3.Session().get_credentials()
# awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
#                    region, service, session_token=credentials.token)
dynamodb = boto3.client('dynamodb')
polly_client = boto3.Session(region_name=region).client('polly')
s3 = boto3.resource('s3')
BUCKET_NAME = "aiyi.demo.textract"
TABLE_NAME = "serverless-chat-dev-ChatHistoryTable-M0BPSVMQJBFX"
SPECIAL_TOKENS = ["<bos>", "<eos>", "<speaker1>", "<speaker2>", "<pad>"]
history = []
convAimodel = ConvAIModel("gpt", "model", use_cuda=False)
character = [
    "i like computers .",
    "i like reading books .",
    "i like talking to chatbots .",
    "i love listening to classical music .",
]


def text_2_speech(userid, response_msg):
    response = polly_client.synthesize_speech(VoiceId='Joanna',
                                              OutputFormat='mp3',
                                              Text=response_msg)
    object_key = "{}/{}/speech.mp3".format(
        userid, int(datetime.datetime.utcnow().timestamp()))
    object = s3.Object(BUCKET_NAME, object_key)
    object.put(Body=response['AudioStream'].read())
    return object_key


def get_chat_histories(userid):
    response = dynamodb.get_item(TableName=TABLE_NAME, Key={
        'userid': {
            'S': userid
        }})
    if 'Item' in response:
        return json.loads(response["Item"]["history"]["S"])
    return {"history": []}


def save_chat_histories(userid, history):
    return dynamodb.put_item(TableName=TABLE_NAME, Item={
        'userid': {'S': userid},
        'history': {'S': history}})


def sample_sequence(aiCls, personality, history, tokenizer, model, args, current_output=None):
    special_tokens_ids = tokenizer.convert_tokens_to_ids(SPECIAL_TOKENS)
    if current_output is None:
        current_output = []
    for i in range(args["max_length"]):
        instance = aiCls.build_input_from_segments(
            personality, history, current_output, tokenizer, with_eos=False)
        input_ids = torch.tensor(
            instance["input_ids"], device=aiCls.device).unsqueeze(0)
        token_type_ids = torch.tensor(
            instance["token_type_ids"], device=aiCls.device).unsqueeze(0)
        logits = model(input_ids, token_type_ids=token_type_ids)
        if isinstance(logits, tuple):  # for gpt2 and maybe others
            logits = logits[0]
        logits = logits[0, -1, :] / args["temperature"]
        logits = aiCls.top_filtering(
            logits, top_k=args["top_k"], top_p=args["top_p"])
        probs = F.softmax(logits, dim=-1)
        prev = torch.topk(probs, 1)[1] if args["no_sample"] else torch.multinomial(probs, 1)
        if i < args["min_length"] and prev.item() in special_tokens_ids:
            while prev.item() in special_tokens_ids:
                if probs.max().item() == 1:
                    warnings.warn(
                        "Warning: model generating special token with probability 1.")
                    break  # avoid infinitely looping over special token
                prev = torch.multinomial(probs, num_samples=1)
        if prev.item() in special_tokens_ids:
            break
        current_output.append(prev.item())
    return current_output


def interact(raw_text, model, personality, userid, history):
    """
    Interact with a model in the terminal.

    Args:
        personality: A list of sentences that the model will use to build a personality.

    Returns:
        None
    """
    args = model.args
    tokenizer = model.tokenizer
    process_count = model.args["process_count"]

    model._move_model_to_device()

    if not personality:
        dataset = get_dataset(
            tokenizer,
            None,
            args["cache_dir"],
            process_count=process_count,
            proxies=model.__dict__.get("proxies", None),
            interact=True,
        )
        personalities = [dialog["personality"]
                         for dataset in dataset.values() for dialog in dataset]
        personality = random.choice(personalities)
    else:
        personality = [tokenizer.encode(s.lower()) for s in personality]

    history.append(tokenizer.encode(raw_text))
    with torch.no_grad():
        out_ids = sample_sequence(
            model, personality, history, tokenizer, model.model, args)
    history.append(out_ids)
    history = history[-(2 * args["max_history"] + 1):]
    out_text = tokenizer.decode(out_ids, skip_special_tokens=True)
    save_chat_histories(userid, json.dumps({"history": history}))
    return out_text


@app.route('/message-received', methods=['POST'])
def process_chat_message():
    response = None
    if request.form['userid'] is None:
        response = Response("", status=415)
    else:
        try:
            userid = request.form['userid']
            message = request.form['message']
            history = get_chat_histories(userid)
            history = history["history"]
            response_msg = interact(message, convAimodel,
                                    character, userid, history)
            audio_key = text_2_speech(userid, response_msg)
            return Response(
                json.dumps({"message": response_msg, "audio": audio_key}),
                status=200,
                mimetype='application/json')
        except Exception as ex:
            logging.exception(ex)
            return Response(str(ex), status=500)
    return response


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=8000)
```

Once the service has been deployed, we should be able to ask the bot questions; let’s visit the React application and try it out live!

As the results above show, even when only keywords are used, the system can respond with the correct answer. If you would like to learn more about Amazon Kendra, there is an official tutorial on building a chatbot using Lex and Kendra; for details, please refer to this link.

I hope you have found this article useful. The source code for this post can be found in my GitHub repo.