【Kafka】Kafka测试的快速实用的例子
1.简介
在本教程中,我们将快速探索一些基本的高级方法,用于测试使用Kafka构建的微服务应用程序。此外,我们将了解以传统/现有测试方式测试Kafka应用程序的声明方式的优势。
对于此处解释的所有内容,我们可以在本文的“结论”部分找到正在运行的代码示例。
为了使教程简明扼要,我们将仅演示以下方面。
- 制造者测试
- 消费者测试
- 挂钩生产者和消费者测试
- 生成RAW记录和JSON记录
- 使用RAW记录和JSON记录
- 传统的测试挑战。
- 声明式测试的优点(IEEE论文)
- 将REST API测试与Kafka测试相结合
- 在Docker中旋转Kafka - 单节点和多节点
在继续本教程之前,我强烈建议您阅读我们需要了解的Kafka测试最低要求。
有关Kafka流以及如何开发流应用程序的更多详细信息,请访问Confluent开发流媒体应用程序教程。
2.Kafka测试挑战
困难的部分是应用程序逻辑的某些部分或数据库过程不断地为主题生成记录,应用程序的另一部分继续使用记录并根据业务规则不断处理它们。
记录,分区,偏移,异常情况等不断变化,使得难以根据测试内容,何时测试以及如何测试进行思考。
3.测试解决方案方法
我们可以采用端到端的测试方法,该方法将验证生产,消费和DLQ记录以及应用程序处理逻辑。 这将使我们有信心将我们的应用程序发布到更高的环境。
我们可以通过在Docker化容器中创建Kafka或将我们的测试指向我们的Kubernetes-Kafka集群或任何其他微服务基础架构中的任何集成测试环境来实现。
在这里,我们选择一个功能,生成所需的记录并验证,使用预期的记录并验证,以及HTTP REST或SOAP API验证,这有助于保持我们的测试更清洁,更少噪音。
4.生产者测试
当我们为主题创建记录时,我们可以验证来自Kafka经纪人的确认。 此验证采用recordMetadata格式。
例如,将“recordMetaData”可视化为JSON将如下所示:
Response from the broker after a successful "produce".
{"recordMetadata": {"offset": 0,"timestamp": 1547760760264,"serializedKeySize": 13,"serializedValueSize": 34,"topicPartition": {"hash": 749715182,"partition": 0, //<--- To which partition the record landed"topic": "demo-topic"}}}
5.消费者测试
当我们从主题中阅读或使用时,我们可以验证从主题中获取的记录。 在这里,我们也可以验证/断言一些元数据,但大多数时候您可能只需要处理记录(而不是元数据)。
例如,有时我们只能验证记录的数量,即只有大小,而不是实际记录
例如,将获取的“记录”可视化为JSON将如下所示:
Records fetched after a successful "consume".{"records": [{"topic": "demo-topic","key": "1547792460796","value": "Hello World 1"},{// ...}]}
包含元数据信息的完整记录看起来就像我们在下面得到的那样,如果我们有测试要求,我们也可以验证/断言。
The fetched records with the metadata from the broker.{"records": [{"topic": "demo-topic","partition": 0,"offset": 3,"key": "1547792460796", //<---- Record key"value": "Hello World", //<---- Record value}]}
6.生产者和消费者测试
在相同的端到端测试中,我们可以对相同的记录执行以下两个步骤:
步骤1:
生成主题“demo-topic”并验证来自代理的receivedrecordMetadata。
例如,使用“key”生成记录:“1234”,“value”:“Hello World”
第2步:
从同一主题“demo-topic”中获取并验证记录。
断言响应中存在相同的记录,即“密钥”:“1234”,“值”:“Hello World”。
如果在我们开始消费之前将它们生成到同一主题,我们可能已经消耗了多条记录。
7.传统测试方式的挑战
第1点
首先,传统风格没有任何问题。但是,对于Kafka经纪人来说,它有一个陡峭的学习曲线来处理。
例如,当我们处理经纪人时,我们需要彻底了解Kafka客户端API,例如: Key-SerDe,Value-SerDe,Time-Outs,同时记录Poolings,commitSyncs,recordTypes等,以及API级别的更多内容。
对于功能测试,我们并不需要在API级别了解这些概念。
第2点
我们的测试代码与客户端API代码紧密结合。这意味着我们在维护测试套件以及测试框架的代码时引入了许多挑战。
8.声明式测试的优点
为了绘制一个明喻,“docker-compose”作品的有趣方式被称为“声明方式”。我们告诉Docker Compose框架(在YAML文件中)在某些端口上启动某些东西,将某些服务链接到其他服务等,框架为我们完成了一些事情。我们也可以用类似的声明方式驱动我们的测试,我们将在下一节中看到。
这有多整洁?试想一下,如果我们必须为相同的重复性任务编写代码/ shell脚本,那将是多么麻烦。
第1点
在声明式样式中,我们可以完全跳过处理代理的API级别,并且只关注测试场景。但是,我们仍然可以灵活地使用Kafka Client API并为其添加自己的风格。
第2点
这有助于发现更多缺陷,因为我们不会花时间编写代码,而是花更多的时间编写测试并覆盖更多的业务场景/用户旅程。
怎么样?
在这里,我们告诉测试使用Kafka-Topic,这是我们的“终点”或“url”
即“url”:“kafka-topic:demo-topic”
接下来,我们告诉测试使用操作“生产”
即“操作”:“生产”
接下来,我们需要将记录发送到请求有效负载:
"request": {"records": [{"key": "KEY-1234","value": "Hello World"}]}
然后,我们告诉测试我们期望响应“status”返回为“Ok”和来自代理的一些记录元数据,即非空值。这是我们测试的“断言”部分。
"assertions": {"status" : "Ok","recordMetadata" : "$NOT.NULL"}
注意:我们甚至可以立即声明所有'recordMetadata,我们将在后面的部分中看到。现在,让我们保持简单并继续。
完成后,我们的完整测试将如下所示:
{"name": "produce_a_record","url": "kafka-topic:demo-topic","operation": "produce","request": {"recordType" : "RAW","records": [{"key": 101,"value": "Hello World"}]},"assertions": {"status": "Ok","recordMetadata": "$NOT.NULL"}}
就是这样。我们完成了测试用例并准备运行。
现在,看看上面的测试,任何人都可以轻松找出正在测试的场景。
注意:
- 我们使用客户端API消除了编码麻烦,以处理Kafka经纪人。
- 我们通过遍历其对象路径,解析请求 - 有效负载,解析响应 - 有效负载等,消除了断言每个字段键/值的编码麻烦。
同时,我们使用框架的JSON比较功能立即断言结果,因此,使测试更容易和更清洁。
我们在测试时逃脱了两个主要的麻烦。
并且,字段的顺序在这里无关紧要。以下代码也是正确的(字段顺序交换)。
"assertions": {"recordMetadata": "$NOT.NULL""status": "Ok",}
9.使用JUnit运行单个测试
这非常容易。 我们只需要将JUnit @Test方法指向JSON文件即可。 真的是这样的。
@TargetEnv("kafka_servers/kafka_test_server.properties")@RunWith(ZeroCodeUnitRunner.class)public class KafkaProduceTest {@Test@JsonTestCase("kafka/produce/test_kafka_produce.json")public void testProduce() throws Exception {// No code is needed here. What?// Where are the 'assertions' gone ?}}
在上面的代码中:
'test_kafka_produce.json'是测试用例,其中包含我们之前讨论过的JSON步骤。
'kafka_test_server.properties'包含“Broker”详细信息和生产者/消费者配置。
'@RunWith(ZeroCodeUnitRunner.class)'是运行测试的JUnit自定义运行器。
此外,我们可以使用Suite runner或Package runner来运行整个测试套件。
请访问这些RAW和JSON示例和说明。
10.写我们的第一个制造者测试
我们在上面的部分中学习了如何生成记录并断言代理响应/确认。
但我们不必止步于此。 我们可以进一步要求我们的测试逐字段断言“recordMetadata”以验证它是否写入了正确“主题”的正确“分区”等等,如下所示。
"assertions": {"status": "Ok","recordMetadata": {"offset": 0, //<--- This is the record 'offset' in the partition"topicPartition": {"partition": 0, //<--- This is the partition number"topic": "demo-topic" //<--- This is the topic name}}}
而已。在上面的“断言”块中,我们完成了预期值与实际值的比较。
注意:比较和断言立即完成。 “断言”块立即与从Kafka Broker收到的实际“status”和“recordMetadata”进行比较。字段的顺序在这里并不重要。如果字段值或结构不匹配,则测试仅失败。
11.编写我们的首次消费者测试
同样,要编写“消费者”测试,我们需要知道:
主题名称'demo-topic'是我们的“终点”,a.k.a。“url”:“url”:“kafka-topic:demo-topic”。
操作,即“消费”:“操作”:“消费”。
在使用主题消息时,我们需要发送如下信息:“request”:{}
上述“请求”意味着什么也不做,只是消费而不做“提交”。
或者我们可以在测试中提及在消费或消费记录后做某些事情。
"request": {"consumerLocalConfigs": {"commitSync": true,"maxNoOfRetryPollsOrTimeouts": 3}}
“commitSync”:true:在这里,我们告诉测试在使用消息之后执行`commitSync`,这意味着,当你下次`poll`时它不再读取消息。它只会在有关主题的情况下阅读新消息。
“maxNoOfRetryPollsOrTimeouts”:3:在这里,我们告诉测试最多显示轮询三次,然后停止轮询。如果我们有更多记录,我们可以将其设置为更大的值。默认值为1。
“pollingTime”:500:在这里,我们告诉测试每次轮询时轮询500毫秒。如果跳过此标志,则默认值为100毫秒。
访问此页面以获取所有可配置密钥 - 来自源代码的ConsumerLocalConfigs。
访问HelloWorld Kafka示例仓库,在家中试用。
注意:这些配置值可以在属性文件中全局设置为所有测试,这意味着它将应用于我们的测试包中的所有测试。此外,我们可以覆盖套件内特定测试或测试的任何配置。因此,它为我们提供了覆盖所有测试场景的灵活性。
好吧,设置这些属性并不是什么大问题,我们必须这样做才能将它们外化。因此,维护起来越简单,对我们来说就越好!但我们必须了解其中的内容。
我们将在接下来的章节中讨论这个问题。
12.将REST API测试与Kafka测试相结合
大多数时候,在微服务架构中,我们使用RESTful服务,SOAP服务(可能是遗留的)和Kafka来构建应用程序。
因此,我们需要涵盖端到端测试场景中的所有API合同验证,包括Kafka。
但这并不是什么大问题,因为毕竟这里没有什么变化,除了我们只是将我们的“url”指向我们的REST或SOAP服务的HTTP端点,然后相应地操纵有效负载/断言块。真的是这样的。
请访问将Kafka测试与REST API测试结合使用,以获得完整的逐步方法。
如果我们有一个用例:
步骤1:Kafka调用 - 我们向“地址主题”发送ID为“id-lon-123”的“地址”记录,该记录最终被处理并写入“地址”数据库(例如Postgres或Hadoop)。然后我们断言经纪人确认。
步骤2:REST调用 - 使用“/ api / v1 / addresses / id-lon-123”查询(GET)“地址”REST API并断言响应。
相应的测试用例如下所示。
{"scenarioName": "Kafka and REST api validation example","steps": [{"name": "produce_to_kafka","url": "kafka-topic:people-address","operation": "produce","request": {"recordType" : "JSON","records": [{"key": "id-lon-123","value": {"id": "id-lon-123","postCode": "UK-BA9"}}]},"assertions": {"status": "Ok","recordMetadata" : "$NOT.NULL"}},{"name": "verify_updated_address","url": "/api/v1/addresses/${$.produce_to_kafka.request.records[0].value.id}","operation": "GET","request": {"headers": {"X-GOVT-API-KEY": "top-key-only-known-to-secu-cleared"}},"assertions": {"status": 200,"value": {"id": "${$.produce_to_kafka.request.records[0].value.id}","postCode": "${$.produce_to_kafka.request.records[0].value.postcode}"}}}]}
易于阅读!容易写!
字段通过JSON路径重用值而不是硬编码。这是一个很好的节省时间!
13.生成RAW记录与JSON记录
在RAW的情况下,我们只是静静地说:
“recordType”:“RAW”,
然后,我们的测试用例如下所示:
{"name": "produce_a_record","url": "kafka-topic:demo-topic","operation": "produce","request": {"recordType" : "RAW","records": [{"key": 101,"value": "Hello World"}]},"assertions": {"status": "Ok","recordMetadata": "$NOT.NULL"}}
2.对于JSON记录,我们以相同的方式提及它:
“recordType”:“JSON”
而且,我们的测试用例如下所示:
{"name": "produce_a_record","url": "kafka-topic:demo-topic","operation": "produce","request": {"recordType" : "JSON","records": [{"key": 101,"value": {"name" : "Jey"}}]},"assertions": {"status": "Ok","recordMetadata": "$NOT.NULL"}}
注意:“value”部分这次有一个JSON记录。
14.Kafka在Docker容器中
理想情况下,本节应该在开头。但是,只是在不知道结果的情况下运行docker-compose文件有什么意义呢?我们可以在这里找到让每个人的生活变得轻松!
我们可以在下面找到docker-compose文件和分步说明。
Docker中的单节点Kafka
Docker中的多节点Kafka集群
15.结论
在本教程中,我们以声明的方式学习了Kafka测试的一些基本方面。此外,我们学习了如何轻松地测试涉及Kafka和REST的微服务。
使用这种方法,我们已经测试并验证了集群Kafka数据管道到Hadoop以及部署在Kubernetes Orchestrated pod中的Http REST / SOAP API。我们发现这种方法非常非常直接并且降低了复杂性,以便将工件维护和推广到更高的环境。
通过这种方法,我们能够完全清晰地涵盖了许多测试场景,并且在开发周期的早期阶段发现了更多缺陷,即使没有编写任何测试代码也是如此。这有助于我们以简单,干净的方式构建和维护我们的回归包。
下面给出了repo GitHub(在家尝试)的这些示例的完整源代码。
要运行任何测试,我们可以直接导航到'src / test / java'下相应的JUnit @Test。在点击任何Junit测试之前,我们需要使用kafka调出Docker。
使用“kafka-schema-registry.yml(参见Wiki)”可以运行所有测试。
- Produce Tests (RAW and JSON)
- Consume Tests (RAW and JSON)
- Produce Records Directly From File
- Consume Records and Dump to File
如果您发现此页面对测试Kafka和HTTP API很有帮助,请在GitHub上留下“明星”!
快乐的测试!
原文:https://dzone.com/articles/a-quick-and-practical-example-of-kafka-testing
- 84 次浏览