小工具      在线工具  汉语词典  css  js  c++  java

kafka 的默认配置比较简单,但想把其端口暴露给外网(指定端口),则有一些额外的注意情况
kafka 的版本,0.8和0.10 的配置不同,这里以0.10.0.0 为主,
没有用集群,一台机做测试

zookeeper安装与启动

kafka 的安装包里自带有zookeeper,不过这里还是自行下载.
这里采用3.4.8版本
下载及安装请参考
http://zookeeper.apache.org/doc/r3.4.8/zookeeperStarted.html
启动:
bin/zkServer.sh start

zk默认客户端的连接
bin/zkCli.sh -server 127.0.0.1:2181

kafka的安装与启动

kafka采用0.10.0.0版本,文档请查看
http://kafka.apache.org/quickstart

【2018年7月最新版本为1.1.0,以下命令已确认,部分变更会注明】

官网内置了zookeeper,注意不要执行。

关闭kfk服务器(kafka没有shutdown命令,如果想关闭请参考kill命令)

ps ax | grep -i 'kafkaServer-gc.log' | grep java | awk '{print $1}' | xargs kill -9

启动
bin/kafka-server-start.sh config/server.properties

配置
conf/service.properties
里面有一个和zookeeper的配置,

zookeeper.connect=localhost:2181

一些操作:

日志
tail -111f  logs/server.log

创建1个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic list 
bin/kafka-topics.sh --list --zookeeper localhost:2181

查看topic 描述
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

删除topic
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181/chroot --delete --topic test


开启生产者 (使用内置工具开启一个通过产者,这是一个单独的进程,和kafka服务端无关,下面消费者同理)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test2


开启消费者
0.10.0版本 :bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
1.1.0版本 :bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 --from-beginning

如果 在1.1.0 中执行老版本的开启消费者,会有下面这个warning,但也可以使用
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

创建kafka集群(这一部分内容在官网可直接查看)

准备另两个节点的配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改端口等参数:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2
开启另两个节点
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
使用kafka集群

创建主题

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic

复制因子为3,这意味着数据将被分为三份。这是因为我们有三台服务器。如果设置大于3,将会抛出错误:

Error while executing topic command : Replication factor: 5 larger than available brokers: 3.
[2018-07-05 11:34:58,818] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)

查看主题列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

查看主题详细信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

为了使消费者和生产者能够,参考上面

相关配置

数据目录,kfk有持久化,这里是放文件的地方(就是所有的数据)
server.properties 中的 log.dirs=/tmp/kafka-logs
里面能看到 数据 /tmp/kafka-logs/…

  • 防范措施

上面步骤后,基本上就能直接使用了,不过要改端口,和有外网访问,则需要修改些东西
server.properties 中,
重要配置
配置好 广告主机名 和 advertised.port
1,如果你的消费者是内网侧,然后这个adverted.host.name 直接设置 kafka的内网ip即可。

2,如果你的消费者是外网侧,这个广告的主机名 则要设置为外网ip,但是有些机器有网络限制,此kafak的机器访问不了这个外网ip,那么,这个广告的主机名 设置为外网ip则会抛错.所以,一般的做法,如果是外网侧,广告主机名 设置为主机名,通过/etc/hosts的配置,来达到不同机器不同ip.

以上都是配置的IP,不要配置为127.0.0.1,否则其他机器访问kafka时,kafka不会响应。

下面是具体配置

  • kafka配置文件server.properties
listeners=PLAINTEXT://0.0.0.0:8180
advertised.host.name=ZN-YST-Backup03
advertised.port=8180
  • 卡夫卡服务器/etc/hosts
127.0.0.1   ZN-YST-Backup03
  • 消费者/etc/主机
120.xxx.xxx.90 ZN-YST-Backup03

java客户端

也可以直接在官网上找到

生产者java客户端

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "120.197.3.90:8180");
//        props.put("bootstrap.servers", "10.1.1.29:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 10000; i++) {
            Future<RecordMetadata> futureresult = producer.send(new ProducerRecord<String, String>("test2", Integer.toString(i), Integer.toString(i)+"gogo"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 发送的返回

                }
            });
//            futureresult.get();//阻塞
        }

        producer.close();
    }

消费者java客户端

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "120.xx.xx.xx:8180");
//        props.put("bootstrap.servers", "10.1.1.29:9092");
        props.put("group.id", "tests");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        }
    }

关于spring 结合 ,参考:http://www.baeldung.com/spring-kafka

问题

  • 为什么java客户端不需要 zookeeper ip和端口?
    kafak内置的消费端工具是这样子的
    /home/huge/projects/kfk_cluster/kfk/kafka_2.11-0.10.0.0/bin/kafka-console-consumer.sh --zookeeper ZN-YST-Backup03:8181 --topic test2 --from-beginning
    而java没有填zookeeper的ip端口,而是kafka的ip端口>>.

2018年7月更新,Kafka最新内置的consumer也是直接填写broker地址,没有填写zookeeper。

  • java producer 访问不了服务端 ,没有抛错,如果用回调,可以看到有 timeout 的情况
    server.properties 加上 advertised.host.name=10.1.1.29

如果消费者出现
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition

消费者配置加上:auto.offset.reset=latest
这个参数关于官网的说法:
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):(当没有存在的offset时候,用哪个?)

  • 最早:自动将偏移量重置为最早的偏移量
  • 最新:自动将偏移量重置为最新偏移量
  • none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
  • 其他任何事情:向消费者抛出异常。
. . .

相关推荐

额外说明

责任链模式- 业务校验工具类

1.FluentValidator import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List;

额外说明

【JavaEE】IO 操作

文章目录 前言 什么是 IO Reader 读操作 1. 创建 Reader 类 2. 理解 Reader 类中的不同 read 方法 3. 使用 Reader 类当中的不同 read 方法 3. 关闭文件操作 Writer 写操作 1. 创建出 Wri

额外说明

1024快乐!!!

1024快乐!!! 我也想要一个1024节日勋章! 哈哈哈哈哈哈哈!!! 哈哈哈哈哈哈哈!!! 哈哈哈哈哈哈哈!!!

额外说明

mysql 事务原理详解

前言 事务是mysql Innodb引擎的一大特点,可以说,在日常开发中,对于mysql事务的使用无处不在,因此深入了解并掌握mysql的事务原理很有必要。 一、mysql事务简介 事务 是一组操作集合,一个不可分割的工作单位; 事务会将所有操作作为一个

额外说明

注释的两难之道:程序员的反思

文章目录 代码即注释? 注释的艺术 1. 注释要言简意赅 2. 注释的时机 3. 注释内容 4. 避免无意义的注释 5. 维护注释 如何看待注释? -欢迎来到Java学习路线专栏~注释的两难之道:程序员的反思 ☆* o(≧▽≦)o *☆嗨~我是IT·陈寒

额外说明

java使用Quartz任务调用crontab表达式的时候报错:Based on configured schedule, the given trigger will never fire

项目升级适配改造时遇到的问题,定时任务和crontab表达式理论上都是没有问题的,但实际测试验证时报错了 这里先推荐一个crontab执行时间计算工具:crontab时间计算 之前一直在使用bejson的Cron表达式校验工具,但是无法验证Quartz类

额外说明

TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中

一、TiDB Binlog 在上篇文章中我们介绍了使用TiDB Binlog将数据同步至下游的Mysql 中,本篇我们学习下使用TiDB Binlog工具将数据同步至Kafka中自定义业务逻辑,比如可以做TIDB和ES、MongoDB 或 Redis的数

额外说明

leetcode 886. 可能的二分法

给定一组 n 人(编号为 1, 2, …, n), 我们想把每个人分进任意大小的两组。每个人都可能不喜欢其他人,那么他们不应该属于同一组。 给定整数 n 和数组 dislikes ,其中 dislikes[i] = [ai, bi] ,表示不允许将编号为

额外说明

honeypot_如何使用Honeypot阻止WordPress中的垃圾邮件评论机器人

蜜罐 垃圾评论是一个巨大的痛苦。为了防止垃圾评论,博主最终采取了严格的措施,例如要求人们在提交评论之前注册或输入验证码。验证码、数学测验和注册等垃圾邮件预防控制措施使普通访问者很难发表评论。通过实施其中一种方法,您正在惩罚错误的人群。许多垃圾评论是由机器

额外说明

如何通过巨量千川快速起爆新老账号,巨量千川引爆直播间线上速成

新标题:《探索如何利用巨额千川快速提升账号影响力,引爆直播间人气》 文本: 随着社交媒体的快速发展,很多人都希望能够快速提升自己账号的影响力,点燃直播间的人气。本文将揭示利用千川庞大平台的技巧和策略,帮助您快速实现这一目标。 首先要明确聚道千川平台的特点

ads via 小工具