MQ系列(十五):Kafka 介绍和安装运行、发布订阅
Kafka 是一个开源的流处理平台,由 Scala 和 Java 编写;是一种高吞吐量的分布式发布订阅消息系统。
Kafka 用于构建实时的数据管道和流式应用程序。它具有水平可扩展性、容错性、速度极快。并在数千家公司投入生产。
Kafka 目前最新的版本是 2.2.0,本篇以该版本为例。Apache Kafka 官网,Apache 软件基金会(ASF) 官网。
Kafka 介绍
Kafka 可以搭建集群运行在一个或多个可跨多个数据中心的服务器上。Kafka 集群以称为主题的类别存储流,每条记录由一个键,一个值和一个时间戳组成。
Kafka 特性
Kafka 是基于消息 消息发布-订阅 模式实现的消息系统,主要特性有如下:
- 持久化:以时间复杂度 O(1) 的方式提供消息持久化功能,即使对 TB 级以上的数据也能保证常数时间复杂度的访问性能。支持容错持久化。
- 高吞吐:在廉价的商用机器上也能支持单机每秒 10 万条以上的吞吐量。
- 分布式:支持消息分区以及分布式消息,并保证分区内的消息顺序。
- 跨平台:支持不同的技术平台的客户端(如 Java,PHP,Python 等)。
- 实时性:支持实时数据处理和离线数据处理。
- 伸缩性:支持水平扩展。
Kafka 概念
- Broker:Kafka 集群包含一个或多个服务器,这些服务器被称为 Broker。
- Topic:逻辑上与 RabbitMq 中 Queue 队列类似。每条发布到 Kafka 集群的消息都必须有一个 Topic。(物理上不现的 Topic 消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上,但用户只需指定 Topic,即可生产或消费数据而不必关心数据存于保处)。
- Partition:是物理概念上的分区,为了提高系统吞吐率,在物理上,每个 Topic 会分成一个或多个 Partition,每个 Partition 对应一个文件夹(存储对应分区的消息内容和索引文件)。
- Producer:消息生产者,负责生产消息并发送到 Kafka Broker。
- Consumer:消息消费者,向 Kafka Broker 读取消息并处理的客户端。
- Consumer Group:每个 Consumer 属于一个特定的组(可为每个 Consumer 指定属于一个组,若不指定则属性默认组),组可以用来实现一条消息被组内多个成员消费等功能。
Kafka 应用
Kafka通常应用主要有两大类:
- 构建实时流数据管道,可在系统或应用程序之间可靠地获取数据。
- 构建实例流式应用程序,或转换和响应流数据。
Kafka 核心 API
Kafka 有 4 个核心的 API:
- Producer API:充许应用发布流数据到一个或多个 Kafka topic(主题)。
- Consumer API:允许应用订阅一个或多个主题,并处理这些流数据。
- Stream API:允许应用作为流处理器,消费一个或多个主题的输入流,并生产输出流到一个或多个主题 ,从而有效地将输入流转换为输出流。
- Connector API:允许构建和运行可重复使用的生产者或消费者,将 Kafka 主题 连接到现有的应用程序或数据系统。例如,关系数据库的链接器可能会捕获表的每个更改。
在 Kafka 中,客户端 和 服务器之间的通信是通过简单,高性能,语言无关的 TCP 协议完成的。 此协议已版本化并保持与旧版本的向后兼容性。 并为 Kafka 提供 Java 客户端,但客户端有多种语言版本。
Kafka 安装
Kafka 的安装全程参数官方文档的 Quickstart。Kafka 支持 Linux 和 Windows 操作系统,操作系统必须安装有 Java 环境。这里记录 Kafka 在 Ubuntu 系统的安装。
安装运行
下载安装包
1
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
解压安装包
1
tar zxvf kafka_2.12-2.2.0.tgz -C /usr/local/
Kafka 解压的 bin 目录存储的是可执许 sh 脚本,里面包含了 Kafka 和 Zookeeper 相关内容。bin 目录下面的 windows 存放的是可在 Windows 系统执行的批处理文件(bat),即支持在 Windows 运行;解压的 cofnig 目录存放的是相关的配置文件。
运行 Kafka
Kafka 依赖了 Zookeeper ,需要连接到 Zookeeper 服务,若没有外部独立的 Zookeeper 服务器,可在 bin 目录里启动 Zookeeper 服务。默认绑定的端口是 2181,可在 config/zookeeper.properties 配置文件中修改。
1
./zookeeper-server-start.sh config/zookeeper.properties
启动员 Kafka 服务:如果使用外部的 Zookeeper,在 config/server.properties 配置文件中修改 zookeeper.connect 的地址,可设置多个,逗号分隔。
1
./kafka-server-start.sh config/server.properties
备注:启动 Kafka 服务可能会报错未知主机名的错误(java.net.UnknownHostException: Name or service not known),原因是主机名没有映射到 127.0.0.1 地址上,两种解决方法:
- 修改主机名:修改 /etc/hostname 的值为:localhost。
- 添加主机名映射:在 /etc/hosts 添加 127.0.0.1 到 hostname 的映射。
配置允许外部 Kafka 客户端建立连接
因 Kafka 是通过 Socket 进行的,所以需要一直监听某一个端口是否有建立连接的请求。
Kafka Server 默认配置只允许本地生产者或消费者建立连接,因为默认监听的是 localhost:9092。
如果生产者和消费者是外部应用或在外部服务器而需要建立连接,则需要修改配置文件
server.properties
中的 Socket Server Settings 配置项里的监听地址,其中有 listeners 和 advertised.listeners 两个配置属性,要给外部 Kafka Client 建立连接,这两个属性必须配置其中一个。- listeners:Socket 服务器的监听地址。
如果没有设置,则使用 java.net.InetAddress.getCanonicalHostName() 获取的值(localhost)。
显式配置示例:listeners=PLAINTEXT://10.0.3.4:9092。 - advertised.listeners:Broker(代理) 向生产者和消费者发布的监听地址(hostname:port)。
如果没有设置,而 listeners 项有设置,则使用 listeners 的配置。否则使用 java.net.InetAddress.getCanonicalHostName() 获取的值(localhost)。
如果没有显式指定监听的地址,则外部的 Kafka Client 发起建立连接则会报异常,如下:
1
2
314:37:44,875 WARN [LogId:] [o.a.k.c.NetworkClient->processDisconnection:671]
| adminclient-1] [AdminClient clientId=adminclient-1]
Connection to node 0 could not be established. Broker may not be available.- listeners:Socket 服务器的监听地址。
发布订阅
创建主题:创建一个名为 test 的主题,该主题包含一个 分区和一个副本,
1
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看主题:
1
2./kafka-topics.sh --list --bootstrap-server localhost:9092
test以上是手动创建主题,也可在发送信息时自动创建主题,如下:
运行生产者发送消息
1
2
3./kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another messageKafka 附带了一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。运行生产者,然后输入一些消息到控制台发送到服务器。
运行消费者消费消息
另起一个连接 Ubuntu 的 SSH 控制台,生产者控制台输入内容回转后,消费者控制台可以看到消息说明已消费。
1
2
3./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
注意:如果生产者和消费者是外部应用或在外部服务器上,则上面示例的命令中的 localhost:9092 需要改为 Kafka Server Socket 监听的地址端口。
MQ系列(十五):Kafka 介绍和安装运行、发布订阅
http://blog.gxitsky.com/2020/01/16/MQ-15-Kafka-feature-install/