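Table of contents
- aws (study notes, lesson 36): calling rekognition from lambda (celebrity video analysis AI)
- What this lesson covers:
- 1. Overall architecture
- 2. Code analysis
- 2.1 Create the `S3 bucket` for uploading the `video`
- 2.2 Create the `SNS topic`
- 2.3 Create the `role` needed by `start_processing_lambda`
- 2.4 Create `start_rekognition_lambda`
- 2.5 Create `process_video_lambda`
- 2.6 Grant permissions to `start_rekognition_lambda` and `process_video_lambda`
- 2.7 Subscribe `process_video_lambda` to the `SNS event`
- 2.8 Add a `trigger` to `start_rekognition_lambda`
- 2.9 Walkthrough of the `lambda` code
- 3. Deploy and run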
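aws (study notes, lesson 36): calling rekognition from lambda (celebrity video analysis AI)
- Use `lambda` to call `rekognition` (celebrity video analysis)
What this lesson covers:
- Using `rekognition` (celebrity video analysis)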
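1. Overall architecture
1.1 End-to-end processing flow
A video uploaded to the `S3 bucket` triggers `start_rekognition_lambda`, which starts a celebrity recognition job in `rekognition`. When the job completes, `rekognition` publishes a notification to the `SNS topic`, and `process_video_lambda` fetches the results and writes them to `cloudwatch`.
1.2 Code link
Code link (rekognition-video-processor)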
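1.3 Code changes
The original source writes all of its results with `print` statements, and that output did not show up properly in `cloudwatch`. The `lambda` code is therefore changed as follows.
- Add the following code
import logging
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)
- Replace each `print` call with `logger.info`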
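To make the change concrete, here is a minimal sketch of a handler after the swap. The handler body (counting records) is hypothetical and only illustrates the pattern; the three logging lines are the ones listed above. Setting the root level to `INFO` matters because the logging default of `WARNING` would otherwise drop `logger.info` messages.

```python
import logging

# Same three lines as in the change above: raise the root level to INFO
# and create a module-level logger.
logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)


def lambda_handler(event, context):
    # Hypothetical handler body, only to show the print -> logger.info swap.
    record_count = len(event.get("Records", []))
    logger.info("received %d record(s)", record_count)
    return {"statusCode": 200, "body": f"{record_count} record(s) logged"}
```

Passing the values as arguments (`%d`) instead of concatenating strings also avoids a `TypeError` when a value is not a string.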
2. Code analysis
2.1 Create the `S3 bucket` for uploading the `video`
# S3 bucket to store videos
video_bucket = s3.Bucket(
    self,
    "S3Bucket",
    removal_policy=RemovalPolicy.DESTROY,
)
2.2 Create the `SNS topic`
# SNS
rekognition_sns_topic = sns.Topic(
    self,
    "RekognitionSnsTopic",
    display_name="Rekognition Job Completion SNS Topic",
)
When the `rekognition` job completes, the result is announced through this `SNS topic`, and a `lambda` then processes it.
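2.3 Create the `role` needed by `start_processing_lambda`
2.3.1 `AmazonSNSFullAccess` permission
rekognition_role = iam.Role(
    self,
    "RekognitionRole",
    assumed_by=iam.ServicePrincipal("rekognition.amazonaws.com"),
    managed_policies=[
        iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSNSFullAccess"),
    ],
)
An `SNS event` has to be published once `video` processing completes. The `role` is assumed by `rekognition` itself (`rekognition.amazonaws.com`), so it is the one that carries the `AmazonSNSFullAccess` `policy`; `start_processing_lambda` later hands this `role` to `rekognition` when it starts the job.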
2.3.2 `S3 bucket` permission
# Define IAM permissions needed
s3_lambda_policy = iam.PolicyStatement(
    actions=["s3:GetObject"],
    resources=[video_bucket.bucket_arn, video_bucket.bucket_arn + "/*"],
    effect=iam.Effect.ALLOW,
)
The uploaded video has to be read from the `S3 bucket`, so read permission (`s3:GetObject`) on the bucket is required.
2.3.3 `rekognition` permission
rekognition_lambda_policy = iam.PolicyStatement(
    actions=["rekognition:*"],
    resources=["*"],
    effect=iam.Effect.ALLOW,
)
The `lambda` has to call `rekognition` to run the video analysis, so it is granted `rekognition` permissions.
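2.3.4 `PassRole` permission
pass_role_lambda_policy = iam.PolicyStatement(
    actions=["iam:PassRole"],
    resources=[
        rekognition_role.role_arn
    ],
)
When video processing finishes, `rekognition` publishes an event to `SNS` using `rekognition_role`. The `lambda` passes that `role` to `rekognition` when it starts the job, so it needs `iam:PassRole` on it.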
2.4 Create `start_rekognition_lambda`
# Lambda which detects when a video has been uploaded to the S3 bucket and starts the video processing with Rekognition
start_processing_lambda_function = lambda_.Function(
    self,
    "LambdaFunction",
    function_name="start-processing-rekognition-demo-lambda",
    runtime=lambda_.Runtime.PYTHON_3_10,
    handler="index.lambda_handler",
    code=lambda_.Code.from_asset("lambdas/start_processing"),
    environment={
        "SNS_TOPIC_ARN": rekognition_sns_topic.topic_arn,
        "SNS_ROLE_ARN": rekognition_role.role_arn,
    },
)
Here, `environment` passes two values:
- One is the `SNS topic arn`, used to publish an `SNS event` when processing completes, which notifies `process_video_lambda` to handle the result.
- The other is the `arn` of `rekognition_role`.
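As a small illustration of how these two values are consumed, the sketch below turns them into the `NotificationChannel` argument used later in the handler. The helper name `build_notification_channel` and the early validation are assumptions added for illustration; the original code simply reads `os.environ` at module level.

```python
import os


def build_notification_channel(environ=os.environ):
    """Build the NotificationChannel argument from the two environment values."""
    required = ("SNS_TOPIC_ARN", "SNS_ROLE_ARN")
    missing = [name for name in required if not environ.get(name)]
    if missing:
        # Fail early with a readable message instead of a bare KeyError.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "SNSTopicArn": environ["SNS_TOPIC_ARN"],
        "RoleArn": environ["SNS_ROLE_ARN"],
    }
```

Taking the mapping as a parameter keeps the helper easy to exercise locally with a plain dict instead of real environment variables.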
2.5 Create `process_video_lambda`
# Lambda which detects when a video has been processed by Rekognition. It extracts the data of each celebrity identified
process_video_lambda = lambda_.Function(
    self,
    "RekognitionLambda",
    function_name="process-video-rekognition-demo-lambda",
    runtime=lambda_.Runtime.PYTHON_3_10,
    handler="index.lambda_handler",
    code=lambda_.Code.from_asset("lambdas/process_video"),
)
This `lambda` does not do the actual video processing. It only outputs the results of the `rekognition` job that `start_rekognition_lambda` started.
2.6 Grant permissions to `start_rekognition_lambda` and `process_video_lambda`
2.6.1 Grant permissions to `start_rekognition_lambda`
# Grant permissions to the lambdas defined
start_processing_lambda_function.add_to_role_policy(s3_lambda_policy)
start_processing_lambda_function.add_to_role_policy(rekognition_lambda_policy)
start_processing_lambda_function.add_to_role_policy(pass_role_lambda_policy)
2.6.2 Grant permissions to `process_video_lambda`
process_video_lambda.add_to_role_policy(rekognition_lambda_policy)
2.7 Subscribe `process_video_lambda` to the `SNS event`
rekognition_sns_topic.grant_publish(process_video_lambda)
rekognition_sns_topic.add_subscription(sns_subs.LambdaSubscription(process_video_lambda))
With this subscription in place, `process_video_lambda` is notified of the `video_process_complete event`.
2.8 Add a `trigger` to `start_rekognition_lambda`
# Automatically trigger lambda when a new object is uploaded to S3
start_processing_lambda_function.add_event_source(
    aws_lambda_event_sources.S3EventSource(
        video_bucket, events=[s3.EventType.OBJECT_CREATED]
    )
)
Every object created in this `video_bucket` triggers `start_rekognition_lambda`; the handler itself then checks for the `mp4` extension before starting a job.
2.9 Walkthrough of the `lambda` code
2.9.1 `start_rekognition_lambda` code
import boto3
import os
import logging

logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)

rekognition = boto3.client("rekognition")
s3 = boto3.client("s3")

SNS_TOPIC_ARN = os.environ["SNS_TOPIC_ARN"]
SNS_ROLE_ARN = os.environ["SNS_ROLE_ARN"]


def lambda_handler(event, context):
    # Bucket and key of the object that triggered this invocation
    key = event["Records"][0]["s3"]["object"]["key"]
    bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
    file_extension = os.path.splitext(key)[1]
    logger.info("key is " + key)
    logger.info("file_extension is " + file_extension)
    if file_extension == ".mp4":
        # Start the asynchronous job; completion is announced on the SNS topic
        response = rekognition.start_celebrity_recognition(
            Video={"S3Object": {"Bucket": bucket_name, "Name": key}},
            NotificationChannel={
                "SNSTopicArn": SNS_TOPIC_ARN,
                "RoleArn": SNS_ROLE_ARN,
            },
        )
        logger.info("recognition has been started.")
        # The job has only just started, so this returns its in-progress state; the value is not used here
        recognition = rekognition.get_celebrity_recognition(JobId=response["JobId"])
        return {"statusCode": 200, "body": "Rekognition job started!"}
    return {"statusCode": 200, "body": "No video uploaded"}
Note that every `print` here has been replaced with `logger.info`, because the `print` output did not show up properly in `cloudwatch`.
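The handler above reads only the first record and compares the extension case-sensitively. As a hedged sketch of a more defensive variant, the hypothetical helper `extract_mp4_objects` below walks every record, decodes the object key (S3 event notifications deliver keys URL-encoded, so a space arrives as `+`), and accepts `.MP4` as well as `.mp4`. It is not part of the original repository.

```python
import os
from urllib.parse import unquote_plus


def extract_mp4_objects(event):
    """Return (bucket, key) pairs for every .mp4 object in an S3 event."""
    videos = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded, e.g. "my+clip.mp4" for "my clip.mp4".
        key = unquote_plus(record["s3"]["object"]["key"])
        if os.path.splitext(key)[1].lower() == ".mp4":
            videos.append((bucket, key))
    return videos
```

Each returned pair can be passed as `Bucket` and `Name` in the `Video` argument of `start_celebrity_recognition`.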
2.9.2 `process_video_lambda` code
import json
import logging

import boto3

sns = boto3.client("sns")
rekognition = boto3.client("rekognition")

logging.getLogger().setLevel(logging.INFO)
logger = logging.getLogger(__name__)


def lambda_handler(event, context):
    logger.info("received event.")
    for record in event["Records"]:
        # Get the Rekognition job status and job ID from the SNS message
        sns_message = json.loads(record["Sns"]["Message"])
        rekognition_job_status = sns_message["Status"]
        logger.info("rekognition job status is " + rekognition_job_status)
        rekognition_job_id = sns_message["JobId"]
        if rekognition_job_status == "SUCCEEDED":
            try:
                # Get celebrity recognition results based on the job ID
                celebrity_recognition_result = rekognition.get_celebrity_recognition(
                    JobId=rekognition_job_id
                )
                # Process celebrity recognition results
                celebrities = celebrity_recognition_result["Celebrities"]
                for celebrity in celebrities:
                    celebrity_name = celebrity["Celebrity"]["Name"]
                    confidence = celebrity["Celebrity"]["Confidence"]
                    logger.info(
                        f"Celebrity Name: {celebrity_name}, Confidence: {confidence}%"
                    )
            except Exception as e:
                logger.error(f"Error processing celebrity recognition results: {str(e)}")
        elif rekognition_job_status == "FAILED":
            # Handle the case where the Rekognition job failed
            logger.error(f"Rekognition job {rekognition_job_id} failed.")
        else:
            # Handle other job status or ignore it as needed
            logger.info(
                f"Rekognition job {rekognition_job_id} is in status: {rekognition_job_status}"
            )
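The handler above makes a single `get_celebrity_recognition` call, so for longer videos it only sees the first page of results. The sketch below shows one way to follow `NextToken` until the results are exhausted. `collect_celebrities` and the `FakeRekognition` stand-in are hypothetical names introduced here so the loop can be tried locally without AWS; in the real handler the boto3 `rekognition` client would be passed in instead.

```python
def collect_celebrities(client, job_id, page_size=100):
    """Gather every detection for a finished job, following NextToken."""
    detections = []
    kwargs = {"JobId": job_id, "MaxResults": page_size}
    while True:
        page = client.get_celebrity_recognition(**kwargs)
        detections.extend(page.get("Celebrities", []))
        token = page.get("NextToken")
        if not token:
            return detections
        kwargs["NextToken"] = token


class FakeRekognition:
    """Hypothetical stand-in for the boto3 client, for local runs only."""

    def __init__(self, pages):
        self._pages = pages

    def get_celebrity_recognition(self, **kwargs):
        # The fake uses the page index as its token.
        index = int(kwargs.get("NextToken", 0))
        page = {"Celebrities": self._pages[index]}
        if index + 1 < len(self._pages):
            page["NextToken"] = str(index + 1)
        return page
```

For example, `collect_celebrities(FakeRekognition([[...], [...]]), "job-1")` returns the detections of both pages in order.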
3. Deploy and run
3.1 Test `video`
A test video of `trump` is used here.
3.2 Deploy
cdk --require-approval never deploy
3.3 Video analysis
3.3.1 Upload the video
Upload the test video to the `S3 bucket`.
3.3.3 Check the results in `cloudwatch`
Three celebrities were recognized:
- Donald Trump, Confidence: 99.97700500488281%
- Eli Manning, Confidence: 99.83112335205078%
- Darrell Hammond, Confidence: 85.286651611328%
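Video analysis reports detections per timestamp, so the same person can be logged many times in `cloudwatch`. A short summary like the one above can be produced by keeping the highest confidence per name. The helper `best_confidence_by_name` is a hypothetical addition, and the sample values in the usage note are rounded from the log above plus one invented repeat entry.

```python
def best_confidence_by_name(detections):
    """Keep the highest confidence seen for each celebrity name."""
    best = {}
    for detection in detections:
        celebrity = detection["Celebrity"]
        name = celebrity["Name"]
        confidence = celebrity["Confidence"]
        if confidence > best.get(name, 0.0):
            best[name] = confidence
    # Highest confidence first
    return sorted(best.items(), key=lambda item: item[1], reverse=True)


# Hypothetical sample in the shape of the "Celebrities" list
sample = [
    {"Celebrity": {"Name": "Donald Trump", "Confidence": 99.97}},
    {"Celebrity": {"Name": "Eli Manning", "Confidence": 99.83}},
    {"Celebrity": {"Name": "Donald Trump", "Confidence": 98.5}},
    {"Celebrity": {"Name": "Darrell Hammond", "Confidence": 85.28}},
]
for name, confidence in best_confidence_by_name(sample):
    print(f"{name}, Confidence: {confidence}%")
```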
3.3.4 Clean up the stack
- Delete the files in the `S3 bucket`, then remove the stack
cdk destroy