kafka
一、Kafka是什么:kafka一般用来缓存数据。
1.开源消息系统
2.最初是LinkedIn公司开发,2011年开源。2012年10月从Apache Incubator毕业。
- 项目目标是为处理实时数据,提供一个统一、高通量、低等待的平台。
3.Kafka是一个分布式消息队列。
- 消息根据Topic来归类,发送消息 Producer,接收 Consumer。
- kafka集群有多个kafka实例组成,每个实例成为broker
4.无论是kafka集群,还是 producer consumer 都依赖于 zookeeper 集群保存元信息,来保证系统可用性。
二、消息队列
1.点对点
2.发布、订阅模式
三、为什么需要消息队列
1.解耦
2.冗余
- 消息队列把数据进行持久化,直到他们已经被完全处理。
3.扩展性
4.灵活性
5.可恢复性
6.顺序保证(相对)
- kafka保证一个Partition内部的消息有序。
7.缓冲
8.异步通信
- 很多时候,用户不想也不需要立即处理消息。
- 消息队列提供异步处理机制,允许用户把消息放入队列,但不立即处理。
四、Kafka架构
1.Producer 消息生产者,就是往kafka中发消息的客户端
2.consumer:消息消费者,向kafka broker中取消息的客户端。
3.topic 理解为队列。
4.Consumer Group 消费者组
- 组内有多个消费者实例,共享一个公共的ID,即groupID。
- 组内所有消费者协调在一起,消费topic。
- 每个分区,只能有同一个消费组内的一个consumer消费。
5.broker
- 一台kafka服务器就是一个broker
6.partition:一个topic分为多个partition
- 每个partition是一个有序队列。
- kafka保证按一个partition中的顺序将消息发送个consumer。
- 不能保证topic整体有序
7.offset:Kafka存储文件按照offset.kafka命名。
五.kafka安装部署
1.前提:
- 安装好zookeeper
2.下载
3.上传到linux
4.解压
cd /opt/software
tar -zxvf kafka_2.11-2.1.1.tgz -C /opt/module
cd /opt/module
mv kafka_2.11-2.1.1/ kafka/
5.修改配置文件
a.server.properties
broker.id=0 //broker 的 全局唯一编号,不能重复
delete.topic.enable=true //允许删除topic
//处理网络请求的线程数量
num.io.threads=8 //用来处理磁盘IO的线程数量
image.png
log.dirs=/usr/local/kafka_2.11-2.1.1/logs //kafka运行日志存放的路径
zookeeper.connect=bigdata121:2181,bigdata122:2181,bigdata123:2181 //zookeeper相关信息
image.png
image.png
6.发送到其他两台
(1)注意:一定要更改broker.id的值
(2)分发
cd /opt/module
scp -r kafka/ bigdata122:/opt/module
scp -r kafka/ bigdata123:/opt/module
(3)更改另外两台的broker.id
7.启动
(1)三台都启动
cd /opt/module/kafka
bin/kafka-server-start.sh config/server.properties &
8.关闭
cd /opt/module/kafka
bin/kafka-server-stop.sh stop
六.kafka常用命令:
1.topic相关:
(1)创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.127.121:2181 --replication-factor 2 --partitions 3 --topic KafkaTest
参数 | 意义 |
---|---|
--create | 表示建立 |
--zookeeper | 表示ZK地址,可以传递多个,用逗号分隔--zookeeper IP:PORT,IP:PORT,IP:PORT |
--replication-factor | 表示副本数量,这里的数量是包含Leader副本和Follower副本,副本数量不能超过代理数量 |
--partitions | 表示主题的分区数量,必须传递该参数。Kafka的生产者和消费者采用多线程并行对主题的消息进行处理,每个线程处理一个分区,分区越多吞吐量就会越大,但是分区越多也意味着需要打开更多的文件句柄数量,这样也会带来一些开销。 |
--topic | 表示主题名称 |
(2)查看kafka集群的所有topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
(3)创建名称为topic_test的topic,partitions为1个,副本为1个
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
(4)为topic_test添加partition为10,只能增加,不能减少
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
(5)删除名称为topic_test的topic(只删除zookeeper内的元数据,消息文件须手动删除)
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_test
(6) 查看topic为topic_test的详细信息
bin/kafka-topics.sh -zookeeper localhost:2181 -describe -topic topic_test
2.consumer相关
(1)查看所有consumer的group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
(2) 查看group为group_test消费的机器以及partition
bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092
3.控制台相关
(1)控制台向topic_test发送数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test
(2)控制台接收topic_test数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning
()
()
()
()