# 1.3 顺序进程通讯
> 本节内容提供一个线上演讲:[YouTube 在线](https://www.youtube.com/watch?v=Z8ZpWVuEx8c),[Google Slides 讲稿](https://changkun.de/s/csp/)。
早在上个世纪七十年代,多核处理器还是一个科研主题,并没有进入普通程序员的视野。 Tony Hoare 于 1977 年提出通信顺序进程(CSP)理论,遥遥领先与他所在的时代。
CSP 的模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信, 这里发送消息时使用的就是通道(channel)。也就是我们常说的 『 Don't communicate by sharing memory; share memory by communicating 』。
多核处理器编程通常与操作系统研究、中断处理、I/O 系统、消息传递息息相关
* 信号量 semaphore \[Dijkstra, 1965\]
* 监控 monitors \[Hoare, 1974\]
* 锁与互斥 locks/mutexes
* 消息传递
研究证明了消息传递与如今的线程与锁是等价的 \[Lauer and Needham 1979\]。
## 1.3.1 产生背景
* Fortran: Subroutine
* Algol 60: Procedure
* PL/I: Entry
* UNIX: Coroutine
* SIMULA 64: Class
* Concurrent Pascal: Process and monitor
* CLU: Cluster
* Hewitt: Actor
* 硬件:CDC 6600 并行单元
* 软件:I/O 控制包、多编程操作系统
处理器技术在多核处理器上提出一组自洽的处理器可以更加高效、可靠 但为了使用机器的效能,必须在处理器之间进行通信与同步。为此也提出了很多方法
* 公共存储的检查与更新:Algol 68, PL/I, 各种不同的机器码(开销较大)
* semaphore
* events: PL/I
* 条件临界区
* monitors and queues: Concurrent Pascal
* path expression
C.A.R Hoare 1978 的萌芽论文,认为输入输出在一种模型编程语言中是基本要素。通信顺序过程的并行组成。通信是 I/O。**简洁设计、解决所有问题**
* Dijkstra's guarded command
* 条件分支
* 主要区别:若多个条件为真,则随机选择一个分支执行
* Dijkstra's parbegin parallel command
* 启动并发过程
* input and output command
* 过程间通信手段
* 通信值存在复制
* 没有缓存
* 指令推迟到其他过程能够处理该消息
* input + guarded command
* 等价于 Go 的 select 语句,条件分支仅当 input 源 ready 时开始执行
* 多个 input guards 同时就绪,只随机选择一个执行,其他 guards 没有影响
* repetitive command
* 循环
* 终止性取决于其所有 guards 是否已终止
* pattern-matching
## 1.3.2 设计细节
CSP 语言的结构非常简单,极度的数学形式化、简洁与优雅。 将 Dijkstra 守护指令、`!`发送 与`?`接受消息进行一般性的推广:
* `p!value`: 向过程`p`发送一个消息
* `p?var`: 从`p`接受一个值并存储到`var`
* `[A;B]`: A 运行后顺序的运行 B
* `[A||B]`: A 与 B 并行运行(组合)
* `*[A]`: 循环运行 A
* `[a -> A [] b -> B]`: 守护指令(如果`a`则`A`,否则如果`b`则`B`,但彼此并行)
* 顺序算符`;`
* 并行算符`||`
* 赋值算符`:=`
* 输入算符`?`(发送)
* 输出算符`!`(接受)
* 选择算符`□`
* 守卫算符`→`
* 重复算符`*`
### 程序结构
<cmd> ::= <simple cmd> | <structured cmd>
<simple cmd> ::= <assignment cmd> | <input cmd> | <output cmd>
<structured cmd> ::= <alternative cmd> | <repetitive cmd> | <parallel cmd>
<null cmd> ::= skip
<cmd list> ::= {<declaration>; | <cmd>; } <cmd>
### 并行指令
<parallel cmd> ::= [<proc>{||<proc>}]
<proc> ::= <proc label> <cmd list>
<proc label> ::= <empty> | <identifier> :: | <identifier>(<label subscript>{,<label subscript>}) ::
<label subscript> ::= <integer const> | <range>
<integer constant> ::= <numeral> | <bound var>
<bound var> ::= <identifier>
<range> ::= <bound variable>:<lower bound>..<upper bound>
<lower bound> ::= <integer const>
<upper bound> ::= <integer const>
X (i : 1 .. n) :: CL
↑ ↑ ↑ ↑ ↑
identifier bound var lower bound upper bound cmd list
X(1) :: CL1 || X(2) :: CL2 || … || X(n) :: CLn
<parallel cmd> ::= [<proc>{||<proc>}]
<proc> ::= <proc label> <cmd list>
<proc label> ::= <empty> | <identifier> :: | <identifier>(<label subscript>{,<label subscript>}) ::
<label subscript> ::= <integer const> | <range>
<integer constant> ::= <numeral> | <bound var>
<bound var> ::= <identifier>
<range> ::= <bound variable>:<lower bound>..<upper bound>
<lower bound> ::= <integer const>
<upper bound> ::= <integer const>
* `[cardreader ? cardimage || lineprinter ! lineimage]`
* 两个并行过程
* `[west::DISASSEMBLE || X::SQUASH || east::ASSEMBLE]`
* 三个并行过程
* `[room::ROOM || fork(i:0..4)::FORK || phil(i:0..4)::PHIL]`
* 十一个个并行过程,其中第二个和第三个并行过程分别包含五个并行的子过程
### 赋值指令
<assignment cmd> ::= <target var> := <expr>
<expr> ::= <simple expr> | <structured expr>
<structured expr> ::= <constructor> ( <expr list> )
<constructor> ::= <identifier> | <empty>
<expr list> ::= <empty> | <expr> {, <expr> }
<target var> ::= <simple var> | <structured target>
<structured target> ::= <constructor> ( <target var list> )
<target var list> ::= <empty> | <target var> {, <target var> }
x := x + 1 // 普通的赋值
(x, y) := (y, x) // 普通的赋值
x := cons(left, right) // 结构体赋值
cons(left, right) := x // 如果 x 是一个结构体 cons(y, z),则赋值成功 left:=y, right:=z,否则失败
insert(n) := insert(2*x+1) // 等价于 n := 2*x + 1
c := P() // 空结构体, 或称‘信号’
P() := c // 如果 c 同为信号,则无实际效果,否则失败
insert(n) := has(n) // 不允许不匹配的结构体之间进行赋值
### 输入输出指令
<input cmd> ::= <source> ? <target var>
<output cmd> ::= <destination> ! <expr>
<source> ::= <proc name>
<destination> ::= <proc name>
<proc name> ::= <identifier> | <identifier> ( <subscripts> )
<subscripts> ::= <integer expr> {, <integer expr> }
cardreader?cardimage // 类似于 cardimage <- cardreader
lineprinter!lineimage // 类似于 lineprinter := <- lineimage
X?(x,y) // 从过程 X 接受两个值,并赋值给 x 和 y
DIV!(3*a + b, 13) // 向过程 DIV 发送两个值,分别为 3*a+b 和 13
console(i)?c // 向第 i 个 console 发送值 c
console(j-1)!"A" // 向第 j-1 个 console 发送字符 "A"
X(i)?V() // 从第 i 个 X 接受一个信号 V
sem!P() // 向 sem 发送一个信号 P
思考:根据例子 (3) 和 (4),以下语句
[X :: DIV!(3*a+b, 13) || DIV :: X?(x,y)]
### 选择与重复指令
<repetitive cmd> ::= * <alternative cmd>
<alternative cmd> ::= [<guarded cmd> { □ <guarded cmd> }]
<guarded cmd> ::= <guard> → <cmd list> | ( <range> {, <range> }) <guard> → <cmd list>
<guard> ::= <guard list> | <guard list>;<input cmd> | <input cmd>
<guard list> ::= <guard elem> {; <guard elem> }
<guard elem> ::= <boolean expr> | <declaration>
// 如果某个条件成立,则执行 → 后的语句;若均成立则随机选择一个执行
[x ≥ y → m := x □ y ≥ x → m := y]
// 依次检查 content 中的值是否与 n 相同
i := 0; *[i < size; content(i) ≠ n → i := i+1]
// 从 west 复制一个字符串并输出到 east 中
*[c:character; west?c → east!c]
<repetitive cmd> ::= * <alternative cmd>
<alternative cmd> ::= [<guarded cmd> { □ <guarded cmd> }]
<guarded cmd> ::= <guard> → <cmd list> | ( <range> {, <range> }) <guard> → <cmd list>
<guard> ::= <guard list> | <guard list>;<input cmd> | <input cmd>
<guard list> ::= <guard elem> {; <guard elem> }
<guard elem> ::= <boolean expr> | <declaration>
// 监听来自 10 个 console 的值,并将其发送给 X,当收到 sign off 时终止监听
*[(i:1..10)continue(i); console(i)?c → X!(i,c); console(i)!ack(); continue(i) := (c ≠ sign off)]
// insert 操作: 执行 INSERT
// has 操作: 执行 SEARCH,并当 i < size 时,向 X 发送 true,否则发送 false
*[n:integer; X?insert(n) → INSERT □ n:integer; X?has(n) → SEARCH; X!(i<size)]
// V 操作: val++
// P 操作: val > 0 时, val--,否则失败
*[X?V() → val := val+1 □ val > 0; Y?P() → val := val-1]
## 1.3.3 协程
### S31 COPY
编写一个过程 X,复制从 west 过程输出的字符到 east 过程
COPY :: *[c:character; west?c → east!c]
func S31_COPY(west, east chan rune) {
for c := range west {
east <- c
### S32 SQUASH
SQUASH :: \*\[c:character; west?c → \[ c != asterisk → east!c □ c = asterisk → west?c; \[ c != asterisk → east!asterisk; east!c □ c = asterisk → east!upward arrow \] \] \]
SQUASH_EX :: *[c:character; west?c →
[ c != asterisk → east!c
□ c = asterisk → west?c;
[ c != asterisk → east!asterisk; east!c
□ c = asterisk → east!upward arrow
+ ] □ east!asterisk
] ]
func S32_SQUASH_EX(west, east chan rune) {
for {
c, ok := <-west
if !ok {
if c != '*' {
east <- c
if c == '*' {
c, ok = <-west
if !ok {
+ east <- '*'
if c != '*' {
east <- '*'
east <- c
if c == '*' {
east <- '↑'
从卡片盒中读取卡片上的内容,并以流的形式将它们输出到过程 X ,并在每个卡片的最后插入一个空格。
*[cardimage:(1..80)characters; cardfile?cardimage →
i:integer; i := 1;
*[i <= 80 → X!cardimage(i); i := i+1 ]
func S33_DISASSEMBLE(cardfile chan []rune, X chan rune) {
cardimage := make([]rune, 0, 80)
for tmp := range cardfile {
if len(tmp) > 80 {
cardimage = append(cardimage, tmp[:80]...)
} else {
cardimage = append(cardimage, tmp[:len(tmp)]...)
for i := 0; i < len(cardimage); i++ {
X <- cardimage[i]
X <- ' '
cardimage = cardimage[:0]
将一串流式字符串从过程 X 打印到每行 125 个字符的 lineprinter 上。必要时将最后一行用空格填满。
i:integer, i:=1;
*[c:character; X?c →
lineimage(i) := c;
[i <= 124 → i := i+1
□ i = 125 → lineprinter!lineimage; i:=1
] ];
[ i = 1 → skip
□ i > 1 → *[i <= 125 → lineimage(i) := space; i := i+1];
func S34_ASSEMBLE(X chan rune, lineprinter chan string) {
lineimage := make([]rune, 125)
i := 0
for c := range X {
lineimage[i] = c
if i <= 124 {
if i == 125 {
lineimage[i-1] = c
lineprinter <- string(lineimage)
i = 0
if i > 0 {
for i <= 124 {
lineimage[i] = ' '
lineprinter <- string(lineimage)
### S35 Reformat
从长度为 80 的卡片上进行读取,并打印到长度为 125 个字符的 lineprinter 上。每个卡片必须跟随一个额外的空格,最后一行 须使用空格进行填充。
[west::DISASSEMBLE || X::COPY || east::ASSEMBLE]
func S35_Reformat(cardfile chan []rune, lineprinter chan string) {
west, east := make(chan rune), make(chan rune)
go S33_DISASSEMBLE(cardfile, west)
go S31_COPY(west, east)
S34_ASSEMBLE(east, lineprinter)
### S36 Conway's Problem
func S35_ConwayProblem(cardfile chan []rune, lineprinter chan string) {
west, east := make(chan rune), make(chan rune)
go S33_DISASSEMBLE(cardfile, west)
go S32_SQUASH_EX(west, east)
S34_ASSEMBLE(east, lineprinter)
## 1.3.4 子程、数据表示与递归
SUBROUTINE::[X?(value params) → …; X!(result params)]
USER::[ …; subr!(arguments); …; subr?(results)]
数据表示可以视为一个具有多入口的子过程,根据 guarded command 进行分支选择:
*[X? method1(value params) → …
□ X? method2(value params) → … ]
[recsub(0)::USER || recsub(i:1..reclimit):: RECSUB]
USER::recsub(1)!(arguments); … ; recsub(1)?(results);
### S41 带余除法
[DIV::*[x,y:integer; X?(x,y)->
quot,rem:integer; quot := 0; rem := x;
*[rem >= y -> rem := rem - y; quot := quot + 1;]
func S41_DivisionWithRemainder(in chan S41_In, out chan S41_Out) {
v := <-in
x, y := v.X, v.Y
quot, rem := 0, x
for rem >= y {
rem -= y
out <- S41_Out{quot, rem}
### S42 阶乘
*[n:integer;fac(i-1)?n ->
[n=0 -> fac(i-1)!1
□ n>0 -> fac(i+1)!n-1;
r:integer; fac(i+1)?r; fac(i-1)!(n*r)
]] || fac(0)::USER ]
func S42_Factorial(fac []chan int, limit int) {
for i := 1; i <= limit; i++ {
go func(i int) {
n := <-fac[i-1]
if n == 0 {
fac[i-1] <- 1
} else if n > 0 {
// 注意,这里需要检查 i 的上限
// 原始解法没有对其进行检查,如果用户输入等于或超过则无法终止程序
if i == limit {
fac[i-1] <- n
} else {
fac[i] <- n - 1
r := <-fac[i]
fac[i-1] <- n * r
### S43 S44 S45 S46 集合
实现一个集合的 insert 与 has 方法
content(0..99)integer; size:integer; size := 0;
*[n:integer; X?has(n) -> SEARCH; X!(i<size)
□ n:integer; X?insert(n) -> SEARCH;
[i<size -> skip
□i = size; size<100 ->
content(size) := n; size := size+1
i:integer; i := 0;
*[i<size; content(i) != n -> i:=i+1]
Go 实现:https://github.com/changkun/gobase/blob/f787593b4467793f8ee0b07583ea9ffde5adf2be/csp/csp.go#L392
## 1.3.5 监控与调度
*[(i:1..100)X(i)?(value param) → …; X(i)!(results)]
当两个用户过程同时选择一个 X(i) 时,guarded cmd 保护了监控结果不会被错误的发送到错误的用户过程中。
### S51 Buffered Channel
构造一个带缓冲的过程 X,用于平滑输出的速度(即 buffered channel)。
in,out:integer; in:=0; out := 0;
comment 0 <= out <= in <= out+10;
*[in < out + 10; producer?buffer(in mod 10) -> in := in + 1
□ out < in; consumer?more() -> consumer!buffer(out mod 10); out := out + 1 ]
Go 实现:https://github.com/changkun/gobase/blob/f787593b4467793f8ee0b07583ea9ffde5adf2be/csp/csp.go#L609
### S52 信号量
实现证书信号量 S,在 100 个子过程间进行共享,每个过程可以通过 V 操作在信号量非正的情况下增加信号量。
S::val:integer; val:=0;
□ (i:1..100)val>0;X(i)?P()->val:=val-1]
Go 实现:https://github.com/changkun/gobase/blob/f787593b4467793f8ee0b07583ea9ffde5adf2be/csp/csp.go#L649
### S53 哲学家进餐问题
PHIL = *[...during i-th lifetime ... ->
FORK = *[phil(i)?pickup()->phil(i)?putdown()
□ phil((i-1)mod5)?pickup()->phil((i-1)mod5)?putdown()]
ROOM = occupancy:integer; occupancy := 0;
□ (i:0..4)phil(i)?exit()->occupancy:=occupancy-1]
Go 实现:https://github.com/changkun/gobase/blob/f787593b4467793f8ee0b07583ea9ffde5adf2be/csp/csp.go#L746
## 1.3.6 算法
### S61 Eratosthenes 素数筛法
实现 Eratosthenes 素数筛法。
mp:=p; comment mp is a multiple of p;
*[m:integer; SIEVE(i-1)?m->
[m=mp->skip □ m<mp->SIEVE(i+1)!m ]
|| SIEVE(0)::print!2; n:integer; n:=3;
|| SIEVE(101)::*[n:integer;SIEVE(100)?n->print!n]
|| print::*[(i:0..101)n:integer;SIEVE(i)?n->...]
Go 实现:https://github.com/changkun/gobase/blob/f787593b4467793f8ee0b07583ea9ffde5adf2be/csp/csp.go#L833
### S62 矩阵乘法
实现 3x3 矩阵乘法。
NORTH = *[true -> M(1,j)!0]
EAST = *[x:real; M(i,3)?x->skip]
CENTER = *[x:real;M(i,j-1)?x->
Go 实现:https://github.com/changkun/gobase/blob/f787593b4467793f8ee0b07583ea9ffde5adf2be/csp/csp.go#L923
## 1.3.7 总结
CSP 设计是 Tony Hoare 的早期提出的设计,与随后将理论完整化后的 CSP(1985)存在两大差异:
缺陷1: 未对 channel 命名
并行过程的构造具有唯一的名词,并以一对冒号作为前缀:\[a::P || b::Q || … || c::R\]
在过程 P 中,命令 b!v 将 v 输出到名为 b 的过程。该值由在过程 Q 中出现的命令 a?x 输入。 过程名称对于引入它们的并行命令是局部的,并且组件过程间的通信是隐藏的。 虽然其优点是不需要在语言中引入任何 channel 或者 channel 声明的概念。
1. 子过程需要知道其使用过程的名称,使得难以构建封装性较好的子过程库
2. 并行过程组合本质上是具有可变数量参数的运算,不能进行简化(见 CSP 1985)
缺陷2: 重复指令的终止性模糊
重复指令默认当每个 guard 均已终止则指令中终止,这一假设过强。具体而言,对于 \*\[a?x → P □ b?x → Q □ …\] 要求当且仅当输入的所有过程 a,b,… 均终止时整个过程才自动终止。
1. 定义和实现起来很复杂
2. 证明程序正确性的方法似乎比没有简单。
综合来说,CSP 1978 中描述的编程语言(与 Go 所构建的基于通道的 channel/select 同步机制进行对比):
1. channel 没有被显式命名
2. channel 没有缓冲,对应 Go 中 unbuffered channel
3. buffered channel 不是一种基本的编程源语,并展示了一个使用 unbuffered channel 实现其作用的例子
4. guarded command 等价于 if 与 select 语句的组合,分支的随机触发性是为了提供公平性保障
5. guarded command 没有对确定性分支选择与非确定性(即多个分支有效时随机选择)分支选择进行区分
6. repetitive command 的本质是一个无条件的 for 循环,但终止性所要求的条件太苛刻,不利于理论的进一步形式化
7. CSP 1978 中描述的编程语言对程序终止性的讨论几乎为零
8. 此时与 Actor 模型进行比较,CSP 与 Actor 均在实体间直接通信,区别在于 Actor 支持异步消息通信,而 CSP 1978 是同步通信
## 进一步阅读的参考文献
* \[Hoare 78\] Hoare, C. A. R. (1978). Communicating sequential processes. Communications of the ACM, 21(8), 666–677.
* \[Brookes 84\] S. D. Brookes, C. A. R. Hoare, and A. W. Roscoe. 1984. A Theory of Communicating Sequential Processes. J. ACM 31, 3 (June 1984), 560-599.
* \[Hoare 85\] C. A. R. Hoare. 1985. Communicating Sequential Processes. Prentice-Hall, Inc., Upper Saddle River, NJ, USA.
* \[Milner 82\] R. Milner. 1982. A Calculus of Communicating Systems. Springer-Verlag, Berlin, Heidelberg.
* \[Fidge 94\] Fidge, C., 1994. A comparative introduction to CSP, CCS and LOTOS. Software Verification Research Centre, University of Queensland, Tech. Rep, pp.93-24.
* \[Lauer and Needham 1979\] Hugh C. Lauer and Roger M. Needham. 1979. On the duality of operating system structures. SIGOPS Oper. Syst. Rev. 13, 2 (April 1979), 3-19.
- 第一部分 :基础篇
- 第1章 Go语言的前世今生
- 1.2 Go语言综述
- 1.3 顺序进程通讯
- 1.4 Plan9汇编语言
- 第2章 程序生命周期
- 2.1 从go命令谈起
- 2.2 Go程序编译流程
- 2.3 Go 程序启动引导
- 2.4 主Goroutine的生与死
- 第3 章 语言核心
- 3.1 数组.切片与字符串
- 3.2 散列表
- 3.3 函数调用
- 3.4 延迟语句
- 3.5 恐慌与恢复内建函数
- 3.6 通信原语
- 3.7 接口
- 3.8 运行时类型系统
- 3.9 类型别名
- 3.10 进一步阅读的参考文献
- 第4章 错误
- 4.1 问题的演化
- 4.2 错误值检查
- 4.3 错误格式与上下文
- 4.4 错误语义
- 4.5 错误处理的未来
- 4.6 进一步阅读的参考文献
- 第5章 同步模式
- 5.1 共享内存式同步模式
- 5.2 互斥锁
- 5.3 原子操作
- 5.4 条件变量
- 5.5 同步组
- 5.6 缓存池
- 5.7 并发安全散列表
- 5.8 上下文
- 5.9 内存一致模型
- 5.10 进一步阅读的文献参考
- 第二部分 运行时篇
- 第6章 并发调度
- 6.1 随机调度的基本概念
- 6.2 工作窃取式调度
- 6.3 MPG模型与并发调度单
- 6.4 调度循环
- 6.5 线程管理
- 6.6 信号处理机制
- 6.7 执行栈管理
- 6.8 协作与抢占
- 6.9 系统监控
- 6.10 网络轮询器
- 6.11 计时器
- 6.12 非均匀访存下的调度模型
- 6.13 进一步阅读的参考文献
- 第7章 内存分配
- 7.1 设计原则
- 7.2 组件
- 7.3 初始化
- 7.4 大对象分配
- 7.5 小对象分配
- 7.6 微对象分配
- 7.7 页分配器
- 7.8 内存统计
- 第8章 垃圾回收
- 8.1 垃圾回收的基本想法
- 8.2 写屏幕技术
- 8.3 调步模型与强弱触发边界
- 8.4 扫描标记与标记辅助
- 8.5 免清扫式位图技术
- 8.6 前进保障与终止检测
- 8.7 安全点分析
- 8.8 分代假设与代际回收
- 8.9 请求假设与实务制导回收
- 8.10 终结器
- 8.11 过去,现在与未来
- 8.12 垃圾回收统一理论
- 8.13 进一步阅读的参考文献
- 第三部分 工具链篇
- 第9章 代码分析
- 9.1 死锁检测
- 9.2 竞争检测
- 9.3 性能追踪
- 9.4 代码测试
- 9.5 基准测试
- 9.6 运行时统计量
- 9.7 语言服务协议
- 第10章 依赖管理
- 10.1 依赖管理的难点
- 10.2 语义化版本管理
- 10.3 最小版本选择算法
- 10.4 Vgo 与dep之争
- 第12章 泛型
- 12.1 泛型设计的演进
- 12.2 基于合约的泛型
- 12.3 类型检查技术
- 12.4 泛型的未来
- 12.5 进一步阅读的的参考文献
- 第13章 编译技术
- 13.1 词法与文法
- 13.2 中间表示
- 13.3 优化器
- 13.4 指针检查器
- 13.5 逃逸分析
- 13.6 自举
- 13.7 链接器
- 13.8 汇编器
- 13.9 调用规约
- 13.10 cgo与系统调用
- 结束语: Go去向何方?