aws(学习笔记第四十三课) s3_sns_sqs_lambda_chain

文章目录

  • aws(学习笔记第四十三课) s3_sns_sqs_lambda_chain
  • 学习内容:
    • 1. 整体架构
      • 1.1 代码链接
      • 1.2 整体架构
      • 1.3 测试代码需要的修改
        • 1.3.1 `unit test`代码中引入`stack`的修改
        • 1.3.2 `test_outputs_created`代码中把错误的去掉
    • 2. 代码解析
      • 2.1 生成`dead_letter_queue`死信队列和`upload_queue`文件上传队列
      • 2.3 创建`sqs topic`并对其进行订阅`subscribe`
      • 2.4 创建`S3 bucket`
      • 2.5 为`S3 bucket`绑定`SNS Topic`
      • 2.6 创建`lambda`函数
      • 2.7 对`upload_queue`绑定`lambda`函数
      • 2.8 输出`cdk`的`output`
        • 2.8.1 `UploadFileToS3Example`
        • 2.8.2 `UploadSqsQueueUrl`
        • 2.8.3 `LambdaFunctionName`
        • 2.8.4 `LambdaFunctionLogGroupName`
    • 3 执行`cdk`
      • 3.1 执行`cdk`
      • 3.2 执行`cdk`的结果
      • 3.3 `output`检查
      • 3.4 向`S3 bucket`上传`csv`文件
    • 4 使用`unit test`进行`aws cdk`的测试
      • 4.1 `unit test`代码
        • 4.1.1 创建`mock stack`
        • 4.1.2 创建`test`
      • 4.2 进行`unit test`

aws(学习笔记第四十三课) s3_sns_sqs_lambda_chain

  • 使用lambda监视S3 upload event,并使用unit test进行aws cdk的测试

学习内容:

  • 使用dead_letter_queue死信队列
  • 使用sqs的队列
  • 使用unit test进行aws cdk的测试

1. 整体架构

1.1 代码链接

代码连接(s3_sns_sqs_lambda_chain)

1.2 整体架构

在这里插入图片描述

1.3 测试代码需要的修改

1.3.1 unit test代码中引入stack的修改
import aws_cdk as cdk
import pytestfrom s3_sns_sqs_lambda_chain.s3_sns_sqs_lambda_chain_stack import S3SnsSqsLambdaChainStack 

s3_sns_sqs_lambda_chain这里如果不加上,那么会报错。

1.3.2 test_outputs_created代码中把错误的去掉
def test_outputs_created(template):"""Test for CloudFormation Outputs- Upload File To S3 Instructions- Queue Url- Lambda Function Name- Lambda Function Log Group Name"""# template.hastemplate.has_output("UploadFileToS3Example", {})template.has_output("UploadSqsQueueUrl", {})template.has_output("LambdaFunctionName", {})template.has_output("LambdaFunctionLogGroupName", {})

template.has这里要去掉,如果不去掉就会执行错误。

2. 代码解析

2.1 生成dead_letter_queue死信队列和upload_queue文件上传队列

# Note: A dead-letter queue is optional but it helps capture any failed messagesdlq = sqs.Queue(self,id="dead_letter_queue_id",retention_period=Duration.days(7))dead_letter_queue = sqs.DeadLetterQueue(max_receive_count=1,queue=dlq)upload_queue = sqs.Queue(self,id="sample_queue_id",dead_letter_queue=dead_letter_queue,visibility_timeout = Duration.seconds(LAMBDA_TIMEOUT * 6))

在这里插入图片描述

2.3 创建sqs topic并对其进行订阅subscribe

sqs_subscription = sns_subs.SqsSubscription(upload_queue,raw_message_delivery=True)upload_event_topic = sns.Topic(self,id="sample_sns_topic_id")# This binds the SNS Topic to the SQS Queueupload_event_topic.add_subscription(sqs_subscription)

在这里插入图片描述

2.4 创建S3 bucket

    # Note: Lifecycle Rules are optional but are included here to keep costs#       low by cleaning up old files or moving them to lower cost storage optionss3_bucket = s3.Bucket(self,id="sample_bucket_id",block_public_access=s3.BlockPublicAccess.BLOCK_ALL,versioned=True,lifecycle_rules=[s3.LifecycleRule(enabled=True,expiration=Duration.days(365),transitions=[s3.Transition(storage_class=s3.StorageClass.INFREQUENT_ACCESS,transition_after=Duration.days(30)),s3.Transition(storage_class=s3.StorageClass.GLACIER,transition_after=Duration.days(90)),])])

这里考虑成本,所以30days之后s3.StorageClass.INFREQUENT_ACCESS,所以90days之后s3.StorageClass.GLACIER

2.5 为S3 bucket绑定SNS Topic

    # Note: If you don't specify a filter all uploads will trigger an event.#       Also, modifying the event type will handle other object operations# This binds the S3 bucket to the SNS Topics3_bucket.add_event_notification(s3.EventType.OBJECT_CREATED_PUT,s3n.SnsDestination(upload_event_topic),s3.NotificationKeyFilter(prefix="uploads", suffix=".csv"))

在这里插入图片描述

2.6 创建lambda函数

function = _lambda.Function(self, "lambda_function",runtime=_lambda.Runtime.PYTHON_3_9,handler="lambda_function.handler",code=_lambda.Code.from_asset(path=lambda_dir),timeout = Duration.seconds(LAMBDA_TIMEOUT))

lambda函数的实现:

def handler(event, context):# output event to logsprint(event)return {'statusCode': 200,'body': event}

在这里插入图片描述

2.7 对upload_queue绑定lambda函数

# This binds the lambda to the SQS Queueinvoke_event_source = lambda_events.SqsEventSource(upload_queue)function.add_event_source(invoke_event_source)

在这里插入图片描述

2.8 输出cdkoutput

2.8.1 UploadFileToS3Example

这里创建了一个CfnOutput,用于输出上传本地文件到S3 bucket上的示例command

 CfnOutput(self,"UploadFileToS3Example",value="aws s3 cp <local-path-to-file> s3://{}/".format(s3_bucket.bucket_name),description="Upload a file to S3 (using AWS CLI) to trigger the SQS chain",)
2.8.2 UploadSqsQueueUrl

这里输出一个Output,用于表示upload_queuequeue_url

CfnOutput(self,"UploadSqsQueueUrl",value=upload_queue.queue_url,description="Link to the SQS Queue triggered on S3 uploads",)
2.8.3 LambdaFunctionName

这里输出一个Output,用于表示lambda的函数名。

    CfnOutput(self,"LambdaFunctionName",value=function.function_name,)
2.8.4 LambdaFunctionLogGroupName

这里输出一个Output,用于表示lambda函数的log输出的log group

    CfnOutput(self,"LambdaFunctionLogGroupName",value=function.log_group.log_group_name,)

3 执行cdk

3.1 执行cdk

python -m venv .venv
source ./.venv/Scripts/activate
pip install -r requirement.txt
cdk --require-approval never deploy

3.2 执行cdk的结果

在这里插入图片描述

3.3 output检查

在这里插入图片描述

3.4 向S3 bucket上传csv文件

在这里插入图片描述

aws s3 cp ./uploads_bbb.csv s3://s3-sns-sqs-lambda-stack-samplebucketideae240bf-l2je2slhprg2/

在这里插入图片描述
这里,每次上传csv文件都会触发lambda函数调用。

4 使用unit test进行aws cdk的测试

4.1 unit test代码

4.1.1 创建mock stack
@pytest.fixture
def template():"""Generate a mock stack that embeds the orchestrator construct for testing"""script_dir = pathlib.Path(__file__).parentlambda_dir = str(script_dir.joinpath("..", "..", "lambda"))app = cdk.App()stack = S3SnsSqsLambdaChainStack(app,"s3-sns-sqs-lambda-stack",lambda_dir=lambda_dir)return cdk.assertions.Template.from_stack(stack)
4.1.2 创建test

测试topic关联

def test_sns_topic_created(template):"""Test for SNS Topic and Subscription: S3 Uploadvent Notification"""template.resource_count_is("AWS::SNS::Subscription", 1)template.resource_count_is("AWS::SNS::Topic", 1)template.resource_count_is("AWS::SNS::TopicPolicy", 1)

测试queue关联

def test_sqs_queue_created(template):"""Test for SQS Queue:- Queue to process uploads- Dead-letter Queue"""template.resource_count_is("AWS::SQS::Queue", 2)template.resource_count_is("AWS::SQS::QueuePolicy", 1)

测试lambda关联

def test_lambdas_created(template):"""Test for Lambdas created:- Sample Lambda- Bucket Notification Handler (automatically provisioned)- Log Retention (automatically provisioned)"""template.resource_count_is("AWS::Lambda::Function", 3)template.resource_count_is("AWS::Lambda::EventSourceMapping", 1)

测试output关联

def test_outputs_created(template):"""Test for CloudFormation Outputs- Upload File To S3 Instructions- Queue Url- Lambda Function Name- Lambda Function Log Group Name"""template.has_output("UploadFileToS3Example", {})template.has_output("UploadSqsQueueUrl", {})template.has_output("LambdaFunctionName", {})template.has_output("LambdaFunctionLogGroupName", {})

4.2 进行unit test

  • 进行依赖包的安装
pip install -r requirements-dev.txt

在这里插入图片描述

  • 进行unit test
pytest

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/82889.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python训练营打卡Day43

kaggle找到一个图像数据集&#xff0c;用cnn网络进行训练并且用grad-cam做可视化 进阶&#xff1a;并拆分成多个文件 config.py import os# 基础配置类 class Config:def __init__(self):# Kaggle配置self.kaggle_username "" # Kaggle用户名self.kaggle_key &quo…

hive 3集成Iceberg 1.7中的Java版本问题

hive 3.1.3 集成iceberg 1.7.2创建Iceberg表报错如下&#xff1a; Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/iceberg/mr/hive/HiveIcebergStorageHandler has been compiled by a more recent version of the Java Runtime …

文本切块技术(Splitter)

为什么要分块&#xff1f; 将长文本分解成适当大小的片段&#xff0c;以便于嵌入、索引和存储&#xff0c;并提高检索的精确度。 用ChunkViz工具可视化分块 在线使用 ChunkViz github https://github.com/gkamradt/ChunkViz 如何确定大模型所能接受的最长上下文 可以从…

C++:用 libcurl 发送一封带有附件的邮件

编写mingw C 程序&#xff0c;用 libcurl 发送一封带有附件的邮件 下面是一个使用 MinGW 编译的 C 程序&#xff0c;使用 libcurl 发送带附件的邮件。这个程序完全通过代码实现 SMTP 邮件发送&#xff0c;不依赖外部邮件客户端&#xff1a; // send_email.cpp #include <i…

tensorflow image_dataset_from_directory 训练数据集构建

以数据集 https://www.kaggle.com/datasets/vipoooool/new-plant-diseases-dataset 为例 目录结构 训练图像数据集要求&#xff1a; 主目录下包含多个子目录&#xff0c;每个子目录代表一个类别。每个子目录中存储属于该类别的图像文件。 例如 main_directory/ ...cat/ ...…

遨游Spring AI:第一盘菜Hello World

Spring AI的正式版已经发布了&#xff0c;很显然&#xff0c;接下来我们要做的事情就是写一个Hello World。 总体思路就是在本地搭建一个简单的大模型&#xff0c;然后编写Spring AI代码与模型进行交互。 分五步&#xff1a; 1. 安装Ollama&#xff1b; 2. 安装DeepSeek&…

华为云Flexus+DeepSeek征文|基于华为云Flexus X和DeepSeek-R1打造个人知识库问答系统

目录 前言 1 快速部署&#xff1a;一键搭建Dify平台 1.1 部署流程详解 1.2 初始配置与登录 2 构建专属知识库 2.1 进入知识库模块并创建新库 2.2 选择数据源导入内容 2.3 上传并识别多种文档格式 2.4 文本处理与索引构建 2.5 保存并完成知识库创建 3接入ModelArts S…

Java优化:双重for循环

在工作中&#xff0c;经常性的会出现在两张表中查找相同ID的数据&#xff0c;许多开发者会使用两层for循环嵌套&#xff0c;虽然实现功能没有问题&#xff0c;但是效率极低&#xff0c;一下是一个简单的优化过程&#xff0c;代码耗时凑从26856ms优化到了748ms。 功能场景 有两…

Prompt Tuning:生成的模型文件有什么构成

一、为什么Prompt Tuning会生成模型文件? 1. Prompt Tuning的本质:优化可训练的「提示参数」 核心逻辑:Prompt Tuning(提示调优)是一种轻量级的微调技术,仅优化模型输入层的提示向量(Prompt Embedding)或少量额外参数,而非更新整个预训练模型的权重。生成模型文件的原…

ARM SMMUv3简介(一)

1.概述 SMMU&#xff08;System Memory Management Unit&#xff0c;系统内存管理单元&#xff09;是ARM架构中用于管理设备访问系统内存的硬件模块。SMMU和MMU的功能类似&#xff0c;都是将虚拟地址转换成物理地址&#xff0c;不同的是MMU转换的虚拟地址来自CPU&#xff0c;S…

在 Windows 系统上运行 Docker 容器中的 Ubuntu 镜像并显示 GUI

在 Windows 上安装一个 X Server&#xff08;如 VcXsrv 或 X410&#xff09;&#xff0c;Ubuntu 容器通过网络将图形界面转发到 Windows。 步骤&#xff1a; 安装 X Server&#xff1a; 推荐使用VcXsrv&#xff0c;免费开源。 安装后运行 XLaunch&#xff0c;选择&#xff1…

Vue3学习(4)- computed的使用

1. 简述与使用 作用&#xff1a;computed 用于基于响应式数据派生出新值&#xff0c;其值会自动缓存并在依赖变化时更新。 ​缓存机制​&#xff1a;依赖未变化时直接返回缓存值&#xff0c;避免重复计算&#xff08;通过 _dirty 标志位实现&#xff09;。​响应式更新​&…

【HarmonyOS 5】出行导航开发实践介绍以及详细案例

以下是 ‌HarmonyOS 5‌ 出行导航的核心能力详解&#xff08;无代码版&#xff09;&#xff0c;聚焦智能交互、多端协同与场景化创新&#xff1a; 一、交互革新&#xff1a;从被动响应到主动服务 ‌意图驱动导航‌ ‌自然语义理解‌&#xff1a;用户通过语音指令&#xff08;如…

csrf攻击学习

原理 csrf又称跨站伪造请求攻击&#xff0c;现代网站利用Cookie、Session 或 Token 等机制识别用户身份&#xff0c;一旦用户访问某个网站&#xff0c;浏览器在之后请求会自动带上这些信息来识别用户身份。用户在网站进行请求或者操作时服务器会给出对应的内容&#xff0c;比如…

深入剖析MySQL锁机制,多事务并发场景锁竞争

一、隐藏字段对 InnoDB 的行锁&#xff08;Record Lock&#xff09;与间隙锁&#xff08;Gap Lock&#xff09;的影响 1. 隐藏字段与锁的三大核心影响 类型影响维度描述DB_TRX_IDMVCC 可见性控制决定是否读取当前版本&#xff0c;或在加锁时避开不可见版本&#xff08;影响加锁…

以SMMUv2为例,使用Trace32可视化操作SMMU的常用命令详解

Trace32支持一系列的SMMU命令&#xff0c;可以帮助用户更好地配置、查看和分析SMMU。换句话说&#xff0c;就是让SMMU的配置变得可视化。 在添加SMMU实例之前&#xff0c;需要选择一个CPU来激活该SMMU实例的相关命令。Trace32让SMMU的配置可视化的本质是&#xff0c;操纵CPU读取…

将数据库表导出为C#实体对象

数据库方式 use 数据库;declare TableName sysname 表名 declare Result varchar(max) /// <summary> /// TableName /// </summary> public class TableName {select Result Result /// <summary>/// CONVERT(NVARCHAR(500), ISNULL(ColN…

CSS 预处理器与工具

目录 CSS 预处理器与工具1. Less主要特性 2. Sass/SCSS主要特性 3. Tailwind CSS主要特性 4. 其他工具PostCSSCSS Modules 5. 选择建议 CSS 预处理器与工具 1. Less Less 是一个 CSS 预处理器&#xff0c;它扩展了 CSS 语言&#xff0c;添加了变量、嵌套规则、混合&#xff0…

this.$set() 的用法详解(Vue响应式系统相关)

1. 什么是 this.$set()&#xff1f; this.$set(target, key, value) 是 Vue 2 中提供的一个方法&#xff0c;用于向响应式对象中动态添加属性&#xff0c;确保新加的属性同样是响应式的。 2. 为什么需要它&#xff1f; Vue 2 的响应式系统基于 Object.defineProperty&#…

【HarmonyOS Next之旅】DevEco Studio使用指南(三十)

目录 1 -> 部署云侧工程 2 -> 通过CloudDev面板获取云开发资源支持 3 -> 通用云开发模板 3.1 -> 适用范围 3.2 -> 效果图 4 -> 总结 1 -> 部署云侧工程 可以选择在云函数和云数据库全部开发完成后&#xff0c;将整个云工程资源统一部署到AGC云端。…