0%

初识Kafka

什么是Kafka

Kafka用于构建实时数据管道和流应用程序。它具有横向可扩展性、容错性、还有极快的速度。

Kafka介绍

什么是流平台

流平台具有三个关键功能:

  • 发布和订阅数据流,类似于消息队列或企业消息系统。
  • 以容错的持久方式存储数据流
  • 处理数据流

Kafka使用场景

  • 建立实时数据流管道,可以可靠地在系统或应用程序之间获取数据
  • 构建实时流应用程序以转换或响应数据流

要了解Kafka如何执行这些操作,让我们从头开始深入研究Kafka的功能。

Kafka概念:

  • Kafka在一个或多个跨越多数据中心的服务器上作为集群运行。
  • Kafka集群将数据流存储在称为Topic的类别中。
  • 每条数据记录由一个键、一个值和一个时间戳组成。

Kafka四个核心API

  • Producer API 允许应用程序向一个或多个Topic发布数据流
  • Consumer API 允许应用程序订阅一个或多个Topic,并处理他们发布的数据流
  • Streams API 允许应用程序充当流处理器,从一个或多个Topic获取输入流,然后向一个或多个Topic发布输出流,有效地将输入流转换为输出流。
  • Connector API 允许构建和运行可重复使用的生产者消费者,连接现有的应用程序或数据系统中的Topic

image

在Kafka中,客户端和服务器之间所有的通信都构建在TCP 协议之上。

Topic和Logs

Topic是将记录发布到的类别或订阅源的名称。Kafka中的Topic始终是多用户的;也就是说,一个Topic可以允许零个、一个或多个消费者来订阅或生产者来发布。

对于每个Topic,Kafka集群都会维护一个分区日志,如下所示:

image

每个分区(Partition)都有一个有序的、不变的序列,将提交日志记录连续追加。每个分区中的记录都有一个偏移的顺序ID,该ID是分区中的每个记录的唯一标识。

Kafka群集支持永久的保留所有的发布记录(无论是否已消费它们),这个周期是可配置的。例如,如果将保留策略设置为两天,则在发布记录后的两天内,该记录可供使用,之后将被丢弃以释放空间。Kafka的性能相对于数据大小实际上是恒定的,因此长时间存储数据不是问题。

image

实际上,每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由消费者控制:通常,消费者在读取记录时会线性地推进其偏移量,由于位置是由消费者控制的,因此它可以任意顺序使用记录。例如,消费者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录从新数据开始处理。

这些功能的组合意味着对于Kafka的消费者来说,操作非常轻便,他们任何偏移对集群或其他消费者没有影响。

日志中的分区有几个用途。首先,它们允许日志扩展到超出单个服务器的大小。每个单独的分区必须适合承载它的服务器,一个Topic可能有许多分区,因此它可以处理任意数量的数据,分区也可以随意扩充。

分布

日志的分区分布在Kafka集群中的服务器上,每个服务器处理数据并同步其他分区。每个分区都在可配置数量的服务器之间复制,以实现容错功能。

每个分区都有一个充当“领导者”的服务器和零个或多个充当“跟随者”的服务器。领导者处理对分区的所有读写请求,而跟随者则被动地复制领导者。如果领导者出现错误,则跟随者之一将自动随机成为新领导者。

生产者

生产者将数据发布到他们选择的Topic。生产者负责选择要分配给Topic中哪个分区添加记录。

消费者

消费者使用消费者组名称标记自己,并且发布到Topic的每条记录都会传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在单独的计算机上。

如果所有消费者实例具有相同的消费者组,记录将均衡负载给所有消费者。

如果所有消费者实例具有不同的消费者组,则每条记录将广播到所有消费者。

image

在Kafka中,消费的实现方式是将日志中的分区除以消费者实例,以便每个实例在任何时间点都是分区“公平共享”的唯一消费者。这个维护组成员身份的过程是由Kafka协议动态处理的。如果新实例加入组,则它们将从组的其他成员接管某些分区;如果实例死亡,则其分区将分发到其余实例。

Kafka只提供分区内记录的总顺序,而不提供Topic中不同分区之间的总顺序。但是,如果需要记录的总顺序,则可以使用只有一个分区的Topic来实现,尽管这意味着每个消费者组只有一个消费者进程。

保障

Kafka提供以下保障:

  • 生产者发送到特定Topic分区的消息将按其发送顺序追加。也就是说,如果记录M1与记录M2是相同的生产者发送,并且首先发送M1,则M1的偏移量将小于M2,并在记录中更早出现。
  • 消费者实例按记录在日志中的存储顺序读取记录。
  • 对于集群节点数为N的Topic,我们最多可以容忍N-1个服务器故障,才不会丢失提交的数据记录。

Kafka作为消息系统

Kafka的流概念与传统的企业消息系统相比如何?

传统上,消息具有两种模型:队列发布-订阅。在队列中,一组消费者可以从服务器读取数据,然后处理记录;在发布-订阅中广播给所有消费者。这两个模型中的每一个都有优点和缺点。队列的优势在于,它允许将数据处理划分到多个消费者实例上,从而扩展处理量。不幸的是,队列不是多用户的-进程读取完数据,数据就丢失了。发布-订阅允许您将数据广播到多个进程,但是由于每条消息都传递给每个订阅者,因此无法扩展处理。

Kafka的消费者组概括了这两个概念。与队列一样,消费者组允许您将处理划分为一组进程(消费者组的成员)。与发布-订阅一样,Kafka允许您将消息广播到多个消费者组。

与传统的消息系统相比,Kafka还具有以下优势。

传统队列将记录按顺序保留在服务器上,如果多个消费者从队列中消费,则服务器将按记录的存储顺序分发记录。但是,尽管服务器按顺序分发记录,但是这些记录是异步传递给消费者的,因此它们可能会在不同的消费者上无序到达。这实际上意味着在并行使用的情况下会丢失记录的顺序。消息传递系统通常通过具有“专用消费者”的概念来解决此问题,该概念仅允许一个进程从队列中使用,但是,这当然意味着在处理中没有并行性。

Kafka通过在Topic内的并行性(即分区)的概念,Kafka能够在用户进程池中提供排序保证和负载均衡。通过将Topic中的分区分配给消费者组中的消费者来实现的,保证每个分区都由组中的一个消费者完全消费。通过这样做,我们确保消费者是该分区的唯一消费,并按顺序使用数据。由于存在许多分区,因此仍然可以平衡许多消费者实例上的负载。但是请注意,消费者组中的消费者实例不能超过分区。

Kafka作为存储系统

写入Kafka的数据被写入磁盘并复制到其他节点以实现容错。Kafka允许生产者等待确认,以保证在被复制的服务器返回成功之前,不会认为写入是成功的。

由于重视存储并允许客户端控制其读取位置,所以可以将Kafka视为一种专用的分布式文件系统,专门用于高性能、低延迟的提交日志存储、复制和传播。

Kafka用于流处理

仅读取,写入和存储数据流是不够的,目的是实现对流的实时处理。

在Kafka中,流处理器是指从输入Topic中获取连续数据流,对该输入进行一些处理并生成连续数据流输出。

可以直接使用生产者和消费者API进行简单处理。但是,对于更复杂的转换,Kafka提供了完全集成的Streams API。这允许构建执行复杂处理的应用程序,这些应用程序计算流的聚合或将流连接在一起。

Streams API建立在Kafka提供的核心上:它使用生产者和消费者API作为输入,使用Kafka进行状态存储,并使用相同的组机制来实现流处理器实例之间的容错。

快速开始

本教程假定您是从头开始的,并且没有现有的Kafka或ZooKeeper数据。由于Kafka控制台脚本在基于Unix的平台和Windows平台上有所不同,因此在Windows平台上使用bin\windows\代替bin/,并将脚本扩展名更改为.bat

步骤1:下载

下载 2.4.0发行版并将其解压缩。

1
2
> tar -xzf kafka_2.12-2.4.0.tgz
> cd kafka_2.12-2.4.0

步骤2:启动服务

Kafka基于ZooKeeper启动,因此请先启动ZooKeeper服务器。可以使用kafka提供的脚本来快速启动单节点ZooKeeper实例。

1
2
3
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

现在启动Kafka服务器:

1
2
3
4
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

步骤3:建立Topic

创建一个分区和一个副本的Topic,名称“test”:

1
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

现在,如果我们运行list topic命令,便可以看到该Topic

1
2
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

除了手动创建Topic外,还可以配置成发布不存在的Topic时自动创建Topic

步骤4:发送消息

Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行都作为一条消息发送。

运行生产者,然后在控制台中键入一些消息发送到服务器。

1
2
3
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

步骤5:启动消费者

Kafka还有一个命令行消费者,它将消息转储到标准输出。

1
2
3
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果上面的每个命令都在不同的终端上运行,那么您现在应该能够在生产者终端中键入消息,并看到它们出现在消费者终端中。

所有的命令行工具都有其他选项。在不带参数的情况下运行该命令将显示用法信息。

步骤6:建立集群

到目前为止,我们一直在操作单节点。下面我们搭建一个多节点的集群,将集群扩展到三个节点(仍然全部在本地计算机上)。

首先,我们为每个节点创建一个配置文件(在Windows上,使用copy命令代替):

1
2
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

现在编辑这些新文件并设置以下属性:

1
2
3
4
5
6
7
8
`config/server-1.properties:`
`broker.id=1`
`listeners=[PLAINTEXT://:9093](plaintext://:9093)`
`log.dirs=/tmp/kafka-logs-1`
`config/server-2.properties:`
`broker.id=2`
`listeners=[PLAINTEXT://:9094](plaintext://:9094)`
`log.dirs=/tmp/kafka-logs-2`

broker.id属性是集群中每个节点的唯一且永久的名称。我们只需要覆盖端口和日志目录,这是因为我们都在同一台计算机上运行它们,并且希望所有代理都不要试图在同一端口上注册或覆盖彼此的数据。

我们已经有Zookeeper并启动了单个节点,因此我们只需要启动两个新节点:

1
2
3
4
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在,创建一个具有三个副本的新Topic:

1
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

运行describe topics命令查看节点信息:

1
2
3
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

第一行给出了所有分区的摘要,每一行都给出了有关一个分区的信息。由于该Topic只有一个分区,因此只有一行。

  • leader“是负责给定分区的所有读取和写入的节点。每个节点有可能随机成为领导者。
  • replicas“ 是复制此分区的日志的节点列表,不管这些节点是主节点还是当前活动节点。
  • isr“是一组“同步”副本。这是副本列表的子集,该列表当前处于活动状态,并与领先者保持联系。

我们也可以在单节点上运行相同的命令,查看其信息:

1
2
3
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

原始Topic没有副本,并且位于服务器0上,他是集群中的唯一服务器。

向新Topic发布一些消息:

1
2
3
4
5
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

现在让我们使用这些消息:

1
2
3
4
5
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

现在让我们测试一下容错能力。节点1扮演领导者的角色,所以让我们杀死它:

1
2
3
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用:

1
2
3
4
> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
ProcessId
6016
> taskkill /pid 6016 /f

领导者已切换到跟随者之一,并且节点1不再位于同步副本集中:

1
2
3
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

但是,即使最初进行写操作的领导者已经下线,消息仍然可以使用:

1
2
3
4
5
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C