Kafka串行化方案——在Confluent Streaming Platform中播放AVRO、Protobuf、JSON Schema。这些示例的代码可在https://github.com/saubury/kafka-serialization
Apache Avro在很长一段时间内一直是默认的Kafka串行化机制。Confluent刚刚更新了他们的Kafka流媒体平台,增加了对使用协议缓冲区(或protobuf)和JSON模式串行化串行化数据的支持。
Kafka与AVRO vs.,Kafka和Protobuf vs.,Kafka与JSON Schema
Protobuf特别酷,提供了一些巧妙的机会,超出了Avro的可能。Protobuf和JSON模式的包含适用于生产者和消费者库、模式注册表、Kafka连接、ksqlDB以及控制中心。花几分钟时间熟悉流媒体应用程序的串行化策略可能带来的新机会是值得的。
我关心结构化数据的序列化吗?
那么,为什么要对结构化数据进行串行化呢?让我们从一个示例数据字符串开始…“cookie,50,null”这个数据是什么意思?cookie是一个名字、一个地方还是一种吃的东西?50岁左右呢?这是年龄、温度还是其他原因?
如果您使用数据库(如Postgres或Oracle)来存储数据,您将创建一个表定义(具有命名良好的列和适当的数据类型)。流媒体平台也是如此——你真的应该选择数据格式来串行化结构化数据。在您的数据平台上保持一致的奖励积分!
直到最近,您在Kafka中串行化结构化数据的选择还是有限的。您有“糟糕”的选择(如自由文本或CSV),或者使用ApacheAvro是“更好”的选择。Avro是一个开源的数据序列化系统,它将您的数据(以及它的适当模式)封送为高效的二进制格式。Avro的核心功能之一是能够为我们的数据定义模式。因此,我们的数据cookie 50,null将与这样的小吃Avro模式相关联
{
"type": "record",
"name": "snacks",
"fields": [
{"name": "name", "type": "string" }
, {"name": "calories", "type": "float" }
, {"name": "colour", "type": "string", "default": null}
]
}
在这里我们可以看到我们的数据cookie,50,null是零食数据(最重要的数据类型)。我们可以看到饼干是一个字符串,代表小吃的名称。我们的模式为我们提供了很大的灵活性(我们的模式可以随着时间的推移而发展),并确保了数据的完整性(例如,确保卡路里是整数)。
尽管大多数ApacheKafka用户都使用ApacheAvro来定义消息的契约,但这始终有点像“Java的东西”。ApacheAvro编译器自动生成的类有利于JVM开发人员。你当然可以在几乎任何语言中使用AVRO,然而,谷歌协议缓冲区(protobuf)在其他语言(Python、Rust、Ruby、Go)中非常流行,用于串行化、去串行化和验证数据。
AVRO、Protobuf、JSON Schema与Kafka一起使用
这不是一个关于“最佳”连载策略的博客。然而,让我们熟悉如何使用新的选择来序列化结构化数据
我们将浏览几个例子。请记住,我们保存在snacks.txt文件中的最初一组美味数据如下所示
{"name": "cookie", "calories": 500, "colour": "brown"}
{"name": "cake", "calories": 260, "colour": "white"}
{"name": "timtam", "calories": 80, "colour": "chocolate"}
AVRO串行化
让我们提醒自己如何使用AVRO串行化对我们的零食进行编码。我们将使用include命令行工具kafka-avro控制台生产者作为kafka生产者,它可以执行串行化(提供模式作为命令行参数)。生产者是将数据写入卡夫卡经纪人的东西。
kafka-avro-console-producer --broker-list localhost:9092 --topic SNACKS_AVRO --property value.schema='
{
"type": "record",
"name": "myrecord",
"fields": [
{"name": "name", "type": "string" }
, {"name": "calories", "type": "float" }
, {"name": "colour", "type": "string" }
]
}' < snacks.txt
为了读取数据,我们可以使用kafka avro控制台consumer命令行应用程序作为kafka consumer来读取和串行化我们的avro数据
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic SNACKS_AVRO --from-beginning {"name":"cookie","calories":500.0,"colour":"brown"}
{"name":"cake","calories":260.0,"colour":"white"}
{"name":"timtam","calories":80.0,"colour":"chocolate"}
Protocol Buffers (Protobuf) serialization
这一次,我们将使用protobuf系列化与新的kafkaprotobuf控制台生产商kafka生产商。这个概念与我们在AVRO中采用的方法相似,但这一次我们的卡夫卡制作人将可以执行protobuf系列化。请注意,protobuf模式是作为命令行参数提供的。
kafka-protobuf-console-producer --broker-list localhost:9092 --topic SNACKS_PROTO --property value.schema='
message Snack {
required string name = 1;
required int64 calories = 2;
optional string colour = 3;
}' < snacks.txt
为了读取数据,我们可以使用kafka protobuf控制台consumerkafka consumer对我们的protobuf数据进行去串行化
kafka-protobuf-console-consumer --bootstrap-server localhost:9092 --topic SNACKS_PROTO --from-beginning {"name":"cookie","calories":"500","colour":"brown"}
{"name":"cake","calories":"260","colour":"white"}
{"name":"timtam","calories":"80","colour":"chocolate"}
JSON Schema serialization
最后,我们将使用JSON模式串行化与新的kafkajson模式控制台生成器kafka-productor。请注意,json模式模式是作为命令行参数提供的。
kafka-json-schema-console-producer --broker-list localhost:9092 --topic SNACKS_JSONSCHEMA --property value.schema='
{
"definitions" : {
"record:myrecord" : {
"type" : "object",
"required" : [ "name", "calories" ],
"additionalProperties" : false,
"properties" : {
"name" : {"type" : "string"},
"calories" : {"type" : "number"},
"colour" : {"type" : "string"}
}
}
},
"$ref" : "#/definitions/record:myrecord"
}' < snacks.txt
为了读取数据,我们可以使用kafkajson模式控制台consumerkafkaconsumer来解串行化我们的json模式数据
kafka-json-schema-console-consumer --bootstrap-server localhost:9092 --topic SNACKS_JSONSCHEMA --from-beginning {"name":"cookie","calories":"500","colour":"brown"}
{"name":"cake","calories":"260","colour":"white"}
{"name":"timtam","calories":"80","colour":"chocolate"}
What you can do with Protobuf and can’t do with Avro?
让我们看一个更复杂的建模示例,以说明Protobuf模式的一些可能性。想象一下,我们想为一顿饭建模,并描述饭中的成分。我们可以使用protobuf模式来描述一顿饭,比如由牛肉馅和奶酪浇头组成的玉米卷。
我们的玉米卷和炸鱼薯条的数据可能是这样的
{
"name": "tacos",
"item": [
{
"item_name": "beef",
"type": "FILLING"
},
{
"item_name": "cheese",
"type": "TOPPING"
}
]
}, {
"name": "fish and chips",
"alternate_name": "fish-n chips",
"item": []
}
一个代表我们膳食的protobuf模式示例如下
message Meal {
required string name = 1;
optional string alternate_name = 2; enum FoodType {
INGREDIENT = 0;
FILLING = 1;
TOPPING = 2;
} message MealItems {
required string item_name = 1;
optional FoodType type = 2 [default = INGREDIENT];
} repeated MealItems item = 4;
}
To try this modelling with protobuf in Kafka
kafka-protobuf-console-producer --broker-list localhost:9092 --topic MEALS_PROTO --property value.schema='
message Meal {
required string name = 1;
optional string alternate_name = 2; enum FoodType {
INGREDIENT = 0;
FILLING = 1;
TOPPING = 2;
} message MealItems {
required string item_name = 1;
optional FoodType type = 2 [default = INGREDIENT];
} repeated MealItems item = 4;
}' < meals.txt
这让您了解了在Kafka中使用protobuf可以如何灵活地表示数据。但是,如果我们需要对这些模式进行更改,会发生什么呢?
具有Confluent Schema注册表的Protobuf
您可能想知道上面的例子中的模式到底去了哪里?Confluent Schema Registry一直在努力存储这些模式(当使用kafka-blah控制台生成器时,作为序列化过程的一部分)。也就是说,模式名称(例如SNACKS_PROTO-value)、模式内容、版本和样式(protobuf、Avro)都已存储。我们可以使用curl在存储的模式上达到峰值。例如,为了探索我们的零食最近使用的protobuf模式
curl -s -X GET http://localhost:8081/subjects/SNACKS_PROTO-value/versions/1
Which responds the this snack schema (yummy)
{
"subject": "SNACKS_PROTO-value",
"version": 1,
"id": 6,
"schemaType": "PROTOBUF",
"schema": "\nmessage Snack {\n required string name = 1;\n required int64 calories = 2;\n required string colour = 3;\n}\n"
}
Protobuf的模式进化
俗话说,唯一不变的就是变化。任何好的数据平台都需要适应变化——比如对模式的添加或更改。支持模式进化是流媒体平台的基本要求,因此我们的序列化机制也需要支持模式更改(或进化)。Protobuf和Avro都提供了在不破坏下游消费者的情况下更新模式的机制——下游消费者可能仍然使用以前的模式版本。
在我们的膳食中添加饮料
墨西哥玉米饼和披萨听起来很棒——但让我们在用餐时喝点什么吧!现在,我们可以向我们的模式添加一些额外的属性,以包括膳食。这有时被称为模式进化。请注意,我们将继续使用现有的MEALS_PROTO主题。
新的数据有效载荷(包括啤酒)
{
"name": "pizza",
"drink": [
{
"drink_name": "beer",
"type": "ALCOHOLIC"
}
]
}
So to encode the command looks like
kafka-protobuf-console-producer --broker-list localhost:9092 --topic MEALS_PROTO --property value.schema='
message Meal {
required string name = 1;
optional string alternate_name = 2; enum FoodType {
INGREDIENT = 0;
FILLING = 1;
TOPPING = 2;
} enum DrinkType {
BUBBLY = 0;
ALCOHOLIC = 1;
} message MealItems {
required string item_name = 1;
optional FoodType type = 2 [default = INGREDIENT];
} message DrinkItems {
required string drink_name = 1;
optional DrinkType type = 2 ;
} repeated MealItems item = 4;
repeated DrinkItems drink = 5;
}' < meals-2.txt
Visualising schema difference with Confluent Control Center
One nice inclusion with the Confluent Control Center (the Web GUI included in the Confluent platform) is the ability to look at schemas, and the differences between schemas. For example, we can see version 1 and version 2 of the MEALS_PROTO-value
schema
Application Binding — Protobuf classes with Python
Let us now build an application demonstrating protobuf classes. To generate protobuf classes you must first install the protobuf compiler protoc
. See the protocol buffer docs for instructions on installing and using protoc.
We can compile our Python schema like this. This will take our schema (from meal.proto
) and will create the meal_pb2.py
Python class file.
protoc -I=. --python_out=. ./meal.proto
Excellent, with the meal_pb2.py
Python class file you can now build protobuf classes and produce into Kafka with code like this
import meal_pb2
mybeer = Meal.DrinkItems(drink_name="beer")
mywine = Meal.DrinkItems(drink_name="wine")
meal = Meal(name='pizza', drink=[mybeer,mywine])producer.produce(topic='MEAL_PROTO', value=meal)
看看producer-protobuf.py,了解Python中protobuf-Kafka生产者的完整示例。
结论
Kafka的功能不断增长,在Confluent平台中使用AVRO、Protobuf和JSON Schema的选项为构建酷炫的流媒体应用程序提供了更多机会
这些示例的代码可在
Tags
最新内容
- 4 hours ago
- 7 hours ago
- 7 hours ago
- 2 days 22 hours ago
- 3 days 6 hours ago
- 3 days 6 hours ago
- 3 days 6 hours ago
- 3 days 7 hours ago
- 1 week ago
- 1 week ago