合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
# **生产者配置参数解释:** ## **bootstrap.servers** :kafka集群broker的地址 ## **key.serializer**:关键字的序列化方式 ## **value.serializer**:消息值的序列化方式 ## **acks**:指定必须要有多少个分区的副本接收到该消息,服务端才会向生产者发送响应,可选值为:0,1,2,…,all,如果设置为0,producter就只管发出不管kafka server有没有确认收到。设置all则表示kafka所有的分区副本全部确认接收到才返回。 ## **buffer.memory**:生产者的内存缓冲区大小。如果生产者发送消息的速度 > 消息发送到kafka的速度,那么消息就会在缓冲区堆积,导致缓冲区不足。这个时候,send()方法要么阻塞,要么抛出异常。 ## **max.block.ms**:表示send()方法在抛出异常之前可以阻塞多久的时间,默认是60s ## **compression.type**:消息在发往kafka之前可以进行压缩处理,以此来降低存储开销和网络带宽。默认值是null,可选的压缩算法有snappy、gzip和lz4 ## **retries**:生产者向kafka发送消息可能会发生错误,有的是临时性的错误,比如网络突然阻塞了一会儿,有的不是临时的错误,比如“消息太大了”,对于出现的临时错误,可以通过重试机制来重新发送 ## **retry.backoff.ms**:每次重试之间间隔的时间,第一次失败了,那么休息一会再重试,休息多久,可以通过这个参数来调节 ## **batch.size**:生产者在发送消息时,可以将即将发往同一个分区的消息放在一个批次里,然后将这个批次整体进行发送,这样可以节约网络带宽,提升性能。该参数就是用来规约一个批次的大小的。但是生产者并不是说要等到一个批次装满之后,才会发送,不是这样的,有时候半满,甚至只有一个消息的时候,也可能会发送,具体怎么选择,我们不知道,但是不是说非要等装满才发。因此,如果把该参数调的比较大的话,是不会造成消息发送延迟的,但是会占用比较大的内存。但是如果设置的太小,会造成消息发送次数增加,会有额外的IO开销 ## **linger.ms**:生产者在发送一个批次之前,可以适当的等一小会,这样可以让更多的消息加入到该批次。这样会造成延时增加,但是降低了IO开销,增加了吞吐量 ## **client.id**:服务器用来标志消息的来源,是一个任意的字符串 ## **max.in.flight.requests.per.connection**:一个消息发送给kafka集群,在收到服务端的响应之前的这段时间里,生产者还可以发n-1个消息。这个参数配置retries,可以保证消息的顺序,后面会介绍 ## **request.timeout.ms**:生产者在发送消息之后,到收到服务端响应时,等待的时间限制 ## **max.request.size**:生产者发送消息的大小。可以是一个消息的大小,也可以发送的一个批次的消息大小 ## **receive.buffer.bytes和send.buffer.bytes**:tcp socket接收和发送消息的缓冲区大小,其实指的就是ByteBuffer的大小 # **消费者配置参数解释:** ## **groupid**:一个字符串用来指示一组consumer所在的组群。实现同一个topic可由不同的组群消费 ## **auto.offset.reset:可选三个参数** earliest ---当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest---当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none---topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 ## **socket.timeout.ms**:默认值:3000,socket超时时间。 ## **socket.buffersize**: 默认值:64\*1024,socket receive buffer。 ## **fetch.size** :默认值:300 \* 1024,控制在一个请求中获取的消息的字节数。 这个参数在0.8.x中由fetch.message.max.bytes,fetch.min.bytes取代。 ## **backoff.increment.ms**:默认值:1000,这个参数避免在没有新数据的情况下重复频繁的拉数据。 如果拉到空数据,则多推后这个时间。 ## **queued.max.message.chunks**:默认值:2,consumer内部缓存拉回来的消息到一个队列中。 这个值控制这个队列的大小。 ## **auto.commit.enable**:默认值:true,如果true,consumer定期地往zookeeper写入每个分区的offset。 ## **auto.commit.interval.ms**:默认值:10000,往zookeeper上写offset的频率。 ## **auto.offset.reset**:默认值:largest,如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常。 ## **consumer.timeout.ms**:默认值:\-1,默认-1,consumer在没有新消息时无限期的block。如果设置一个正值, 一个超时异常会抛出。 ## **rebalance.retries.max**:默认值:4,rebalance时的最大尝试次数。 max.poll.interval.ms:拉取的最大时间间隔,如果你一次拉取的比较多,建议加大这个值,长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了 ## **max.poll.records**:默认值:500,Consumer每次调用poll()时取到的records的最大数。 # **kafka常用命令:** ## 1、启动kafka服务 bin/kafka-server-start.sh config/server.properties & ## 2、停止kafka服务 ./kafka-server-stop.sh ## 3、查看所有的话题 ./kafka-topics.sh --list --zookeeper localhost:2181 或: ./kafka-topics.sh--list \--bootstrap-server localhost:9092 ## 4、查看所有话题的详细信息 ./kafka-topics.sh --zookeeper localhost:2181 --describe ## 5、列出指定话题的详细信息 ./kafka-topics.sh --zookeeper localhost:2181--describe  --topic test-topic ## 6、删除一个话题,开启删除权限[delete.topic.enable](#brokerconfigs_delete.topic.enable)\=true,且最好关闭自动创建auto.create.topics.enable=false。并保证没有生产者与消费者此时作用于该主题,手动清除日志或设置检查log.retention.check.interval.ms,默认5min ./kafka-topics.sh --zookeeper localhost:2181 --delete  --topic test\-topic ## 7、创建一个叫test的话题,有两个分区,每个分区3个副本 ./kafka-topics.sh --zookeeper localhost:2181--create --topic test --replication-factor 3--partitions 2  ## 8、测试kafka发送和接收消息(启动两个终端) **发送消息(注意端口号为配置文件里面的端口号)** ./kafka-console-producer.sh --broker-list localhost:9092 --topic test **消费消息(可能端口号与配置文件保持一致,或与发送端口保持一致)** ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   #加了\--from-beginning 重头消费所有的消息 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test         #不加\--from-beginning 从最新的一条消息开始消费 **老版本消费者命令需要指定 \--zookeeper localhost:2181 具体在0.9等之前版本, 0.10版本过度的可以指定zookeeper也可以标明是 \--new-consumer:** ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test \--new-consumer ## 9、查看某个topic对应的消息数量 ./kafka-run-class.sh  kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1 ## 10、显示所有消费者 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list ## 11、获取正在消费的topic(console-consumer-63307)的group的offset ./kafka-consumer-groups.sh --describe --group console-consumer-63307 --bootstrap-server localhost:9092 ## 12、显示消费者 ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list # **kafka****集群配置****:** [https://blog.csdn.net/xuesp/article/details/88094326](https://blog.csdn.net/xuesp/article/details/88094326) **(注意zookeeper端口2181为常用即可)** [https://kafka.apache.org/documentation/#brokerconfigs](https://kafka.apache.org/documentation/#brokerconfigs)