标签归档:kafka

Kafka入门及Kafka-Python初体验

本文介绍了以下内容:

1.什么是Kafka?

2.为什么我们需要使用Kafka这样的消息系统及使用它的好处

3.如何将Kafka使用到我们的后端设计中。

译自https://timber.io/blog/hello-world-in-kafka-using-python/,有部分删改。

1.Kafka是什么、为什么我们需要它?

简而言之,Kafka是一个分布式消息系统。这是什么意思呢?

想象一下,你现在有一个简单的Web应用,其包含了网页前端客户端(Client)、服务端和数据库:

你需要记录所有发生在你的Web应用的事件,比如点击、请求、搜索等,以便后续进行计算和运营分析。

假设每个事件都由单独的APP完成,那么一个简单的解决方案就是将数据存储在数据库中,所有APP连接到数据库进行存储:

这看起来简单,但是其中还会出现许多问题:

1.点击、请求、搜索等事件会产生大量的数据到数据库中,这可能会导致插入事件存在延迟。

2.如果选择将高频数据存储在SQL或MongoDB等数据库中,很难再原有历史数据的基础上扩展数据库。

3.如果你需要用这些数据进行数据分析,你可能无法直接对数据库进行高频率的读取操作。

4.每个APP可以遵循自己的数据格式,这就意味着当你需要在不同的APP进行数据交换时,你需要进行数据格式的转换。

通过使用像Kafka这样的消息流系统,可以很好地解决这些问题,因为他们可以执行以下操作:

1.存储的大量数据可以被持久化、校验和复制,具备容错能力。

2.支持跨系统实时处理连续的数据流。

3.允许APP独立发布数据或数据流,并与使用它的APP无关。

那么它和传统数据库有何不同?

尽管Kafka可以持久化地存储数据,但它不是数据库。

Kafka不仅允许APP存储或提取连续的数据流,还支持实时处理。这与对被动数据执行CRUD操作或对传统数据库执行查询的方式不同。

听起来不错,那么Kafka是如何解决以上挑战的?

Kafka是一个分布式平台,是为规模而构建的,这意味着它可以处理高频率的读写和存储大量数据。它确保数据始终可靠。它还支持从故障中恢复的强大机制。

以下是为什么应该使用Kafka的一些关键因素:

1.1 简化后端架构

在Kafka的帮助下,我们前面的结构会变得简单一些:

1.2 通用数据管道

如上所示,Kafka充当多个APP和服务的通用数据管道,这给了我们两个好处:

1.数据是集成的,我们将来自不同系统的数据都存在一个地方,这使得Kafka成为真正的数据源。任何APP都可以将数据推送到该平台,然后由另一个APP提取数据。

2.Kafka使得应用程序之间交换数据变得容易。因为我们可以标准化数据格式,减少了数据格式的转换。

1.3 通用连接性

尽管Kafka允许你使用标准数据格式,但并不意味着你的APP就不需要数据转换了,它只是减少了我们转换数据的频率罢了。

此外,Kafka提供了一个叫 Kafka Connect 的框架允许我们维护遗留的老系统。

1.4 实时数据处理

类似于监控系统这样的实时APP,往往需要连续的数据流,这些数据需要被立即处理或尽量减少延迟处理。

Kafka的流式处理,使得处理引擎可以在很短的时间内(几毫米到几分钟)内取数、分析、以及响应。

2.Kafka入门

2.1 安装

安装Kafka是一个相当简单的过程。只需遵循以下给定步骤:

1.下载最新的1.1.0版本的Kafka

2.使用以下命令解压缩下载文件: tar -xzf kafka_2.11-1.1.0.tgz

3.cd到Kafka目录开始使用它: cd kafka_2.11-1.1.0

2.2 启动服务器

ZooKeeper是一个针对Kafka等分布式环境的集中管理工具,它为大型分布式系统提供配置服务、同步服务及命名注册表。

因此,我们需要先启动ZooKeeper服务器,然后再启动Kafka服务器。使用以下命令即可:

# Start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka Server
bin/kafka-server-start.sh config/server.properties

2.3 Kafka 基本概念

我们快速介绍一下Kafka体系结构的核心概念:

1.Kafka在一个或多个服务器上作为集群运行。

2.Kafka将数据流存储在名为topics的类别中。每条数据均由键、值、时间戳组成。

3.Kafka使用发布-订阅模式。它允许某些APP充当producers(生产者),记录数据并将数据发布到Kafka topic中。

同样,它允许某些APP充当consumer(消费者)和订阅Kafka topic并处理由它产生的数据。

4.除了Prodcuer API 和 Consumer API,Kafka还为应用提供了一个 Streams API 作为流处理器。通过 Connector API 我们可以将Kafka连接到其他现有的应用程序和数据系统。

2.4 架构

如你所见,每个Kafka的 Topic 可以分为多个Partition(分区),可以使用broker(经纪人)在不同的计算机上复制这些 Topic,从而使消费者可以并行读取 Topic.

kafka的复制是针对分区的:

比如上图中有4个broker, 1个topic, 2个分区,复制因子是3。当producer发送一个消息的时候,它会选择一个分区,比如topic1-part1分区,将消息发送给这个分区的leader, broker2、broker3会拉取这个消息,一旦消息被拉取过来,slave会发送ack给master,这时候master才commit这个log。

因此,整个系统的容错级别极高。当系统正常运行时,对Topic的所有读取和写入都将通过leader,且leader会保证所有其他broker均被更新。

如果Broker失效了,系统会自动重新配置,此时副本也可以接管成为Leader.

2.5 创建Kafka Topic

让我们创建一个名为 sample,含有一个partition(分区)和一个replica(副本)的Kafka Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample

列出所有的Kafka Topics,检查是否成功创建了sample Topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

describe topics 命令还可以获得特定Topic的详细信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sample

2.6 创建生产者与消费者

这里是代码实战部分,利用Kafka-Python实现简单的生产者和消费者。

1.首先需要安装kafka-python:

pip install kafka-python

2.创建消费者(consumer.py)

from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
    print (message)

3.创建生产者(producer.py)

现在,有一个消费者正在订阅我们的消息流,因此我们要创建一个生产者,发布消息到Kafka:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

现在,你重新运行消费者(consumer.py),你就会接收到生产者发送过来的消息。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红色字体的验证信息,进入互助群询问。


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典