> ### 生产者
~~~
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{"192.168.1.170:9092", "192.168.1.171:9092", "192.168.1.172:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
number := 1
for {
msg := &sarama.ProducerMessage{}
msg.Topic = "my-topic"
msg.Value = sarama.StringEncoder(fmt.Sprint("this is a good test - ", number))
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed ", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
time.Sleep(time.Second)
number++
}
}
~~~
> ### 消费者
~~~
package main
import (
"fmt"
"sync"
"github.com/Shopify/sarama"
)
var (
wg sync.WaitGroup
)
func main() {
//创建消费者
consumer, err := sarama.NewConsumer([]string{"192.168.1.170:9092", "192.168.1.171:9092", "192.168.1.172:9092"}, nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
//设置分区
partitionList, err := consumer.Partitions("my-topic")
if err != nil {
fmt.Println("Failed to get the list of partitions: ", err)
return
}
fmt.Println(partitionList)
//循环分区
for partition := range partitionList {
//sarama.OffsetNewest 获取最新的
//sarama.OffsetOldest 从头读到尾
pc, err := consumer.ConsumePartition("my-topic", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
defer pc.AsyncClose()
wg.Add(1)
go func(pc sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
fmt.Println()
}
}(pc)
}
//time.Sleep(time.Hour)
wg.Wait()
consumer.Close()
}
~~~
> ### 相关阅读
* [kafka学习笔记:知识点整理](https://www.cnblogs.com/cyfonly/p/5954614.html)
* [Kafka分区与消费者的关系](https://www.cnblogs.com/cjsblog/p/9664536.html)
* [golang kafka小试消息队列](https://www.cnblogs.com/cjsblog/p/9664536.html)
* * *
* windows gcc not fount 安装 mingw-w64
* [https://sourceforge.net/projects/mingw-w64/](https://sourceforge.net/projects/mingw-w64/)
- 第一序 入门教程(一)
- 1.1环境配置
- 1.1 环境配置(补充:Linux下安装)
- 1.1 环境配置(补充:线上部署)
- 1.2 开发工具GoLand
- 1.3 准备工作
- 1.4 第一个应用程序 Hello World
- 1.4 补充 go get github 超时
- 第二序 入门教程(二)
- 2.1 语法结构
- 2.2 常量, 变量
- 2.2.1 命名规则
- 2.2.2 变量
- 2.2.2 变量(补充:类型推断的好处)
- 2.2.2 变量(补充:泛型)
- 2.2.3 常量
- 2.2.4 iota
- 2.2.5 Unicode字符编码
- 2.2.6 GBK 转 UTF8
- 2.3 条件语句
- 2.3.1 判断语句 if
- 2.3.2 选择语句 switch
- 2.3.3 循环语句 for
- 2.3.4 遍历 range
- 2.3.5 跳转语句 goto, break, continue
- 2.3.6 for 和 for range区别
- 2.4 数组, 切片, 集合, 通道
- 2.4.1 make, len, cap, new, nil
- 2.4.1 make, len, cap, new, nil (补充:nil)
- 2.4.2 数组 array
- 2.4.3.1 切片 slice - 1
- 2.4.3.2 切片 slice - 2
- 2.4.3.3 slice list ring
- 2.4.4 集合 map
- 2.4.5 goroutine
- 2.4.6 channel
- 2.5 函数, 结构, 方法, 接口
- 2.5.1 函数 function
- 2.5.2 结构 struct
- 2.5.3 方法 method
- 2.5.4 接口 interface
- 2.5.5 Go是面向对象的语言吗?
- 2.5.6 json序列化和反序列化
- 2.5.7 T和指针T
- 2.6 defer, panic, recover
- 2.6.1 defer
- 2.6.2 painc, recover
- 2.7 指针
- 2.7 指针(补充: 可寻址和不可寻址)
- 2.8 反射
- 第三序 相关阅读
- 3.1 相关阅读1
- 3.2 相关阅读2
- 3.3 相关阅读3
- 第四序 性能分析和调试工具
- 4.1 pprof工具介绍
- 4.2 CPU信息采集
- 4.3 Heap信息采集
- 4.4 Http信息采集
- 4.5 单元测试(功能测试)
- 4.6 基准测试(压力测试/性能测试)
- 4.7 示例测试(example)
- 4.8 gdb调试
- 第五序 网络编程
- 5.1 http请求和响应
- 5.2 socket
- 5.2.1 概念
- 5.2.2 服务端
- 5.2.3 客户端
- 5.3 WebSocket
- 5.3.1 第一版
- 5.3.1.1 服务端
- 5.3.1.2 客户端
- 5.3.1.3 相关阅读
- 5.3.2 服务端
- 5.3.3 客户端
- 5.3.4 nginx配置
- 5.3.5 修改版
- 5.3.5.1 草稿 - 1
- 5.3.5.2 草稿 - 2
- 5.3.5.3 草稿 - 3
- 5.3.5.4 服务端
- 5.3.5.5 客户端
- 5.4 打印客户端头部信息
- 第六序 算法
- 6.1 查找
- 6.1.1 二分查找
- 6.2 排序
- 6.2.1 交换排序 - 冒泡排序
- 6.2.2 插入排序 - 直接插入排序
- 6.2.3 插入排序 - 希尔排序
- 6.2.4 交换排序 - 快速排序
- 6.3 算法求解应用
- 第七序 微服务
- 7.1 相关阅读
- 7.2 gRPC
- 7.2.1 准备工作
- 7.2.2 编译.proto文件
- 7.2.3 gRPC服务端
- 7.2.4 gRPC客户端
- 7.3 micro/micro
- 7.3.1 服务发现
- 7.3.2 安装consul
- 7.3.3 准备工作
- 7.3.4 服务端
- 7.3.5 客户端
- 7.3.6 默认的服务发现
- 7.3.7 文档阅读
- 7.4 protobuf序列化
- 第八序 Web
- 8.1 视图模板
- 8.1.1 main.go
- 8.1.2 login.html
- 8.2 原生留言板
- 8.2.1 原生sql
- 8.2.1.1 main.go
- 8.2.1.2 view
- 8.2.1.2.1 index.html
- 8.2.1.2.2 create.html
- 8.2.2 sqlx
- 8.3 Gin框架
- 第九序 数据库
- 9.0 资料收集
- 9.1 Redis数据库 (gomodule/redigo)
- 9.1.1 介绍
- 9.1.2 消息队列
- 9.2 Redis数据库(go-redis/redis)
- 第十序 日记
- 10.1 SimplePanic
- 10.2 第一版日记库
- 10.2.1 winnielog
- 10.2.2 使用
- 第十一序 中间键
- 11.0 资料收集
- 11.1 NSQ
- 11.2 zookeeper
- 11.3 kafka
- 第十二序 加密
- 12.1 Token
- 12.2 SHA1
- 2.3 RSA + AES
- 第十三序 分布式锁
- 第十四序 标准库练习
- container/list
- 链表
- container/ring
- 环形链表
- context
- flag (获取命令行参数)
- io
- strconv
- sync
- 为什么需要锁?
- 互斥锁
- 读写锁
- 条件变量
- 计数器
- 并发安全字典
- 自制并发安全字典
- 官方并发安全字典
- 连接池
- sync/atomic
- 原子操作
- 第十五序 其它内容
- 文件读写
- 工作池
- 第十六序 相关阅读