[TOC]
## Channel
### 基本特性
#### 两种模式
单向:
~~~
//只允许发送的通道:chan <- T
onlySend := make(chan <- int)
//只允许接收的通道:<- chan T
onlyRecv := make(<-chan int)
~~~
双向:chan T
~~~
ch := make(chan int)
~~~
#### 缓冲
有缓冲
有缓存的 channel(buffered channel),其缓存区大小是根据所设置的值来调整。在功能上,若缓冲区未满则不会阻塞,会源源不断的进行传输。当缓冲区满了后,发送者就会阻塞并等待。而当缓冲区为空时,接受者就会阻塞并等待,直至有新的数据
~~~
缓冲为10
ch := make(chan int,10)
~~~
无缓冲
无缓冲的 channel(unbuffered channel),其缓冲区大小则默认为 0。在功能上其接受者会阻塞等待并阻塞应用程序,直至收到通信和接收到数据。
~~~
ch := make(chan int)
~~~
### channel 本质
>本质就是一个环形队列的配合,其包含发送方队列、接收方队列,加上互斥锁`mutex`等结构。
#### 基本原理
![](https://img.kancloud.cn/41/f3/41f30f447ae0f8873a4d414a485ecc3f_1080x542.png)
#### 数据结构
hchan 结构体是 channel 在运行时的具体表现形式
~~~
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex //互斥锁,chan不允许并发读写
}
~~~
qcount:channel里面的元素计数。内建函数 len 可以返回这个字段的值。已接收还没被取走
dataqsiz:环形队列大小,即可存放元素的个数。make(chan int,10),10就是这个值
buf:当 channel 设置了缓冲数量时,该 buf 指向一个存储缓冲数据的区域,该区域是一个循环队列的数据结构
elemsize :要发送或接收的数据类型大小
closed :标识关闭状态
elemtype :元素类型
sendx :当 channel 设置了缓冲数量时,数据区域即循环队列此时已发送数据的索引位置
recvx:当 channel 设置了缓冲数量时,数据区域即循环队列此时已接收数据的索引位置
recvq :想读取数据但又被阻塞住的 goroutine 队列,即:等待读消息的goroutine队列
sendq :想发送数据但又被阻塞住的 goroutine 队列,即:等待写消息的goroutine队列
在数据结构中,我们可以看到`recvq`和`sendq`,其表现为等待队列,
其类型为`runtime.waitq`的双向链表结构:
~~~
type waitq struct {
first *sudog
last *sudog
}
~~~
且无论是`first`属性又或是`last`,其类型都为`runtime.sudog`结构体:
~~~
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer
...
}
~~~
* g:指向当前的 goroutine。
* next:指向下一个 g。
* prev:指向上一个 g。
* elem:数据元素,可能会指向堆栈。
### channel 实现原理
channel 的四大块操作,分别是:“创建、发送、接收、关闭”。
#### 创建 chan
```
ch := make(chan string)
```
编译器翻译后对应`runtime.makechan`或`runtime.makechan64`方法:
```
// 通用创建方法
func makechan(t *chantype, size int) *hchan
// 类型为 int64 的进行特殊处理
func makechan64(t *chantype, size int64) *hchan
```
`makechan`方法
~~~
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
~~~
创建 channel 的逻辑主要分为三大块:
* 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用`mallocgc`方法分配一段连续的内存空间。
* 当前 channel 存储的类型存在指针引用,就会连同`hchan`和底层数组同时分配一段连续的内存空间。
* 通用情况,默认分配相匹配的连续内存空间。
> 那就是 channel 的创建都是调用的`mallocgc`方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要`close`方法来进行显示关闭了。
从整体上来讲,`makechan`方法的逻辑比较简单,就是创建`hchan`并分配合适的`buf`大小的堆上内存空间。
#### 发送
发送的时候会把send转换成chansend1,chansend1再调用chansend
~~~
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
~~~
~~~
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//第一部分
if c == nil { // 先判断通道是不是nil
if !block { //block是写死的true
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) //是nil就阻塞休眠
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
//2、没有阻塞,没有关闭 但是满了,就直接返回
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//3、chan已经被close的情景
lock(&c.lock) //开始加锁
if c.closed != 0 { //已被 close 了,再发送数据的话会 panic。
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//5、查看接收队列是不是有接收者
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// buf还没满
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
//就放到缓冲区
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz { //
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 当bug满了、没有缓冲那种
gp := getg()
mysg := acquireSudog() //获取sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //放入发送等待队列
......
return true
}
~~~
解读:
第一部分:如果 chan 是 nil 的话,就把调用者 goroutine park(阻塞休眠),调用者就永远被阻塞住了,
第二部分:往一个已经满了的 chan 实例发送数据时,并且想不阻塞当前调用,那么这里的逻辑是直接返回。chansend1 方法在调用 chansend 的时候设置了阻塞参数,所以不会执行到第二部分的分支里。
第三部分:如果 chan 已经被 close 了,再往里面发送数据的话会 panic。
第四部分:如果等待队列中有等待的 receiver,那么这段代码就把它从队列中弹出,然后直接把数据交给它(通过 memmove(dst, src, t.size)),而不需要放入到 buf 中,速度可以更快一些。
第五部分:当前没有 receiver,需要把数据放入到 buf 中,放入之后,就成功返回了。
第六部分:处理 buf 满的情况。如果 buf 满了,发送者的 goroutine 就会加入到发送者的等待队列中,直到被唤醒。这个时候,数据或者被取走了,或者 chan 被 close 了。
#### 接收
在处理从 chan 中接收数据时,Go 会把代码转换成 chanrecv1 函数,如果要返回两个返回值,会转换成 chanrecv2,chanrecv1 函数和 chanrecv2 会调用 chanrecv。
~~~
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
~~~
~~~
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan { //常量false,到处都是写死的常量
print("chanrecv: chan=", c, "\n")
}
//1、判断chan是不是nil
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//2、没有阻塞,而且chan还是空的
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//3、加锁,返回时释放锁
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 { //被关闭了,且没有缓冲元素了
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock) //释放锁
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//4、没有数据,在先读后写的情况下,即读的g先到了
//查看写队列是不是有g,有就拿走
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//5、没有等待的sender,buf中有数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
//6、没有元素就阻塞,挂起等待
gp := getg()
mysg := acquireSudog() /获取sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) //放入写goroutine列列
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)//挂起等待
......
}
~~~
解读:
第一部分:chan 为 nil 的情况。和 send 一样,从 nil chan 中接收(读取、获取)数据时,调用者会被永远阻塞。
第二部分:没有阻塞,而且chan还是空的
第三部分: chan 已经被 close 的情况。如果 chan 已经被 close 了,并且队列中没有缓存的元素,那么返回 true、false。
第四部分:处理 sendq 队列中有等待者的情况。这个时候,如果 buf 中有数据,优先从buf 中读取数据,否则直接从等待队列中弹出一个 sender,把它的数据复制给这个receiver。
第五部分:处理没有等待的 sender 的情况。这个是和 chansend 共用一把大锁,所以不会有并发的问题。如果 buf 有元素,就取出一个元素给 receiver。
第六部分:处理 buf 中没有元素的情况。如果没有元素,那么当前的 receiver 就会被阻塞,直到它从 sender 中接收了数据,或者是 chan 被 close,才返回。
#### 关闭close
* 如果 chan 为 nil,close 会 panic;
* 如果 chan 已经 closed,再次 close 也会 panic。
* 如果 chan 不为 nil,chan 也没有closed,就把等待队列中的 sender(writer)和 receiver(reader)从队列中全部移除并唤醒。
~~~
func closechan(c *hchan) {
if c == nil { //关闭nil chan,panic
panic(plainError("close of nil channel"))
}
lock(&c.lock) //加锁
if c.closed != 0 { //关闭已经关闭的chan,panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
//释放所有的reader
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
//释放所有的writer(它们会panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
~~~
### 总结
图形化:https://www.jianshu.com/p/78df8ab49495
**向 channel 写数据:**
>recvq队列不为空
直接从 recvq 取出 G ,并把数据写入,最后把该 G 唤醒,结束发送过程。
>recvq队列为空
1、buf没有满,直接把数据发到buf队尾,结束发送过程。
2、buf满了或者就没有,阻塞休眠,加入sendq队列,等待唤醒
**从 channel 读数据**
>sendq队列不为空
1、没有缓冲区,直接从 sendq 中取出 G ,把 G 中数据读出,最后把 G 唤醒,结束读取过程。
2、说明缓冲区已满,从缓冲区中首部读出数据,把 G 中数据写入缓冲区尾部,把 G 唤醒,结束读取过程。
>sendq队列为空
1、缓冲区中有数据,则从缓冲区取出数据,结束读取过程。
2、缓冲区中没有数据,将当前 goroutine 加入 recvq ,进入睡眠,等待被写 goroutine 唤醒。
**关闭 channel**
1.关闭 channel 时会将 recvq 中的 G 全部唤醒,本该写入 G 的数据位置为 nil。将 sendq 中的 G 全部唤醒,但是这些 G 会 panic。
panic 出现的场景还有:
* 关闭值为 nil 的 channel
* 关闭已经关闭的 channel
* 向已经关闭的 channel 中写数据
- Go准备工作
- 依赖管理
- Go基础
- 1、变量和常量
- 2、基本数据类型
- 3、运算符
- 4、流程控制
- 5、数组
- 数组声明和初始化
- 遍历
- 数组是值类型
- 6、切片
- 定义
- slice其他内容
- 7、map
- 8、函数
- 函数基础
- 函数进阶
- 9、指针
- 10、结构体
- 类型别名和自定义类型
- 结构体
- 11、接口
- 12、反射
- 13、并发
- 14、网络编程
- 15、单元测试
- Go常用库/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go优化
- Go问题排查
- Go框架
- 基础知识点的思考
- 面试题
- 八股文
- 操作系统
- 整理一份资料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小点
- 树
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面试题
- 基础
- Map
- Chan
- GC
- GMP
- 并发
- 内存
- 算法
- docker