您的当前位置:首页正文

(二十一)大数据学习之kafka

来源:华拓网

kafka

一、Kafka是什么:kafka一般用来缓存数据。

1.开源消息系统

2.最初是LinkedIn公司开发,2011年开源。2012年10月从Apache Incubator毕业。

  • 项目目标是为处理实时数据,提供一个统一、高通量、低等待的平台。

3.Kafka是一个分布式消息队列。

  • 消息根据Topic来归类,发送消息 Producer,接收 Consumer。
  • kafka集群有多个kafka实例组成,每个实例成为broker

4.无论是kafka集群,还是 producer consumer 都依赖于 zookeeper 集群保存元信息,来保证系统可用性。

二、消息队列

1.点对点
2.发布、订阅模式

三、为什么需要消息队列

1.解耦

2.冗余

  • 消息队列把数据进行持久化,直到他们已经被完全处理。

3.扩展性

4.灵活性

5.可恢复性

6.顺序保证(相对)

  • kafka保证一个Partition内部的消息有序。

7.缓冲

8.异步通信

  • 很多时候,用户不想也不需要立即处理消息。
  • 消息队列提供异步处理机制,允许用户把消息放入队列,但不立即处理。

四、Kafka架构

1.Producer 消息生产者,就是往kafka中发消息的客户端

2.consumer:消息消费者,向kafka broker中取消息的客户端。

3.topic 理解为队列。

4.Consumer Group 消费者组

  • 组内有多个消费者实例,共享一个公共的ID,即groupID。
  • 组内所有消费者协调在一起,消费topic。
  • 每个分区,只能有同一个消费组内的一个consumer消费。

5.broker

  • 一台kafka服务器就是一个broker

6.partition:一个topic分为多个partition

  • 每个partition是一个有序队列。
  • kafka保证按一个partition中的顺序将消息发送个consumer。
  • 不能保证topic整体有序

7.offset:Kafka存储文件按照offset.kafka命名。

五.kafka安装部署

1.前提:

  • 安装好zookeeper
    2.下载
    3.上传到linux
    4.解压
cd /opt/software

tar -zxvf kafka_2.11-2.1.1.tgz -C /opt/module

cd /opt/module

mv kafka_2.11-2.1.1/ kafka/ 

5.修改配置文件
a.server.properties

broker.id=0           //broker 的 全局唯一编号,不能重复
delete.topic.enable=true    //允许删除topic

      //处理网络请求的线程数量
num.io.threads=8          //用来处理磁盘IO的线程数量
image.png
log.dirs=/usr/local/kafka_2.11-2.1.1/logs      //kafka运行日志存放的路径

zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181           //zookeeper相关信息
image.png image.png

6.发送到其他两台
(1)注意:一定要更改broker.id的值
(2)分发

cd /opt/module

scp -r kafka/ bigdata122:/opt/module

scp -r kafka/ bigdata123:/opt/module

(3)更改另外两台的broker.id
7.启动
(1)三台都启动

cd /opt/module/kafka

bin/kafka-server-start.sh config/server.properties &

8.关闭

cd /opt/module/kafka

bin/kafka-server-stop.sh stop

六.kafka常用命令:

1.topic相关:
(1)创建主题

bin/kafka-topics.sh --create --zookeeper 192.168.127.121:2181 --replication-factor 2 --partitions 3 --topic KafkaTest
参数 意义
--create 表示建立
--zookeeper 表示ZK地址,可以传递多个,用逗号分隔--zookeeper IP:PORT,IP:PORT,IP:PORT
--replication-factor 表示副本数量,这里的数量是包含Leader副本和Follower副本,副本数量不能超过代理数量
--partitions 表示主题的分区数量,必须传递该参数。Kafka的生产者和消费者采用多线程并行对主题的消息进行处理,每个线程处理一个分区,分区越多吞吐量就会越大,但是分区越多也意味着需要打开更多的文件句柄数量,这样也会带来一些开销。
--topic 表示主题名称

(2)查看kafka集群的所有topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

(3)创建名称为topic_test的topic,partitions为1个,副本为1个

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10

(4)为topic_test添加partition为10,只能增加,不能减少

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10

(5)删除名称为topic_test的topic(只删除zookeeper内的元数据,消息文件须手动删除)

bin/kafka-topics.sh  --delete --zookeeper localhost:2181  --topic topic_test

(6) 查看topic为topic_test的详细信息

bin/kafka-topics.sh -zookeeper localhost:2181 -describe -topic topic_test

2.consumer相关
(1)查看所有consumer的group

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

(2) 查看group为group_test消费的机器以及partition

bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092

3.控制台相关
(1)控制台向topic_test发送数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test

(2)控制台接收topic_test数据

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning

()
()
()
()