TL;DR
接下来介绍如何在您的Spring Boot应用程序中使用Apache Kafka,这将展示如何开始使用Spring Boot和Apache Kafka?,这里我将演示如何在您的Spring Boot应用程序中启用合流模式注册表和Avro序列化格式。
使用Avro模式,可以在微服务应用程序之间建立数据协定。
完整的源代码可以在GitHub上下载。
| Version | Date | Date |
| v1.0 | 7/31/19 | Initial revision |
先决条件
- Java 8+
- Confluent Platform 5.3 or newer
- Optional: Confluent Cloud account
我们开始写吧
一如既往,我们将从generating a project starter 开始。在这个启动程序中,您应该启用“Spring for Apache Kafka”和“Spring Web starter”

Figure 1. Generate a new project with Spring Initializer.
<project>
<dependencies>
<!-- other dependencies -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId> (1)
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId> (2)
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId> (3)
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>5.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
<!-- other maven repositories the project -->
<repository>
<id>confluent</id> (4)
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<plugins>
<!-- other maven plugins in the project -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory> (5)
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</project>
- Confluent Schema Registry client
- Avro dependency
- Avro SerDes
- Confluent Maven repository
- Source directory where you put your Avro files and store generated Java POJOs
Spring Boot应用程序的体系结构
您的应用程序将包括以下组件:
- use.avsc:一个Avro文件
- SpringAvroApplication.java:应用程序的起点。这个类还包括应用程序正在使用的新主题的配置。
- Producer.java:封装Kafka Producer的组件
- Consumer.java:来自Kafka主题的消息的侦听器
- java:一个RESTful控制器,它接受HTTP命令以便在Kafka主题中发布消息
创建用户Avro文件
{
"namespace": "io.confluent.developer", (1)
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string",
"avro.java.string": "String"
},
{
"name": "age",
"type": "int"
}
]
}
avro maven插件将在io.confluent.developer包中生成用户POJO。此POJO具有名称和年龄属性。
创建Spring Boot应用程序类
@SpringBootApplication
public class SpringAvroApplication {
@Value("${topic.name}") (1)
private String topicName;
@Value("${topic.partitions-num}")
private Integer partitions;
@Value("${topic.replication-factor}")
private short replicationFactor;
@Bean
NewTopic moviesTopic() { (2)
return new NewTopic(topicName, partitions, replicationFactor);
}
public static void main(String[] args) {
SpringApplication.run(SpringAvroApplication.class, args);
}
}
这些是Spring从application.yaml文件中注入的主题参数。
Spring Boot基于提供的配置创建了一个新的Kafka主题。作为应用程序开发人员,您负责创建主题,而不是依赖于自动创建主题,这在生产环境中应该是错误的。
创建生产者组件
@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {
@Value("${topic.name}") (1)
private String TOPIC;
private final KafkaTemplate<String, User> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<String, User> kafkaTemplate) { (2)
this.kafkaTemplate = kafkaTemplate;
}
void sendMessage(User user) {
this.kafkaTemplate.send(this.TOPIC, user.getName(), user); (3)
log.info(String.format("Produced user -> %s", user));
}
}
将从application.yaml中注入主题名。
Spring将使用application.yaml中提供的属性初始化KafkaTemplate。
我们将使用 User 作为主键向主题发送消息。
Spring在应用程序启动期间实例化所有这些组件,应用程序准备好通过REST端点接收消息。默认的HTTP端口是9080,可以在application.yaml配置文件中进行更改。
创建消费者组件
@Service
@CommonsLog(topic = "Producer Logger")
public class Producer {
@Value("${topic.name}") (1)
private String TOPIC;
private final KafkaTemplate<String, User> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<String, User> kafkaTemplate) { (2)
this.kafkaTemplate = kafkaTemplate;
}
void sendMessage(User user) {
this.kafkaTemplate.send(this.TOPIC, user.getName(), user); (3)
log.info(String.format("Produced user -> %s", user));
}
}
主题名称将从application.yaml中注入。
使用@KafkaListener注释,spring kafka框架将实例化一个新的使用者。
创建KafkaController组件
@RestController
@RequestMapping(value = "/user") (1)
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) { (2)
this.producer = producer;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("name") String name, @RequestParam("age") Integer age) {
this.producer.sendMessage(new User(name, age)); (3)
}
}
kafkaontroller映射到/user HTTP端点。
Spring注入producer组件。
当一个新请求到达/user/publish端点时,生产者将其发送给Kafka。
运行示例
先决条件
提示:在本指南中,我假设您已经安装了Java开发工具包(JDK)。如果你没有,我强烈建议使用SDKMAN!安装它。
您还需要在本地安装合流平台5.3或更新版本。如果你还没有它,跟随汇合平台快速启动。请确保也安装Confluent CLI(请参阅快速入门本节中的步骤4)。
启动Kafka和Schema注册表
confluent local start schema-registry
Confluent CLI提供用于管理本地Confluent平台安装的本地模式。合流CLI以正确的顺序启动每个组件。

您应该在终端中看到类似的输出。
构建和运行Spring Boot应用程序
在examples目录中,运行./mvnw clean package编译并生成一个可运行的JAR。之后,可以运行以下命令:
java-jar目标/kafka-avro-0.0.1-SNAPSHOT.jar
测试生产者/消费者REST服务
为了简单起见,我喜欢使用curl命令,但您可以使用任何REST客户端(如Postman或IntelliJ IDEA中的REST客户端):
curl -X POST -d “name=vik&age=33” http://localhost:9080/user/publish

2019-06-06 22:52:59.485 INFO 28910 --- [nio-9080-exec-1] Producer Logger : Produced user -> {"name": "vik", "age": 33}
2019-06-06 22:52:59.559 INFO 28910 --- [ntainer#0-0-C-1] Consumer Logger : Consumed message -> {"name": "vik", "age": 33}
使用Confluent Cloud运行应用程序
要将此演示应用程序与confluent cloud一起使用,您将需要托管架构注册表的端点和一个API键/密钥。一旦您选择了一个环境,就可以很容易地从confluent的cloud UI中检索到这两者。

必须至少创建一个Kafka群集才能访问托管架构注册表。一旦选择Schema Registry选项,就可以检索端点并创建新的API/secret。

confluent cloud 配置的示例可以在application-Cloud.yaml中找到:
topic:
name: users
partitions-num: 6
replication-factor: 3
server:
port: 9080
spring:
kafka:
bootstrap-servers:
- mybootstrap.confluent.cloud:9092 (1)
properties:
# CCloud broker connection parameters
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="ccloud_key" password="ccloud_secret"; (2)
security.protocol: SASL_SSL
# CCloud Schema Registry Connection parameter
schema.registry.url: https://schema-registry.aws.confluent.cloud (3)
basic.auth.credentials.source: USER_INFO (4)
schema.registry.basic.auth.user.info: sr_ccloud_key:sr_ccloud_key (5)
consumer:
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
template:
default-topic:
logging:
level:
root: info
- Cloud bootstrap server
- Broker key and secret
- Confluent Cloud Schema Registry URL
- Schema Registry authentication configuration
- Cloud Schema Registry key and secret
- Note: Make sure to replace the dummy login and password information with actual values from your Confluent Cloud account.
要在云模式下运行此应用程序,请激活cloud Spring配置文件。在本例中,Spring Boot将获取application-cloud.yaml配置文件,该文件包含到合流云中数据的连接。
java -jar -Dspring.profiles.active=cloud target/kafka-avro-0.0.1-SNAPSHOT.jar
原文:https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/
讨论:请加入知识星球或者微信圈子【首席架构师圈】
- 登录 发表评论
- 192 次浏览