• Java并发编程
  • JVM
  • JMX
  • Java数据结构与算法
  • 动态字节码生成技术
  • 常用工具
  • 2.0 安装以及使用

    2016-03-01 21:40:39 7,743 2

    以下的ob欧宝体育在线登录,假设读者是刚刚开始学习,之前并没有操作kafka和zookeeper的经验:

    Step 1: Download the code

    下载0.9.0.0 release版本,并解压:

    >wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
    > tar -xzf kafka_2.11-0.9.0.0.tgz
    > cd kafka_2.11-0.9.0.0

    Step 2: Start the server

    kafka集群使用到了zookeeper,因此需要先安装好zookeeper。你可以通过kafka自带的一个脚本来快速的安装一个单节点的zookeeper实例:

    > bin/zookeeper-server-start.sh config/zookeeper.properties
    [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (
    org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...

    现在来启动kafka服务:

    启动脚本语法:

    kafka-server-start.sh [-daemon] server.properties

    可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。

    > bin/kafka-server-start.sh -daemon config/server.properties
    [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 
    (kafka.utils.VerifiableProperties)
    ...

    Step 3: Create a topic

    现在我们来创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:

    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    现在我们可以通过以下命令来查看kafka中目前存在的topic

    > bin/kafka-topics.sh --list --zookeeper localhost:2181
    test

    除了我们通过手工的方式创建Topic,我们可以配置broker,当producer发布一个消息某个指定的Topic,但是这个Topic并不存在时,就自动创建。

    Step 4: Send some messages

    kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。

    首先我们要运行发布消息的脚本,然后在命令中输入要发送的消息的内容:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message

    Step 5: Start a consumer

    对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出:

    > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    This is a message
    This is another message

    如果你是通过不同的终端窗口来运行以上的命令,你将会看到在producer终端输入的内容,很快就会在consumer的终端窗口上显示出来。

    以上所有的命令都有一些附加的选项;当我们不携带任何参数运行命令的时候,将会显示出这个命令的详细用法。

    Step 6: Setting up a multi-broker cluster

    到目前为止,我们都是在一个单节点上运行broker,这并没有什么意思。对于kafka来说,一个单独的broker意味着kafka集群中只有一个接点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。为了有更好的理解,现在我们在一台机器上同时启动三个broker实例。

    首先,我们需要建立好其他2个broker的配置文件:

    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties

    配置文件的内容分别如下:

    config/server-1.properties:
        broker.id=1
        port=9093
        log.dir=/tmp/kafka-logs-1
    
    config/server-2.properties:
        broker.id=2
        port=9094
        log.dir=/tmp/kafka-logs-2

    broker.id属性在kafka集群中必须要是唯一的。我们需要重新指定port和log目录,因为我们是在同一台机器上运行多个实例。如果不进行修改的话,consumer只能获取到一个instance实例的信息,或者是相互之间的数据会被影响。

    目前我们已经有一个zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:

    > bin/kafka-server-start.sh -daemon config/server-1.properties &
    ...
    > bin/kafka-server-start.sh -daemon config/server-2.properties &
    ...

    现在我们创建一个新的topic,备份因子设置为3:

    >bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic  my-replicated-topic

    现在我们已经有了集群,并且创建了一个3个备份因子的topic,但是到底是哪一个broker在为这个topic提供服务呢(因为我们只有一个分区,所以肯定同时只有一个broker在处理这个topic)?

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
    Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

    以下是输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。因为目前我们只有一个partition,因此关于partition的信息只有一行。

  • leader节点负责给定partition的所有读写请求。如果一个topic有多个partitions,那么每个节点都会其中一部分partition的leader。

  • replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。

  • isr 是replicas的一个子集,它只列出当前还存活着的,并且备份了该partition的节点。

  • 现在我们的案例中,1号节点是leader,即使用server-1.properties启动的那个进程。

    我们可以运行相同的命令查看之前创建的名称为”test“的topic

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
            Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

    没有什么值得惊讶的地方,我们之前设置了topic的partition数量为1,备份因子为1,因此显示就如上所示了。

    现在我们向新建的topic中发送一些message:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C

    现在开始消费:

    > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C

    现在我们来测试我们容错性,因为broker1目前是leader,所以我们要将其kill

    > ps | grep server-1.properties
    7564  ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
    > kill -9 7564

    现在再执行命令:

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
    Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0

    我们可以看到,leader节点已经变成了broker 2.要注意的是,在Isr中,已经没有了1号节点。leader的选举也是从ISR中进行的。
    此时,我们依然可以 消费新消息:

    > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C

    Step 7: Use Kafka Connect to import/export data

    通过命令行来发布和接受消息,是一个很好的起步,但是有时我们的数据可能有其他来源,或者我们想将kafka的数据导出到其他系统中。对于很多系统来说,我们可以使用Kafka Connect,自己编写代码来实现导入导出的操作。

    Kafka Connect是Kafka自带的一个可以往Kafka集群中导入或者导出数据的工具。它是connectors的扩展工具,我们可以实现自定义的逻辑,从而和外部系统进行交互。在这个快速入门的章节,我们将会介绍如何基于一些简单的connectors使用Kafka Connect来往kafka中的一个topic中导入一个文件中的信息,并且将kafka中一个topic的信息,导入一个文件中。首先我们需要创建一些测试数据:

    > echo -e "foo\nbar" > test.txt

    接下来我们要以standalone模式运行两个conncetors。我们提供三个配置文件作为参数,第一个总是kafka连接进程的配置文件,内容包括:需要连接的kafka的brokers以及序列化文件的格式等。剩下的配置文件分别指定connector,包括唯一的connector名称,需要实例化的connector的全路径,以及其他的connector需要使用到的配置信息。

    > bin/connect-standalone.sh config/connect-standalone.properties \
    config/connect-file-source.properties config/connect-file-sink.properties

    待续

    上一篇:1.0 Kafka简介 下一篇:3.0 Kafka Java客户端