## Pipe-filter pattern
![](https://box.kancloud.cn/0a627d1cd99df5bbf529bf3031a9eb56_842x435.png)
* 非常适合与数据处理及数据分析系统
* Filter 封装数据处理的功能
* 松耦合: Filter只跟数据(格式)耦合
* Pipe用于连接Filter传递数据或者在异步处理过程中缓冲数据流进程内同步调用时,pipe演变为数据在方法调用间传递
示例,pipe处理流程:
SplitFilter -> StringToIntFilter -> SumFilter
filter.go
~~~
package pipefilter
import "errors"
type Request interface{}
type Response interface{}
type Filter interface {
Process(data Request) (Response, error)
}
var DataTypeErr = errors.New("type error of data")
var StringToIntErr = errors.New("string to int error")
~~~
split_filter.go
~~~
package pipefilter
import (
"fmt"
"strings"
)
type SplitFilter struct {
separator string
}
func NewSplitFilter(separator string) *SplitFilter {
return &SplitFilter{separator}
}
func (filter *SplitFilter) Process(data Request) (Response, error) {
fmt.Println("SplitFilter data:", data)
result, ok := data.(string)
if !ok {
return nil, DataTypeErr
}
parts := strings.Split(result, filter.separator)
return parts, nil
}
~~~
string_to_int_filter.go
~~~
package pipefilter
import (
"fmt"
"strconv"
)
type StringToIntFilter struct {
}
func NewStringToIntFilter() *StringToIntFilter {
return new(StringToIntFilter)
}
func (filter *StringToIntFilter) Process(data Request) (Response, error) {
fmt.Println("StringToIntFilter data:", data)
parts, ok := data.([]string)
if !ok {
return nil, DataTypeErr
}
result := []int{}
for _, str := range parts {
num, err := strconv.Atoi(str)
if err != nil {
return nil, StringToIntErr
}
result = append(result, num)
}
return result, nil
}
~~~
sum_filter.go
~~~
package pipefilter
import "fmt"
type SumFilter struct {
}
func NewSumFilter() *SumFilter {
return new(SumFilter)
}
func (filter *SumFilter) Process(data Request) (Response, error) {
fmt.Println("SumFilter data:", data)
arr, ok := data.([]int)
if !ok {
return nil, DataTypeErr
}
summary := 0
for _, num := range arr {
summary += num
}
return summary, nil
}
~~~
pipeline.go
~~~
package pipefilter
type PipeLine struct {
Name string
Filters *[]Filter
}
func NewPipeLine(name string, filters ...Filter) *PipeLine {
return &PipeLine{
Name: name,
Filters: &filters,
}
}
func (pipe *PipeLine) Process(param Request) (Response, error) {
var err error
var data interface{}
for _, filter := range *pipe.Filters {
data, err = filter.Process(param)
if err != nil {
return data, err
}
param = data
}
return data, err
}
~~~
filter_test.go
~~~
package pipefilter
import "testing"
func TestStringSplitFilter(t *testing.T) {
pipe := NewPipeLine("pipeline", NewSplitFilter(","), NewStringToIntFilter(), NewSumFilter())
data := "1,3,5,7,9"
result, err := pipe.Process(data)
if err != nil {
t.Error(err)
} else {
t.Log("result=>", result)
}
}
~~~
- 概述
- go语言基础特性
- Go语言声明
- Go项目构建及编译
- go command
- 程序设计原则
- Go基础
- 变量
- 常量
- iota
- 基本类型
- byte和rune类型
- 类型定义和类型别名
- 数组
- string
- 高效字符串连接
- string底层原理
- 运算符
- new
- make
- 指针
- 下划线 & import
- 语法糖
- 简短变量申明
- 流程控制
- ifelse
- switch
- select
- select实现原理
- select常见案例
- for
- range
- range实现原理
- 常见案例
- range陷阱
- Goto&Break&Continue
- Go函数
- 函数
- 可变参数函数
- 高阶函数
- init函数和main函数
- 匿名函数
- 闭包
- 常用内置函数
- defer
- defer常见案例
- defer规则
- defer与函数返回值
- defer实现原理
- defer陷阱
- 数据结构
- slice
- slice内存布局
- slice&array
- slice底层实现
- slice陷阱
- map
- Map实现原理
- 集合
- List
- Set
- 线程安全数据结构
- sync.Map
- Concurrent Map
- 面向对象编程
- struct
- 匿名结构体&匿名字段
- 嵌套结构体
- 结构体的“继承”
- struct tag
- 行为方法
- 方法与函数
- type Method Value & Method Expressions
- interface
- 类型断言
- 多态
- 错误机制
- error
- 自定义错误
- panic&recover
- reflect
- reflect包
- 应用示例
- DeepEqual
- 反射-fillObjectField
- 反射-copyObject
- IO
- 读取文件
- 写文件
- bufio
- ioutil
- Go网络编程
- tcp
- tcp粘包
- udp
- HTTP
- http服务
- httprouter
- webSocket
- go并发编程
- Goroutine
- thread vs goroutine
- Goroutine任务取消
- 通过channel广播实现
- Context
- Goroutine调度机制
- goroutine调度器1.0
- GMP模型调度器
- 调度器窃取策略
- 调度器的生命周期
- 调度过程全解析
- channel
- 无缓冲的通道
- 缓冲信道
- 单向信道
- chan实现原理
- 共享内存并发机制
- mutex互斥锁
- mutex
- mutex原理
- mutex模式
- RWLock
- 使用信道处理竞态条件
- WaitGroup
- 工作池
- 并发任务
- once运行一次
- 仅需任意任务完成
- 所有任务完成
- 对象池
- 定时器Timer
- Timer
- Timer实现原理
- 周期性定时器Ticker
- Ticker对外接口
- ticker使用场景
- ticker实现原理
- ticker使用陷阱
- 包和依赖管理
- package
- 依赖管理
- 测试
- 单元测试
- 表格测试法
- Banchmark
- BDD
- 常用架构模式
- Pipe-filter pattern
- Micro Kernel
- JSON
- json-内置解析器
- easyjson
- 性能分析
- gc
- 工具类
- fmt
- Time
- builtin
- unsafe
- sync.pool
- atomic
- flag
- runtime
- strconv
- template