此教程假设你刚刚开始没有任何 Kafka 或 ZooKeeper 数据。Kafka的控制台脚本在类Unix和Windows平台不同,Windows平台使用bin\windows\\
代替bin/
,脚本的扩展名改为.bat
。
第一步:下载代码
下载0.10.1.0发行版并解压。
> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0
第二步:启动服务
Kafka使用Zookeeper,所以如果你没有的话需要首先启动Zookeeper服务。你可以使用kafka自带的脚本启动一个简单的单一节点Zookeeper实例。
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
现在启动Kafka服务:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
第三步:创建一个主题
让我们来创建一个名为test
的topic,只使用单个分区和一个复本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
我们现在可以运行list topic命令看到我们的主题。
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
另外,当没有主题存在的时候,你也可以通过配置代理自动创建主题而不是手动创建。
第四步:发送消息
Kafka有自带的命令行客户端会从文件或者标准输入接受数据当作消息发送到Kafka集群。默认情况下,每行作为一个独立的消息发送。
运行生产者控制台并且打几行消息到控制台发送到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
第五步:启动一个消费者
Kafka还有个消费者控制台,会把消息输出到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你上面的命令是在不同的终端运行,那么你可以在生产者终端输入消息然后在消费者终端看到。
所有的命令行工具都有一些额外的参数:如果没有使用参数运行命令,将会显示它们的详细用法。
第六步:设置多个代理集群
目前为止,我们已经在单个代理上运行了,但这不好玩。对于Kafka,单个代理只是大小为1的集群。所以没什么改变除了多启动几个代理实例。只是为了感受一下,我们把集群扩展到3个节点(仍然在我们的本地机器上)。
首先,我们为每个代理新建一个配置文件(在windows上使用copy
命令):
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
现在编辑新文件设置一下属性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id
属性是唯一的,在集群的每个节点永久不变。因为我们在单台机器上运行代理,必须重写端口和日志目录。
我们已经有了Zookeeper并运行了一个节点,所以只需要启动下面的两个新节点:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
现在创建一个含有三个副本的主题:
```sh
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
好,现在我们有了一个集群但是怎么知道哪个代理正在做什么?使用describe topice
命令查看:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
这是输出解释。第一行给出了各个分区的概况,额外的每行都给出了一个分区的信息。由于我们只有一个主题的分区,所以只有一行。
leader
是负责当前分区的所有读写请求。每个节点都将领导一个随机选择的分区。replicas
是节点列表,复制分区日志,不管他们是不是leader或者即使它们还活着。isr
是in-sync
的集合。这是replicas
列表当前还活着的子集。
注意在我们的示例中节点1是唯一的主题分区领导者。
我们运行同样的命令查看我们已经创建的原始主题在哪里:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
所以没什么惊奇的,原始的主题没有副本在节点0上,当我们创建它时唯一存在的节点服务器。
让我们发布一些消息到新的主题:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在消费这些消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
现在测试容错性。节点1是领导者,我们kill它。
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564
在Windows上使用
> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe java -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar" kafka.Kafka config\server-1.properties 644
> taskkill /pid 644 /f
领导关系已经改为了从节点中的一个,节点1也不再in-sync复本集中:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
但是我们依然可以消费消息即使之前接受的领导者已经挂掉:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
第七部:使用Kafka Connect导入导出数据
在控制台输入输出数据是很方便,但是你可能使用来自其他数据源的数据或者把Kafka的数据导出到其他的系统中。对于很多系统,你可以直接使用Kafka Connect导入导出数据而不需要手写自定义的集成代码。
Kafka Connect是Kafka自带的导入导出工具。它是运行连接器的可扩展工具,实现了集成外部系统的自定义逻辑。在快速教程里,我们会看到如何使用Kafka Connect的简单连接器从文件导入数据到Kafka主题,再从kafka主题导出数据到文件。
首先,我们先创建一些测试数据:
echo -e "foo\nbar" >test.txt
然后我们在Standalone模式启动两个连接器,Standalone模式表示他们运行在一个本地进程中。我们提供了三个配置文件作为参数,第一个配置Kafka Connect进程,包含通用的配置如Kafka 代理连接和数据序列话工具。剩余的文件每个都指定一个连接器。这些文件包含一个唯一连接器名,实例化的连接器类,和一些其他的连接器配置。
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
这些示例配置文件,包含在Kafka中,使用默认的本地集群配置并创建了两个连接器:第一个是源连接器从文件读取数据行每行都生成消息发送到kafka主题,第二个是目标连接器从Kafka主题读取消息生成行输出到文件中。
在启动的时候你会看到大量的日志信息,包含一些表示连接器初始化的。一旦Kafka Connect进程启动,源连接器开始从test.txt
读取行并发送到connect-test
主题,sink连接器开始从connect-test
主题读取消息把他们写到test.sink.txt
文件。我们可以检查输出文件看到数据已经通过管道传递完毕:
> cat test.sink.txt
foo
bar
注意数据存储在Kafka的主题connect-test
中,我们可以运行消费者控制台查看主题数据(或者消费者代码处理它):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
连接器持续生成数据,所以我们可以给测试文件添加数据,看它通过管道。
> echo "Another line" >> test.txt
你应该看到这行出现在消费者控制台和目标文件中。
第八步:用Kafka Streams处理数据
Kafka Streams 是一个客户端库,为了实时流计算和分析Kafka集群中的存储数据。此快速教程示例将会描述如何运行一个用这个库编写的流程序。这是WordCountDemo
的主要示例代码(改成Java8 lambda表达式为了容易阅读)。
KTable wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Ensure the words are available as record keys for the next aggregate operation.
.map((key, value) -> new KeyValue<>(value, value))
// Count the occurrences of each word (record key) and store the results into a table named "Counts".
.countByKey("Counts")
它实现了WordCount算法,计算输入文本单词直方图。然而,不同你之前可能见过的WordCount例子(数据是有限),这个Demo程序有些不同,因为它设计为操作无限且没有边界的流数据。与有界变量类似,它是一个有状态的算法跟踪和修改单词的总数。然而,由于它必须假设潜在的输入数据无限多,当处理更多数据时它会定期输出它的当前状态和结果,因为他不知道什么时候所有输入数据会处理完成。
我们将给Kafka主题添加一些数据,随后会被Kafka流程序处理。
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
在Windows上:
> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt
下一步,我们使用生产者控制台发送一些输入数据给主题streams-file-input
(在实践中,流数据会在系统启动时持续不断的流向Kafka系统)。
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
现在可以运行WordCount示例程序处理输入数据:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
不会有任何标准输出,日志项作为结果持续写到另一个名为streams-wordcount-output
Kafka主题中。示例会运行几秒钟后自动停止而不像以便的流处理程序。
我们现在通过读取它的输出主题检查WordCount 示例输出:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
以下输出数据将会打印在控制台上:
all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1
第一列是Kafka消息键,第二列是消息的值,都是java.lang.String
格式。注意输出实际上是一个持续更新的流,每个数据记录(例如上面的每行)是一个单词的更新总数,又或者是如kafka这样的键。对一个键有多条记录,每个后面的记录都会更新前面的。