TL;DR
接下来介绍如何在您的Spring Boot应用程序中使用Apache Kafka,这将展示如何开始使用Spring Boot和Apache Kafka?,这里我将演示如何在您的Spring Boot应用程序中启用合流模式注册表和Avro序列化格式。
使用Avro模式,可以在微服务应用程序之间建立数据协定。
完整的源代码可以在GitHub上下载。
Version | Date | Date |
v1.0 | 7/31/19 | Initial revision |
先决条件
- Java 8+
- Confluent Platform 5.3 or newer
- Optional: Confluent Cloud account
我们开始写吧
一如既往,我们将从generating a project starter 开始。在这个启动程序中,您应该启用“Spring for Apache Kafka”和“Spring Web starter”
Figure 1. Generate a new project with Spring Initializer.
<project> <dependencies> <!-- other dependencies --> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-schema-registry-client</artifactId> (1) <version>5.3.0</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> (2) <version>1.8.2</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> (3) <version>5.2.1</version> </dependency> <dependency> <groupId>io.confluent</groupId> <artifactId>kafka-streams-avro-serde</artifactId> <version>5.3.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <repositories> <!-- other maven repositories the project --> <repository> <id>confluent</id> (4) <url>https://packages.confluent.io/maven/</url> </repository> </repositories> <plugins> <!-- other maven plugins in the project --> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.8.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>src/main/resources/avro</sourceDirectory> (5) <outputDirectory>${project.build.directory}/generated-sources</outputDirectory> <stringType>String</stringType> </configuration> </execution> </executions> </plugin> </plugins> </project>
- Confluent Schema Registry client
- Avro dependency
- Avro SerDes
- Confluent Maven repository
- Source directory where you put your Avro files and store generated Java POJOs
Spring Boot应用程序的体系结构
您的应用程序将包括以下组件:
- use.avsc:一个Avro文件
- SpringAvroApplication.java:应用程序的起点。这个类还包括应用程序正在使用的新主题的配置。
- Producer.java:封装Kafka Producer的组件
- Consumer.java:来自Kafka主题的消息的侦听器
- java:一个RESTful控制器,它接受HTTP命令以便在Kafka主题中发布消息
创建用户Avro文件
{ "namespace": "io.confluent.developer", (1) "type": "record", "name": "User", "fields": [ { "name": "name", "type": "string", "avro.java.string": "String" }, { "name": "age", "type": "int" } ] }
avro maven插件将在io.confluent.developer包中生成用户POJO。此POJO具有名称和年龄属性。
创建Spring Boot应用程序类
@SpringBootApplication public class SpringAvroApplication { @Value("${topic.name}") (1) private String topicName; @Value("${topic.partitions-num}") private Integer partitions; @Value("${topic.replication-factor}") private short replicationFactor; @Bean NewTopic moviesTopic() { (2) return new NewTopic(topicName, partitions, replicationFactor); } public static void main(String[] args) { SpringApplication.run(SpringAvroApplication.class, args); } }
这些是Spring从application.yaml文件中注入的主题参数。
Spring Boot基于提供的配置创建了一个新的Kafka主题。作为应用程序开发人员,您负责创建主题,而不是依赖于自动创建主题,这在生产环境中应该是错误的。
创建生产者组件
@Service @CommonsLog(topic = "Producer Logger") public class Producer { @Value("${topic.name}") (1) private String TOPIC; private final KafkaTemplate<String, User> kafkaTemplate; @Autowired public Producer(KafkaTemplate<String, User> kafkaTemplate) { (2) this.kafkaTemplate = kafkaTemplate; } void sendMessage(User user) { this.kafkaTemplate.send(this.TOPIC, user.getName(), user); (3) log.info(String.format("Produced user -> %s", user)); } }
将从application.yaml中注入主题名。
Spring将使用application.yaml中提供的属性初始化KafkaTemplate。
我们将使用 User 作为主键向主题发送消息。
Spring在应用程序启动期间实例化所有这些组件,应用程序准备好通过REST端点接收消息。默认的HTTP端口是9080,可以在application.yaml配置文件中进行更改。
创建消费者组件
@Service @CommonsLog(topic = "Producer Logger") public class Producer { @Value("${topic.name}") (1) private String TOPIC; private final KafkaTemplate<String, User> kafkaTemplate; @Autowired public Producer(KafkaTemplate<String, User> kafkaTemplate) { (2) this.kafkaTemplate = kafkaTemplate; } void sendMessage(User user) { this.kafkaTemplate.send(this.TOPIC, user.getName(), user); (3) log.info(String.format("Produced user -> %s", user)); } }
主题名称将从application.yaml中注入。
使用@KafkaListener注释,spring kafka框架将实例化一个新的使用者。
创建KafkaController组件
@RestController @RequestMapping(value = "/user") (1) public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { (2) this.producer = producer; } @PostMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) { this.producer.sendMessage(new User(name, age)); (3) } }
kafkaontroller映射到/user HTTP端点。
Spring注入producer组件。
当一个新请求到达/user/publish端点时,生产者将其发送给Kafka。
运行示例
先决条件
提示:在本指南中,我假设您已经安装了Java开发工具包(JDK)。如果你没有,我强烈建议使用SDKMAN!安装它。
您还需要在本地安装合流平台5.3或更新版本。如果你还没有它,跟随汇合平台快速启动。请确保也安装Confluent CLI(请参阅快速入门本节中的步骤4)。
启动Kafka和Schema注册表
confluent local start schema-registry
Confluent CLI提供用于管理本地Confluent平台安装的本地模式。合流CLI以正确的顺序启动每个组件。
您应该在终端中看到类似的输出。
构建和运行Spring Boot应用程序
在examples目录中,运行./mvnw clean package编译并生成一个可运行的JAR。之后,可以运行以下命令:
java-jar目标/kafka-avro-0.0.1-SNAPSHOT.jar
测试生产者/消费者REST服务
为了简单起见,我喜欢使用curl命令,但您可以使用任何REST客户端(如Postman或IntelliJ IDEA中的REST客户端):
curl -X POST -d “name=vik&age=33” http://localhost:9080/user/publish
2019-06-06 22:52:59.485 INFO 28910 --- [nio-9080-exec-1] Producer Logger : Produced user -> {"name": "vik", "age": 33} 2019-06-06 22:52:59.559 INFO 28910 --- [ntainer#0-0-C-1] Consumer Logger : Consumed message -> {"name": "vik", "age": 33}
使用Confluent Cloud运行应用程序
要将此演示应用程序与confluent cloud一起使用,您将需要托管架构注册表的端点和一个API键/密钥。一旦您选择了一个环境,就可以很容易地从confluent的cloud UI中检索到这两者。
必须至少创建一个Kafka群集才能访问托管架构注册表。一旦选择Schema Registry选项,就可以检索端点并创建新的API/secret。
confluent cloud 配置的示例可以在application-Cloud.yaml中找到:
topic: name: users partitions-num: 6 replication-factor: 3 server: port: 9080 spring: kafka: bootstrap-servers: - mybootstrap.confluent.cloud:9092 (1) properties: # CCloud broker connection parameters ssl.endpoint.identification.algorithm: https sasl.mechanism: PLAIN request.timeout.ms: 20000 retry.backoff.ms: 500 sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="ccloud_key" password="ccloud_secret"; (2) security.protocol: SASL_SSL # CCloud Schema Registry Connection parameter schema.registry.url: https://schema-registry.aws.confluent.cloud (3) basic.auth.credentials.source: USER_INFO (4) schema.registry.basic.auth.user.info: sr_ccloud_key:sr_ccloud_key (5) consumer: group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer template: default-topic: logging: level: root: info
- Cloud bootstrap server
- Broker key and secret
- Confluent Cloud Schema Registry URL
- Schema Registry authentication configuration
- Cloud Schema Registry key and secret
- Note: Make sure to replace the dummy login and password information with actual values from your Confluent Cloud account.
要在云模式下运行此应用程序,请激活cloud Spring配置文件。在本例中,Spring Boot将获取application-cloud.yaml配置文件,该文件包含到合流云中数据的连接。
java -jar -Dspring.profiles.active=cloud target/kafka-avro-0.0.1-SNAPSHOT.jar
原文:https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/
讨论:请加入知识星球或者微信圈子【首席架构师圈】
- 登录 发表评论
- 182 次浏览
最新内容
- 18 hours ago
- 19 hours ago
- 3 days ago
- 5 days 22 hours ago
- 5 days 22 hours ago
- 5 days 22 hours ago
- 1 week ago
- 1 week ago
- 1 month 1 week ago
- 1 month 2 weeks ago