这篇博客文章展示了如何配置Spring Kafka和Spring Boot来使用JSON发送消息并以多种格式接收它们:JSON、纯字符串或字节数组。基于此配置,您还可以将Kafka生产者从发送JSON切换到其他序列化方法。
这个示例应用程序还演示了在同一个使用者组中使用三个Kafka使用者,因此消息在这三个使用者组之间是负载平衡的。每个使用者实现不同的反序列化方法。
此外,在这篇文章的最后,你会发现一些实践练习,以防你想掌握一些Kafka的概念,如消费者群体和主题划分。
一个消费群中有多个消费者
逻辑视图
为了更好地理解配置,请看下图。如您所见,我们创建了一个包含三个分区的Kafka主题。在使用者端,只有一个应用程序,但它使用相同的group.id属性实现了三个Kafka使用者。这是将它们放在同一个Kafka消费者组中所需的配置。
当我们启动应用程序时,Kafka为每个使用者分配一个不同的分区。此使用者组将以负载平衡的方式接收消息。在这篇文章的后面,您将看到如果我们使它们具有不同的组标识符(如果您熟悉Kafka,您可能知道结果)会有什么不同。
示例用例
我们要建立的逻辑很简单。每次我们调用一个给定的REST端点hello时,应用程序将生成一个可配置数量的消息,并将它们发送到同一主题,使用一个序列号作为Kafka键。它将等待(使用倒计时闩锁)所有消息被消费,然后返回消息,你好Kafka!。将有三个使用者,每个使用者使用不同的反序列化机制,当他们接收到新消息时,这将减少闩锁计数。
轻松,对吧?让我们看看如何构建它。
设置Spring Boot和Kafka
启动Kafka
首先,需要有一个正在运行的Kafka集群来连接。对于这个应用程序,我将使用docker compose和Kafka在单个节点中运行。这显然远远不是一个生产配置,但对于本文的目标来说已经足够了。
docker-compose.yml文件
version: '2' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
注意,我将Kafka配置为不自动创建主题。我们将从Spring Boot应用程序创建主题,因为我们无论如何都要传递一些自定义配置。如果你想玩这些Docker图片(例如,使用多个节点),看看wurstmeister/zookeeper图片文档。
要启动Kafka和Zookeeper容器,只需从这个文件所在的文件夹运行docker compose up
基本的Spring Boot和Kafka应用程序
弹簧初始化器Kafka
获取应用程序框架的最简单方法是导航到start.spring.io,填写项目的基本详细信息并选择Kafka作为依赖项。然后,下载zip文件并使用您喜爱的IDE加载源代码。
让我们使用YAML进行配置。您可能需要将src/main/java/resources中的application.properties文件重命名为application.yml。以下是我们将用于此示例应用程序的配置值:
spring: kafka: consumer: group-id: tpd-loggers auto-offset-reset: earliest # change this property if you are using your own # Kafka cluster or your Docker IP is different bootstrap-servers: localhost:9092 tpd: topic-name: advice-topic messages-per-request: 10
第一个属性块是Spring Kafka配置:
用户默认使用的group id。
auto offset reset属性设置为earliest,这意味着当该使用者没有现有的偏移量时,使用者将从最早的可用偏移量开始读取消息。
用于连接到Kafka的服务器,在本例中,如果使用单节点配置,则是唯一可用的服务器。请注意,如果使用默认值localhost:9092,则此属性是多余的。
第二个块是特定于应用程序的。我们定义了Kafka主题名和每次执行HTTP REST请求时要发送的消息数。
消息类
这是我们将用作Kafka消息的Java类。这里没有什么复杂的东西,只是一个在构造函数参数中带有@JsonProperty注释的不可变类,这样Jackson可以正确地反序列化它。
PracticalAdvice class
package io.tpd.kafkaexample; import com.fasterxml.jackson.annotation.JsonProperty; public class PracticalAdvice { private final String message; private final int identifier; public PracticalAdvice(@JsonProperty("message") final String message, @JsonProperty("identifier") final int identifier) { this.message = message; this.identifier = identifier; } public String getMessage() { return message; } public int getIdentifier() { return identifier; } @Override public String toString() { return "PracticalAdvice::toString() {" + "message='" + message + '\'' + ", identifier=" + identifier + '}'; } }
Spring Boot中的Kafka Producer配置
为了保持应用程序简单,我们将在主Spring Boot类中添加配置。最后,我们希望在这里包括生产者和消费者配置,并使用三种不同的变体进行反序列化。请记住,您可以在GitHub存储库中找到完整的源代码。
首先,让我们关注一下生产者配置。
Spring Boot Kafka Producer
@SpringBootApplication public class KafkaExampleApplication { public static void main(String[] args) { SpringApplication.run(KafkaExampleApplication.class, args); } @Autowired private KafkaProperties kafkaProperties; @Value("${tpd.topic-name}") private String topicName; // Producer configuration @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; } @Bean public ProducerFactory<String, Object> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public NewTopic adviceTopic() { return new NewTopic(topicName, 3, (short) 1); } }
在此配置中,我们将设置应用程序的两个部分:
KafkaTemplate实例,我们将使用它向Kafka发送消息。我们不想使用默认版本,所以需要在Spring的应用程序上下文中注入自定义版本。
我们键入(使用泛型)KafkaTemplate,使其具有一个纯字符串键和一个对象作为值。将对象作为值的原因是我们希望使用同一模板发送多个对象类型。KafkaTemplate接受我们在配置中创建的ProducerFactory作为参数。
我们使用的ProducerFactory是默认的,但是我们需要在这里显式配置,因为我们想将自定义的producer配置传递给它。
生产者配置是一个简单的键值映射。我们使用@Autowired注入默认属性以获得Kafka properties bean,然后构建传递生产者默认值的映射,并重写默认的Kafka键和值序列化器。生产者将使用Kafka库的StringSerializer将键序列化为字符串,并对值执行相同的操作,但这次使用JSON和JsonSerializer,在本例中由springkafka提供。
我们要用的
Kafka主题。通过注入一个NewTopic实例,我们指示Kafka的AdminClient bean(已经在上下文中)使用给定的配置创建一个主题。第一个参数是名称(来自应用程序配置的建议主题),第二个参数是分区数(3),第三个参数是复制因子(1,因为我们使用的是单个节点)。
关于Java的Kafka序列化器和反序列化器
核心Kafka库(javadoc)中提供了一些基本的序列化程序,用于字符串、各种数字类和字节数组,以及springkafka(javadoc)提供的JSON序列化程序。
除此之外,您还可以通过实现序列化程序或扩展序列化程序或其相应的反序列化版本来创建自己的序列化程序和反序列化程序。这给了您很大的灵活性,以优化通过
Kafka的数据量,以防您需要这样做。正如您在这些接口中看到的,Kafka使用的是纯字节数组,因此,最终,不管您使用的是哪种复杂类型,都需要将其转换为字节[]。
知道了这一点,您可能会想知道为什么有人会想在Kafka中使用JSON。由于要将对象转换为JSON,然后再转换为字节数组,因此效率相当低。但你必须考虑这样做的两个主要优点:
JSON对于人类来说比字节数组更具可读性。如果你想调试或分析你的
Kafka主题的内容,它将比看裸字节简单得多。
JSON是一个标准,而默认字节数组序列化器依赖于编程语言实现。因此,如果要使用来自多个编程语言的消息,则需要在所有这些语言中复制(反)序列化程序逻辑。
另一方面,如果您担心Kafka中的流量负载、存储或(反)序列化的速度,则可能需要选择字节数组,甚至选择自己的序列化器/反序列化器实现。
使用Spring Boot和Kafka发送消息
按照计划,我们创建一个Rest控制器,并在请求端点时使用注入的KafkaTemplate生成一些JSON消息。
这是控制器的第一个实现,只包含生成消息的逻辑。
Hellokafka控制器
@RestController public class HelloKafkaController { private static final Logger logger = LoggerFactory.getLogger(HelloKafkaController.class); private final KafkaTemplate<String, Object> template; private final String topicName; private final int messagesPerRequest; private CountDownLatch latch; public HelloKafkaController( final KafkaTemplate<String, Object> template, @Value("${tpd.topic-name}") final String topicName, @Value("${tpd.messages-per-request}") final int messagesPerRequest) { this.template = template; this.topicName = topicName; this.messagesPerRequest = messagesPerRequest; } @GetMapping("/hello") public String hello() throws Exception { latch = new CountDownLatch(messagesPerRequest); IntStream.range(0, messagesPerRequest) .forEach(i -> this.template.send(topicName, String.valueOf(i), new PracticalAdvice("A Practical Advice", i)) ); latch.await(60, TimeUnit.SECONDS); logger.info("All messages received"); return "Hello Kafka!"; } }
在构造函数中,我们传递一些配置参数和自定义的KafkaTemplate,以发送字符串键和JSON值。然后,当API客户机请求/hello端点时,我们发送10条消息(这是配置值),然后阻塞线程最多60秒。如您所见,Kafka使用者还没有实现来减少锁存计数。锁销解锁后,我们会返回信息Hello Kafka!给我们的客户。
整个锁的思想并不是一个在实际应用程序中可以看到的模式,但是对于这个例子来说,这是很好的。这样,您就可以检查收到的消息数。如果你愿意的话,你可以取下门闩并返回“你好,
Kafka!“在收到消息之前发送消息。
Kafka消费者配置
正如前面在这篇文章中提到的,我们想演示使用Spring Boot和Spring Kafka进行反序列化的不同方法,同时,看看当多个消费者是同一个消费者组的一部分时,他们是如何以负载平衡的方式工作的。
Spring Boot Kafka configuration - Consumer
@SpringBootApplication public class KafkaExampleApplication { public static void main(String[] args) { SpringApplication.run(KafkaExampleApplication.class, args); } @Autowired private KafkaProperties kafkaProperties; @Value("${tpd.topic-name}") private String topicName; // Producer configuration // omitted... // Consumer configuration // If you only need one kind of deserialization, you only need to set the // Consumer configuration properties. Uncomment this and remove all others below. // @Bean // public Map<String, Object> consumerConfigs() { // Map<String, Object> props = new HashMap<>( // kafkaProperties.buildConsumerProperties() // ); // props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, // StringDeserializer.class); // props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, // JsonDeserializer.class); // props.put(ConsumerConfig.GROUP_ID_CONFIG, // "tpd-loggers"); // // return props; // } @Bean public ConsumerFactory<String, Object> consumerFactory() { final JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(); jsonDeserializer.addTrustedPackages("*"); return new DefaultKafkaConsumerFactory<>( kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer ); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } // String Consumer Configuration @Bean public ConsumerFactory<String, String> stringConsumerFactory() { return new DefaultKafkaConsumerFactory<>( kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new StringDeserializer() ); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerStringContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(stringConsumerFactory()); return factory; } // Byte Array Consumer Configuration @Bean public ConsumerFactory<String, byte[]> byteArrayConsumerFactory() { return new DefaultKafkaConsumerFactory<>( kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new ByteArrayDeserializer() ); } @Bean public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerByteArrayContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(byteArrayConsumerFactory()); return factory; } }
这个配置看起来可能是扩展的,但是考虑到,为了演示这三种反序列化,我们已经重复了三次ConsumerFactory和KafkaListenerContainerFactory实例的创建,以便我们可以在消费者中在它们之间切换。
配置使用者的基本步骤是:
[略]以与我们为生产商所做的类似的方式设置消费者属性。我们可以跳过这一步,因为我们需要的唯一配置是在Spring Boot属性文件中指定的组ID,以及键和值反序列化器,我们将在创建自定义的使用者和KafkaListener工厂时覆盖它们。如果您只需要一个配置,也就是说始终需要相同类型的键和值反序列化程序,那么您只需要这个带注释的代码块。将反序列化程序类型调整为要使用的类型。
创建要由KafkaListenerContainerFactory使用的ConsumerFactory。我们创建了三个,将每种情况下的值反序列化器切换为1)JSON反序列化器,2)字符串反序列化器和3)字节数组反序列化器。
注意,在创建JSON反序列化程序之后,我们将包含一个额外的步骤来指定我们信任所有包。如果需要,可以在应用程序中对此进行微调。如果不这样做,我们将收到一条错误消息,内容如下:java.lang.IllegalArgumentException:类[]不在受信任的包中。
使用先前配置的使用者工厂构造Kafka侦听器容器工厂(并发工厂)。同样,我们这样做三次,每个实例使用不同的一个。
接收JSON、String和byte[]格式的Spring Boot和Kafka消息
是时候展示一下
Kafka的消费者是什么样子了。我们将使用@KafkaListener注释,因为它简化了过程并负责对传递的Java类型进行反序列化。
Kafka listeners
@RestController public class HelloKafkaController { private static final Logger logger = LoggerFactory.getLogger(HelloKafkaController.class); private final KafkaTemplate<String, Object> template; private final String topicName; private final int messagesPerRequest; private CountDownLatch latch; public HelloKafkaController( final KafkaTemplate<String, Object> template, @Value("${tpd.topic-name}") final String topicName, @Value("${tpd.messages-per-request}") final int messagesPerRequest) { this.template = template; this.topicName = topicName; this.messagesPerRequest = messagesPerRequest; } @GetMapping("/hello") public String hello() throws Exception { latch = new CountDownLatch(messagesPerRequest); IntStream.range(0, messagesPerRequest) .forEach(i -> this.template.send(topicName, String.valueOf(i), new PracticalAdvice("A Practical Advice", i)) ); latch.await(60, TimeUnit.SECONDS); logger.info("All messages received"); return "Hello Kafka!"; } @KafkaListener(topics = "advice-topic", clientIdPrefix = "json", containerFactory = "kafkaListenerContainerFactory") public void listenAsObject(ConsumerRecord<String, PracticalAdvice> cr, @Payload PracticalAdvice payload) { logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(), typeIdHeader(cr.headers()), payload, cr.toString()); latch.countDown(); } @KafkaListener(topics = "advice-topic", clientIdPrefix = "string", containerFactory = "kafkaListenerStringContainerFactory") public void listenasString(ConsumerRecord<String, String> cr, @Payload String payload) { logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(), typeIdHeader(cr.headers()), payload, cr.toString()); latch.countDown(); } @KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray", containerFactory = "kafkaListenerByteArrayContainerFactory") public void listenAsByteArray(ConsumerRecord<String, byte[]> cr, @Payload byte[] payload) { logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(), typeIdHeader(cr.headers()), payload, cr.toString()); latch.countDown(); } private static String typeIdHeader(Headers headers) { return StreamSupport.stream(headers.spliterator(), false) .filter(header -> header.key().equals("__TypeId__")) .findFirst().map(header -> new String(header.value())).orElse("N/A"); } }
这个班有三个听众。首先,让我们描述一下@KafkaListener注释的参数:
所有的听众都在消费同一个话题,建议话题。此参数是必需的。
参数clientiprefix是可选的。我在这里使用它,所以日志更人性化。你将知道哪个消费者用它的名字前缀做什么。
Kafka会在这个前缀后面加上一个数字。
containerFactory参数是可选的,您还可以依赖命名约定。如果不指定它,它将查找名为kafkaListenerContainerFactory的bean,这也是Spring Boot在自动配置Kafka时使用的默认名称。您还可以使用相同的名称来覆盖它(尽管对于不知道该约定的人来说,它看起来很神奇)。我们需要显式地设置它,因为我们希望为每个侦听器使用不同的侦听器,以便能够使用不同的反序列化程序。
请注意,传递给所有侦听器的第一个参数是相同的,即ConsumerRecord。如果我们使用第一个,第二个注释为@Payload是多余的。我们可以使用ConsumerRecord中的value()方法访问负载,但是我包含了它,所以您可以看到通过推断的反序列化直接获取消息负载是多么简单。
Kafka的TypeId头
默认情况下,Kafka库会自动设置“TypeId”标题。我在这里使用的实用程序方法typeIdHeader只是为了获得字符串表示,因为在ConsumerRecord的toString()方法的输出中只能看到一个字节数组。此TypeId头可用于反序列化,因此您可以找到要将数据映射到的类型。JSON反序列化不需要它,因为特定的反序列化器是由Spring团队创建的,他们从方法的参数中推断出类型。
运行应用程序
现在我们已经完成了
Kafka的生产者和消费者,我们可以运行
Kafka和Spring Boot应用程序:
$ docker-compose up -d Starting kafka-example_zookeeper_1 ... done Starting kafka-example_kafka_1 ... done $ mvn spring-boot:run ...
Spring Boot应用程序启动,消费者在Kafka中注册,Kafka为他们分配一个分区。我们将主题配置为三个分区,因此每个使用者都会得到其中一个分区。
输出-Kafka主题分区
[Consumer clientId=string-0, groupId=tpd-loggers] Successfully joined group with generation 28 [Consumer clientId=string-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-2] [Consumer clientId=bytearray-0, groupId=tpd-loggers] Successfully joined group with generation 28 [Consumer clientId=bytearray-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-0] [Consumer clientId=json-0, groupId=tpd-loggers] Successfully joined group with generation 28 [Consumer clientId=json-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-1] partitions assigned: [advice-topic-1] partitions assigned: [advice-topic-2] partitions assigned: [advice-topic-0]
我们现在可以尝试对服务进行HTTP调用。您可以使用浏览器或curl,例如:
调用终结点
$ curl localhost:8080/hello
日志中的输出应如下所示:
Kafka侦听器输出
INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":0} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 44, CreateTime = 1542911788418, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = {"message":"A Practical Advice","identifier":0}) INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 49, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 44, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = [B@39113414) INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":2} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = {"message":"A Practical Advice","identifier":2}) INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 45, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@476e998b) INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 46, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3}) INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 55, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 46, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = [B@7a229d60) INFO 15292 --- [ntainer#1-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 2, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9}) INFO 15292 --- [ntainer#2-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 3 [ByteArray] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 56, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 47, CreateTime = 1542911788423, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = [B@536adff4) INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 22, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4}) INFO 15292 --- [ntainer#0-0-C-1] i.tpd.kafkaexample.HelloKafkaController : Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 23, CreateTime = 1542911788422, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6}) INFO 15292 --- [nio-8080-exec-1] i.tpd.kafkaexample.HelloKafkaController : All messages received
解释
Kafka正在散列消息键(一个简单的字符串标识符),并在此基础上,将消息放置到不同的分区中。每个使用者在其分配的分区中获取消息,并使用其反序列化程序将其转换为Java对象。记住,我们的生产者总是发送JSON值。
如您在日志中所见,每个反序列化程序都设法执行其任务,以便字符串使用者打印原始JSON消息,字节数组显示该JSON字符串的字节表示,JSON反序列化程序使用Java类型映射器将其转换为原始类PracticalAdvice。您可以查看记录的ConsumerRecord,您将看到标题、分配的分区、偏移量等。
这就是使用Spring Boot和Kafka发送和接收JSON消息的方法。我希望您觉得这本指南很有用,下面您有一些代码变体,这样您就可以进一步了解
Kafka是如何工作的。
如果你有任何反馈,请通过Twitter或评论告诉我。
试试
Kafka的练习
如果您是
Kafka的新手,您可能需要尝试一些代码更改,以更好地了解
Kafka是如何工作的。
多次请求 /hello
提出一些请求,然后看看消息是如何跨分区分布的。具有相同密钥的Kafka消息总是放在相同的分区中。当您希望确保给定用户、进程或您正在处理的任何逻辑的所有消息都由同一个消费者以与生成消息相同的顺序接收时,无论您正在进行多少负载平衡,此功能都非常有用。
减少分区数
首先,确保重新启动Kafka,这样就放弃了以前的配置。
然后,在应用程序中将主题重新定义为只有两个分区:
重新定义Kafka主题
@Bean public NewTopic adviceTopic() { return new NewTopic(topicName, 2, (short) 1); }
现在,再次运行应用程序并向/hello端点发出请求。
不带分区的Kafka消费者
Logger 3 [ByteArray] received key 0: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 48, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 0, CreateTime = 1542952988174, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 0, value = [B@20a05831)
Logger 3 [ByteArray] received key 2: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 50, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 1, CreateTime = 1542952988177, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 2, value = [B@1ce73893)
Logger 3 [ByteArray] received key 5: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 53, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 2, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 5, value = [B@637be7d7)
Logger 3 [ByteArray] received key 6: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: [123, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 34, 65, 32, 80, 114, 97, 99, 116, 105, 99, 97, 108, 32, 65, 100, 118, 105, 99, 101, 34, 44, 34, 105, 100, 101, 110, 116, 105, 102, 105, 101, 114, 34, 58, 54, 125] | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 3, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 6, value = [B@75f374e7)
Logger 1 [JSON] received key 1: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=1} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 0, CreateTime = 1542952988177, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=1})
Logger 1 [JSON] received key 3: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=3} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 1, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 3, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=3})
Logger 1 [JSON] received key 4: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 2, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 4, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=4})
Logger 1 [JSON] received key 7: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=7} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 3, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 7, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=7})
Logger 1 [JSON] received key 8: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=8} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 4, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 8, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=8})
Logger 1 [JSON] received key 9: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=9} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 5, CreateTime = 1542952988178, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 9, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=9})
All messages received
其中一个消费者没有收到任何消息。这是预期的行为,因为在同一使用者组中没有可用的分区。
更改一个消费者的组标识符
保留以前的更改,主题现在只有两个分区。我们现在正在更改一个消费者的组id,因此它可以独立工作。
更改Kafka组id
@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray", containerFactory = "kafkaListenerByteArrayContainerFactory", groupId = "tpd-loggers-2") public void listenAsByteArray(ConsumerRecord<String, byte[]> cr, @Payload byte[] payload) { logger.info("Logger 3 [ByteArray] received a payload with size {}", payload.length); latch.countDown(); }
注意,我们还更改了记录的消息。现在,这个消费者负责打印有效负载的大小,而不是有效负载本身。此外,我们需要更改倒计时锁存器,以便它预期的消息数是原来的两倍。
闩锁设置为数字的两倍
latch = new CountDownLatch(messagesPerRequest * 2);
为什么?这次,让我们解释一下在运行应用程序之前会发生什么。正如我在本文开头所描述的,当消费者属于同一个消费者群体时,他们(概念上)正在处理同一个任务。我们正在实现一种负载平衡机制,在这种机制中,并发工作器从不同的分区获取消息,而无需处理彼此的消息。
在本例中,我还更改了最后一个消费者的“任务”,以便更好地理解这一点:它正在打印不同的内容。由于我们更改了组id,这个使用者将独立工作,Kafka将为它分配两个分区。字节数组使用者将接收所有消息,与其他两个单独工作。
两个
Kafka消费者
Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 2 [String] received key 1: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":1} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 12, CreateTime = 1542954145932, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 1, value = {"message":"A Practical Advice","identifier":1}) Logger 2 [String] received key 3: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":3} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 13, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 3, value = {"message":"A Practical Advice","identifier":3}) Logger 2 [String] received key 4: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":4} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 14, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 4, value = {"message":"A Practical Advice","identifier":4}) Logger 2 [String] received key 7: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":7} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 15, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 7, value = {"message":"A Practical Advice","identifier":7}) Logger 2 [String] received key 8: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":8} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 16, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 8, value = {"message":"A Practical Advice","identifier":8}) Logger 3 [ByteArray] received a payload with size 47 Logger 2 [String] received key 9: Type [io.tpd.kafkaexample.PracticalAdvice] | Payload: {"message":"A Practical Advice","identifier":9} | Record: ConsumerRecord(topic = advice-topic, partition = 1, offset = 17, CreateTime = 1542954145934, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [105, 111, 46, 116, 112, 100, 46, 107, 97, 102, 107, 97, 101, 120, 97, 109, 112, 108, 101, 46, 80, 114, 97, 99, 116, 105, 99, 97, 108, 65, 100, 118, 105, 99, 101])], isReadOnly = false), key = 9, value = {"message":"A Practical Advice","identifier":9}) Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 3 [ByteArray] received a payload with size 47 Logger 1 [JSON] received key 0: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=0} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 8, CreateTime = 1542954145929, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 0, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=0}) Logger 1 [JSON] received key 2: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=2} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 9, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 2, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=2}) Logger 1 [JSON] received key 5: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=5} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 10, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 5, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=5}) Logger 1 [JSON] received key 6: Type [N/A] | Payload: PracticalAdvice::toString() {message='A Practical Advice', identifier=6} | Record: ConsumerRecord(topic = advice-topic, partition = 0, offset = 11, CreateTime = 1542954145933, serialized key size = 1, serialized value size = 47, headers = RecordHeaders(headers = [], isReadOnly = false), key = 6, value = PracticalAdvice::toString() {message='A Practical Advice', identifier=6}) All messages received
有了这些练习,在这里和那里改变参数,我认为你可以更好地掌握这些概念。
原文:https://thepracticaldeveloper.com/2018/11/24/spring-boot-kafka-config/
本文:http://jiagoushi.pro/node/972
讨论:请加入知识星球或者微信圈子【首席架构师圈】
- 登录 发表评论
- 182 次浏览
最新内容
- 6 hours ago
- 1 day 19 hours ago
- 3 days 6 hours ago
- 3 days 15 hours ago
- 3 days 17 hours ago
- 3 days 17 hours ago
- 3 days 17 hours ago
- 4 days 18 hours ago
- 1 week 3 days ago
- 1 week 3 days ago