消息队列概念:(Message queue):
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
“消息队列”是在消息的传输过程中保存消息的容器。消息被发送到队列中。 消息队列管理器在将消息从它的源中继到它的目标时充当中间人。
使用场景:
当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异。
例子:
1)业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力。就可以把短信发送申请丢到消息队列,直接返回用户成功,短信发送模块再可以慢慢去消息队列中取消息进行处理。
2)调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送。
3)任务处理类的系统,先把用户发起的任务请求接收过来存到消息队列中,然后后端开启多个应用程序从队列中取任务进行处理。
消息队列特点及作用:
主要解决应用异步消息,解耦,流量削锋等问题,实现高性能,高可用,可伸缩架构
1, 异步处理 减少请求响应时间,提高系统响应速度
使用了消息队列,生产者一方,把消息往队列里一扔,就可以立马返回,响应用户了。无需等待处理结果。
2,解耦,提高系统稳定性
电商系统下订单,发送数据给生产系统的情况。电商系统和生产系统之间的网络有可能掉线,生产系统可能会因维护等原因暂停服务。如果不使用消息队列,电商系统数据发不出去,顾客无法下单,影响业务开展。两个系统间不应该如此紧密耦合。应该通过消息队列解耦。同时让系统更健壮、稳定。
3,消除峰值
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
a、可以控制活动的人数
b、可以缓解短时间内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理
主流MQ框架:
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等
官网:
简介:
Kafka® 用于构建实时的数据管道和流式的app.它可以水平扩展,高可用,速度快,并且已经运行在数千家公司的生产环境。
基本术语:
Topic
Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).,一个逻辑上的概念,落到磁盘上是一个partition的目录。
Producer
发布消息的对象称之为主题生产者(Kafka topic producer),只负责数据生产,生产者的代码可以集成到任务系统中。
Consumer
订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。只管数据存储,不管是谁生产,不管是谁消费。在集群中每个broker都有一个唯一brokerid,不得重复。
ConsumerGroup
数据消费者组,ConsumerGroup可以有多个,每个ConsumerGroup消费的数据都是一样的。可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据,组员之间不能重复消费。
工作流程图:
集群搭建
下载:
或者
wget
kafka 集群配置:
kafka依赖zookeeper,在安装 kafka前提是zookeeper集群搭建好
1,解压
[root@hdcluster1 soft]# tar -xzf kafka_2.11-1.0.0.tgz
2,进入解压文件目录,创建日志目录
[root@hdcluster1 kafka_2.11-1.0.0]# mkdir logs
3,修改server.properties配置文件
[root@hdcluster1 kafka_2.11-1.0.0]# vim config/server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#是否允许删除topic
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的最大缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/root/soft/kafka_2.11-1.0.0/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除,单位小时,默认是168小时,也就是7天
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hdcluster1:2181,hdcluster2:2181,hdcluster3:2181
比较老的版本需要配置host.name
4,修改producer.properties
[root@hdcluster1 kafka_2.11-1.0.0]# vim config/producer.properties
#配置kafka节点消息往哪个端口发,broker的端口
bootstrap.servers=hdcluster1:9092,hdcluster2:9092,hdcluster3:9092
5,修改consumer.properties
[root@hdcluster1 kafka_2.11-1.0.0]# vim config/consumer.properties
#配置kafka节点消息往哪个端口发,broker的端口
bootstrap.servers=hdcluster1:9092,hdcluster2:9092,hdcluster3:9092
配置完成后,向集群其他机子发送
[root@hdcluster1 soft]# scp -r kafka_2.11-1.0.0 hdc2:/usr/
[root@hdcluster1 soft]# scp -r kafka_2.11-1.0.0 hdc3:/usr/
修改:broker.id
如果使用ssh 链接,使用exit退出.
启动集群:(分别执行,或者在所有session中执行一次)
启动集群之前,确保zookeeper必须启动。。。。
[root@hdcluster1kafka_2.11-1.0.0]#./bin/kafka-server-start.sh -daemon config/server.properties
常用命令:
1)查看当前服务器中的所有topic
[root@hdcluster1 kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper hdcluster1:2181 --list
[root@hdc1 kafka_2.11-1.0.0]# ./bin/kafka-topics.sh --zookeeper hdc1:2181,hdc2:2181 --list
默认的__consumer_offsets解释:由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息
2)创建topic
[root@hdcluster1 kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper hdcluster1:2181 --create --replication-factor 3 --partitions 1 --topic tp1
选项说明:
--zookeeper:为zk服务器地址,逗号分割配置多个
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
3)删除topic
[root@hdcluster1 kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper cluster1:2181 --delete --topic tp2
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。(和前面配置相呼应)
4)发送消息(先启动消费者再发送)
[root@hdcluster1 kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list hdcluster1:9092 --topic tp1
>aaa
>abc (如果写错,需要ctrl+backspace删除)
5)消费消息
[root@hdcluster2 kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --zookeeper hdcluster1:2181 --from-beginning --topic tp1
--from-beginning:会把tp1主题中以往所有的数据都读取出来。
6)查看某个Topic的详情
[root@hdcluster1 kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper hdcluster1:2181 --describe --topic tp1
parition数量、replica因子以及每个partition的leader、replica信息
leader:负责处理消息的读和写,
Replicas:列出了所有的副本节点,不管节点是否在服务中.
Lsr:是正在服务中的节点
工作流程细节分析
Kafka生产过程分析
写入方式 :
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。 数据的分发策略由producer决定,默认是defaultPartition Utils.abs(key.hashCode) % numPartitions
主题:
Kafka集群有多个消息代理服务器(broker-server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。通常,不同应用产生不同类型的数据,可以设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生成者写入的新消息。partition的目录中有多个segment组合(index,log),一个Topic对应多个partition,一个partition对应多个segment组合。一个segment有默认的大小是1G。每个partition可以设置多个副本(replication-factor 1),会从所有的副本中选取一个leader出来。所有读写操作都是通过leader来进行的。
分区(Partition) :
集群为每个主题维护了分布式的分区(partition)日志文件,物理意义上可以把主题(topic)看作进行了分区的日志文件(partition log)。主题的每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫做偏移量(offset),这个偏移量能够唯一地定位当前分区中的每一条消息。
分区原因:方便在集群中扩展
可以提高并发
副本(Replication) :
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。
存储方式:
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件)
存储策略 :
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
Zookeeper存储结构 :
消息生产流程:
1)producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
消费过程分析
消费模型
消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。
基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,但实际上这条消息并没有被实际处理)。如果要保证消息被处理,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的。
Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。
消费者组
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
参考帖子: