MQ系列(十五):Kafka 介绍和安装运行、发布订阅

  Kafka 是一个开源的流处理平台,由 Scala 和 Java 编写;是一种高吞吐量的分布式发布订阅消息系统。

  Kafka 用于构建实时的数据管道和流式应用程序。它具有水平可扩展性、容错性、速度极快。并在数千家公司投入生产。

  Kafka 目前最新的版本是 2.2.0,本篇以该版本为例。Apache Kafka 官网Apache 软件基金会(ASF) 官网

Kafka 介绍

Kafka 可以搭建集群运行在一个或多个可跨多个数据中心的服务器上。Kafka 集群以称为主题的类别存储流,每条记录由一个键,一个值和一个时间戳组成。

Kafka 特性

Kafka 是基于消息 消息发布-订阅 模式实现的消息系统,主要特性有如下:

  1. 持久化:以时间复杂度 O(1) 的方式提供消息持久化功能,即使对 TB 级以上的数据也能保证常数时间复杂度的访问性能。支持容错持久化。
  2. 高吞吐:在廉价的商用机器上也能支持单机每秒 10 万条以上的吞吐量。
  3. 分布式:支持消息分区以及分布式消息,并保证分区内的消息顺序。
  4. 跨平台:支持不同的技术平台的客户端(如 Java,PHP,Python 等)。
  5. 实时性:支持实时数据处理和离线数据处理。
  6. 伸缩性:支持水平扩展。

Kafka 概念

  1. Broker:Kafka 集群包含一个或多个服务器,这些服务器被称为 Broker。
  2. Topic:逻辑上与 RabbitMq 中 Queue 队列类似。每条发布到 Kafka 集群的消息都必须有一个 Topic。(物理上不现的 Topic 消息分开存储,逻辑上一个 Topic 的消息虽然保存于一个或多个 Broker 上,但用户只需指定 Topic,即可生产或消费数据而不必关心数据存于保处)。
  3. Partition:是物理概念上的分区,为了提高系统吞吐率,在物理上,每个 Topic 会分成一个或多个 Partition,每个 Partition 对应一个文件夹(存储对应分区的消息内容和索引文件)。
  4. Producer:消息生产者,负责生产消息并发送到 Kafka Broker。
  5. Consumer:消息消费者,向 Kafka Broker 读取消息并处理的客户端。
  6. Consumer Group:每个 Consumer 属于一个特定的组(可为每个 Consumer 指定属于一个组,若不指定则属性默认组),组可以用来实现一条消息被组内多个成员消费等功能。

Kafka 应用

Kafka通常应用主要有两大类:

  1. 构建实时流数据管道,可在系统或应用程序之间可靠地获取数据。
  2. 构建实例流式应用程序,或转换和响应流数据。

Kafka 核心 API

Kafka 有 4 个核心的 API:

  1. Producer API:充许应用发布流数据到一个或多个 Kafka topic(主题)。
  2. Consumer API:允许应用订阅一个或多个主题,并处理这些流数据。
  3. Stream API:允许应用作为流处理器,消费一个或多个主题的输入流,并生产输出流到一个或多个主题 ,从而有效地将输入流转换为输出流。
  4. Connector API:允许构建和运行可重复使用的生产者或消费者,将 Kafka 主题 连接到现有的应用程序或数据系统。例如,关系数据库的链接器可能会捕获表的每个更改。

在 Kafka 中,客户端服务器之间的通信是通过简单,高性能,语言无关的 TCP 协议完成的。 此协议已版本化并保持与旧版本的向后兼容性。 并为 Kafka 提供 Java 客户端,但客户端有多种语言版本。

Kafka 安装

Kafka 的安装全程参数官方文档的 Quickstart。Kafka 支持 Linux 和 Windows 操作系统,操作系统必须安装有 Java 环境。这里记录 Kafka 在 Ubuntu 系统的安装。

安装运行

  1. 下载安装包

    1
    /home/download# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
  2. 解压安装包

    1
    /home/download# tar zxvf kafka_2.12-2.2.0.tgz -C /usr/local/

    Kafka 解压的 bin 目录存储的是可执许 sh 脚本,里面包含了 Kafka 和 Zookeeper 相关内容。bin 目录下面的 windows 存放的是可在 Windows 系统执行的批处理文件(bat),即支持在 Windows 运行;解压的 cofnig 目录存放的是相关的配置文件。

  3. 运行 Kafka

    Kafka 依赖了 Zookeeper ,需要连接到 Zookeeper 服务,若没有外部独立的 Zookeeper 服务器,可在 bin 目录里启动 Zookeeper 服务。默认绑定的端口是 2181,可在 config/zookeeper.properties 配置文件中修改。

    1
    /usr/local/kafka_2.12-2.2.0/bin# ./zookeeper-server-start.sh config/zookeeper.properties

    启动员 Kafka 服务:如果使用外部的 Zookeeper,在 config/server.properties 配置文件中修改 zookeeper.connect 的地址,可设置多个,逗号分隔。

    1
    /usr/local/kafka_2.12-2.2.0/bin# ./kafka-server-start.sh config/server.properties

    备注:启动 Kafka 服务可能会报错未知主机名的错误(java.net.UnknownHostException: Name or service not known),原因是主机名没有映射到 127.0.0.1 地址上,两种解决方法:

    • 修改主机名:修改 /etc/hostname 的值为:localhost。
    • 添加主机名映射:在 /etc/hosts 添加 127.0.0.1 到 hostname 的映射。
  4. 配置允许外部 Kafka 客户端建立连接

    因 Kafka 是通过 Socket 进行的,所以需要一直监听某一个端口是否有建立连接的请求。

    Kafka Server 默认配置只允许本地生产者或消费者建立连接,因为默认监听的是 localhost:9092。

    如果生产者和消费者是外部应用或在外部服务器而需要建立连接,则需要修改配置文件 server.properties 中的 Socket Server Settings 配置项里的监听地址,其中有 listenersadvertised.listeners 两个配置属性,要给外部 Kafka Client 建立连接,这两个属性必须配置其中一个。

    • listeners:Socket 服务器的监听地址。
      如果没有设置,则使用 java.net.InetAddress.getCanonicalHostName() 获取的值(localhost)。
      显式配置示例:listeners=PLAINTEXT://10.0.3.4:9092。
    • advertised.listeners:Broker(代理) 向生产者和消费者发布的监听地址(hostname:port)。
      如果没有设置,而 listeners 项有设置,则使用 listeners 的配置。否则使用 java.net.InetAddress.getCanonicalHostName() 获取的值(localhost)。

    如果没有显式指定监听的地址,则外部的 Kafka Client 发起建立连接则会报异常,如下:

    1
    2
    3
    2019-05-09 14:37:44,875 WARN [LogId:] [o.a.k.c.NetworkClient->processDisconnection:671] 
    [kafka-admin-client-thread | adminclient-1] [AdminClient clientId=adminclient-1]
    Connection to node 0 could not be established. Broker may not be available.

发布订阅

  1. 创建主题:创建一个名为 test 的主题,该主题包含一个 分区和一个副本,

    1
    /usr/local/kafka_2.12-2.2.0/bin# ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

    查看主题:

    1
    2
    /usr/local/kafka_2.12-2.2.0/bin# ./kafka-topics.sh --list --bootstrap-server localhost:9092
    test

    以上是手动创建主题,也可在发送信息时自动创建主题,如下:

  2. 运行生产者发送消息

    1
    2
    3
    /usr/local/kafka_2.12-2.2.0/bin# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message

    Kafka 附带了一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。运行生产者,然后输入一些消息到控制台发送到服务器。

  3. 运行消费者消费消息

    另起一个连接 Ubuntu 的 SSH 控制台,生产者控制台输入内容回转后,消费者控制台可以看到消息说明已消费。

    1
    2
    3
    /usr/local/kafka_2.12-2.2.0/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message

注意:如果生产者和消费者是外部应用或在外部服务器上,则上面示例的命令中的 localhost:9092 需要改为 Kafka Server Socket 监听的地址端口。

MQ系列(十五):Kafka 介绍和安装运行、发布订阅

http://blog.gxitsky.com/2020/01/16/MQ-15-Kafka-feature-install/

作者

光星

发布于

2020-01-16

更新于

2022-06-07

许可协议

评论