# 重复消费 & 数据丢失
* 消费者从节点读取到数据后处理完业务再去修改偏移量 这样可以确保数据不会丢失, 但是有可能出现重复消费, 在后续修改偏移量的时候可能失败
* 消费者从节点读取到数据时顺便修改偏移量,这样可以确保不会重复消费, 但是业务可能处理失败, 导致数据丢失
* 精典的做法是引入两阶段提交, 但是很多都不支持两阶段提交, 目前offset是存于Zookeeper中的,无法存于HDFS
* 去重处理
* 重复消费问题时, 引入第三方中间键, 在偏移量存入redis, 每次消费前判断redis的偏移量(双重保证,看业务接收程度 )
* 如果要保证exactly once 那要么把数据处理和提交offset放在一个事务里 要么把数据处理操作编程幂等的(消费一个和多次效果是一样的)
---
* [Kafka 数据丢失和重复消费问题](https://blog.csdn.net/lisheng5218/article/details/88343773)
* [kafka参数设置](https://zhuanlan.zhihu.com/p/54287819)
* [kafka重复修复原因](http://blog.zollty.com/b/archive/about-kafka-repeated-consumption-and-lost-data.html)
* [高吞吐、高可用MQ对比分析](http://blog.zollty.com/b/archive/high-throughput-high-availability-MQ-comparative-analysis.html)
* 数据不重复,建议通过业务手段来保证
----
偏移量 ?
golang 将kafka的offset置为最新 : https://blog.csdn.net/u011677067/article/details/81026314
```
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"strconv"
"sync"
)
//全局注释 kafka 虽然性能比 rabbitmq要快 但是他丢失数据库的可能性更大,而且还会存在重复接受消息的情况
var Topic = "266"
var partition = int32(0)
func main() {
sarama.Logger = log{}
cfg := sarama.NewConfig()
cfg.Version = sarama.V2_2_0_0
cfg.Producer.Return.Errors = true
cfg.Net.SASL.Enable = false
cfg.Producer.Return.Successes = true //这个是关键,否则读取不到消息
cfg.Producer.RequiredAcks = sarama.WaitForAll
cfg.Producer.Partitioner = sarama.NewManualPartitioner //允许指定分组
cfg.Consumer.Return.Errors = true
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
//cfg.Group.Return.Notifications = true
cfg.ClientID = "service-exchange-api"
var kafka = KafkaConfig{
Addrs: []string{"127.0.0.1:9092"},
Config: cfg,
}
_, _, err := NewKafkaClient(kafka)
fmt.Println("err:", err)
}
//发送消息 此为异步发送消息
func NewAsyncProducer(client sarama.Client, i int) error {
c, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return err
}
defer c.Close()
p, o, err := c.SendMessage(&sarama.ProducerMessage{Topic: Topic, Value: sarama.StringEncoder("消息发送成功拉ssssssss!!!!" + strconv.Itoa(i))})
if err != nil {
fmt.Printf("err:", err)
return err
}
fmt.Println(p, o)
/*c, err := sarama.NewAsyncProducerFromClient(client)
//sarama.NewSyncProducerFromClient() 此为同步
if err != nil {
return err
}
defer c.Close()
//Topic 为主题,Partition为区域 Partition如果不给默认为0 记得设置cfg.Producer.Partitioner = sarama.NewManualPartitioner 这里为允许设置指定的分区
//分区是从0开始,记得在启动配置文件时修改Partition的分区
//不同的主题包括不同的分区都是有着不同的offset
c.Input() <- &sarama.ProducerMessage{Topic: Topic,Key:sarama.StringEncoder(fmt.Sprintf("/topic/market/order-trade")), Value: sarama.StringEncoder("消息发送成功拉ssssssss!!!!"+strconv.Itoa(i))}
select {
//case msg := <-producer.Successes():
// log.Printf("Produced message successes: [%s]\n",msg.Value)
case err := <-c.Errors():
fmt.Println("Produced message failure: ", err)
default:
//fmt.Println("Produced message success",err)
}*/
return nil
}
//客户端接收消息
func NewKafkaClient(cfg KafkaConfig) (sarama.Client, func(), error) {
//创建链接 创建客户机
c, err := sarama.NewClient(cfg.Addrs, cfg.Config)
if err != nil {
return nil, nil, err
}
go func() {
//目前默认是肯定能使用的
consumer, err := sarama.NewConsumerGroupFromClient("default-group", c)
//client, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "group-1", cfg.Config)
if err != nil {
fmt.Println(err)
}
loopConsumer(consumer, Topic, partition, "b")
consumer.Close()
}()
go func() {
for i := 0; i < 10; i++ {
NewAsyncProducer(c, i)
}
}()
wg := &sync.WaitGroup{}
wg.Add(1)
wg.Wait()
return c, func() {
err := c.Close()
if err != nil {
fmt.Print(err)
}
}, nil
}
func loopConsumer(consumer sarama.ConsumerGroup, topic string, partition int32, item string) {
go func() {
for err := range consumer.Errors() {
fmt.Println(err)
}
}()
ctx, _ := context.WithCancel(context.Background())
hand := MainHandler{}
for {
err := consumer.Consume(ctx, []string{topic}, &hand)
if err != nil {
fmt.Println(err)
break
}
if ctx.Err() != nil {
break
}
}
/*for {
msg := <-partitionConsumer.Messages()
pom.MarkOffset(msg.Offset + 1, "备注")
fmt.Printf("[%s] : Consumed message: [%s], offset: [%d]\n",item, string(msg.Value), msg.Offset)
}*/
}
type KafkaConfig struct {
Addrs []string
Config *sarama.Config
}
type MainHandler struct {
}
func (m *MainHandler) Setup(sess sarama.ConsumerGroupSession) error {
// 如果极端情况下markOffset失败,需要手动同步offset
return nil
}
func (m *MainHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
// 如果极端情况下markOffset失败,需要手动同步offset
return nil
}
//此方法会自动控制偏移值,当分组里的主题消息被接收到时,则偏移值会进行加1 他是跟着主题走的
func (m *MainHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
fmt.Println(fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s Offset = %s", string(message.Value), message.Timestamp, message.Topic, message.Offset))
sess.MarkMessage(message, "")
}
return nil
}
type log struct{}
func (log) Print(v ...interface{}) {
fmt.Println(v...)
}
func (log) Printf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
func (log) Println(v ...interface{}) {
fmt.Println(v...)
}
```
- 111
- 日记
- 工具11
- 20200723
- 20200724
- 20201019
- 更多阅读
- 回收站
- kafka 消费失败和重复消费问题
- ABC
- 20200127
- MySQL从删库到跑路
- PHP从放弃到入门
- help
- 我的日志
- 博客验证码
- 项目版本管理
- C++ Json序列化
- 20190425
- 图片
- 关键字
- 链接
- 分布式, 分库, 分表
- 游戏开发
- goLand 编辑器
- 区块链
- A-计划
- B-计划
- gin框架
- 锁
- 力扣-答题
- 数据库
- mysql 索引优化
- 挖矿
- 分布式锁
- 跨域问题
- kafka
- 长连接
- 面向对象 面向过程 函数式编程
- websocket
- 其它问题
- zeroMq
- 工具
- linux - systemctl
- gitbook 部署
- Ubantu 基础配置
- 备注服务
- 更换身份证(身份证到期了)
- 资源05
- 备注服务2
- 分布式
- TODO
- 资料准备
- 文章阅读
- mysql 高可用
- 日志1
- 日记2 - 区块链
- centos7 系统服务脚本
- copy_service 服务替换
- go kafka 孤人自嘲 - 偏移量 - kafka
- go vendor
- golang 显示git工具栏
- 图片资源
- 资讯01
- 资源01
- 资源02
- 资源03-数据库
- 资源04
- php历史数据
- golang 数据
- 文件1
- 文件2
- 文件3
- 文件4
- 文件5
- 文件6
- 文件7
- 文件8
- 文件9
- 文件10
- Flutter
- 管理后台系统
- 重装系统