🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# Kafka 无消息丢失配置 Q * Kafka 里什么算消息丢失? * 什么配置能保证消息不丢失? A * Kafka 只对"已提交"的消息(committed message)做有限度的持久化保证。 --- * 已提交的消息 * Kafka 的若干 Broker 成功接收到一条消息并写入日志文件后,告诉 Producer 消息已成功提交。此时为“已提交“消息。 * 有限度的持久化保证 * 假如消息保存在 N 个 Broker,至少有一个存活,即不会丢失。 ## 消息丢失案例 ### Producer 丢失数据 * 背景 * Kafka Producer 是异步发送消息 * 如果调用的是 `producer.send(msg)`,则会立即返回,但此时不能认为消息发送成功 * 这种方式称为 `fire and forget` * 解决 * Producer 永远要使用带有回调通知的发送 API * 因此要使用 `producer.send(msg, callback)` ### Consumer 丢失数据 * 背景 * Consumer 有位移概念 ![](https://img.kancloud.cn/0c/97/0c97bed3b6350d73a9403d9448290d37_2041x1243.png) * 如果提前更新位移,而未消费完,则会丢失 * 解决 * 保证先消费消息,再更新位移 * 这种办法带来的问题是消息的重复处理 ### Consumer 另一种丢失数据 * 背景 * Consumer 从 Kafka 获取到消息后开启多个线程异步处理 * Consumer 自动地向前更新 offset * 如果某个线程失败,则该线程上消息丢失 * 解决 * 如果多线程异步处理消费信息,Consumer 程序不要开启自动提交位移,而是应用程序手动提交 offset * 此种处理可能出现消息被多次消费 ### 一个隐秘的消息丢失场景 * 增加 Topic Partition * 在某段不巧的时间间隔后,Producer 先于 Consumer 感知到新增加的 Partition * 此时 Consumer 设置的是从最新位移处开始读取消息 * 因此在 Consumer 感知到新分区前,Producer 发送的这些消息就全部丢失了 ## 最佳实践 * 不要使用 `producer.send(msg)`,而是 `producer.send(msg, callback)` * 设置 acks = all * acks 是 Producer 的一个参数,代表了你对“已提交“消息的定义。 * 如果设置为 all,表明所有副本 Broker 都要接收到消息,才算“已提交“。这是最严谨的定义。 * 设置 retries 为一个较大的值 * retries 是 Producer 的参数,能够让 Producer 自动重试 * 设置 unclean.leader.election.enable = false * 这是 Broker 端参数,控制哪些 Broker 有资格竞选分区的 Leader。 * 如果一个 Broker 落后原先的 Leader 太多,那么它一旦是新的 Leader,则会造成消息丢失。 * 设置 replicationn.factor >= 3 * Broker 端参数 * 最好将消息多保存几份 * 设置 min.insync.replicas > 1 * Broker 端参数 * 控制的是消息至少要被写入到几个副本才算“已提交“ * 生产环境要设置大于 1 * 确保 replication.factor > min.insync.replicas * 如果两者相等,只要有一个副本挂机,整个分区无法正常工作 * 推荐配置 replication.factor = min.insync.replicas + 1 * 确保消息消费完再提交 * Consumer 端参数 enable.auto.commit 设置成 false,采用手动提交位移 * 这对于单 Consumer 多线程处理很重要