ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## Pipe-filter pattern

![](https://box.kancloud.cn/0a627d1cd99df5bbf529bf3031a9eb56_842x435.png)

* 非常适合于数据处理及数据分析系统
* Filter 封装数据处理的功能
* 松耦合:Filter 只跟数据(格式)耦合
* Pipe 用于连接 Filter 传递数据,或者在异步处理过程中缓冲数据流;进程内同步调用时,pipe 演变为数据在方法调用间传递

示例,pipe 处理流程:SplitFilter -> StringToIntFilter -> SumFilter

filter.go

~~~
package pipefilter

import "errors"

type Request interface{}

type Response interface{}

type Filter interface {
	Process(data Request) (Response, error)
}

var DataTypeErr = errors.New("type error of data")

var StringToIntErr = errors.New("string to int error")
~~~

split_filter.go

~~~
package pipefilter

import (
	"fmt"
	"strings"
)

type SplitFilter struct {
	separator string
}

func NewSplitFilter(separator string) *SplitFilter {
	return &SplitFilter{separator}
}

func (filter *SplitFilter) Process(data Request) (Response, error) {
	fmt.Println("SplitFilter data:", data)
	result, ok := data.(string)
	if !ok {
		return nil, DataTypeErr
	}
	parts := strings.Split(result, filter.separator)
	return parts, nil
}
~~~

string_to_int_filter.go

~~~
package pipefilter

import (
	"fmt"
	"strconv"
)

type StringToIntFilter struct {
}

func NewStringToIntFilter() *StringToIntFilter {
	return new(StringToIntFilter)
}

func (filter *StringToIntFilter) Process(data Request) (Response, error) {
	fmt.Println("StringToIntFilter data:", data)
	parts, ok := data.([]string)
	if !ok {
		return nil, DataTypeErr
	}
	result := []int{}
	for _, str := range parts {
		num, err := strconv.Atoi(str)
		if err != nil {
			return nil, StringToIntErr
		}
		result = append(result, num)
	}
	return result, nil
}
~~~

sum_filter.go

~~~
package pipefilter

import "fmt"

type SumFilter struct {
}

func NewSumFilter() *SumFilter {
	return new(SumFilter)
}

func (filter *SumFilter) Process(data Request) (Response, error) {
	fmt.Println("SumFilter data:", data)
	arr, ok := data.([]int)
	if !ok {
		return nil, DataTypeErr
	}
	summary := 0
	for _, num := range arr {
		summary += num
	}
	return summary, nil
}
~~~

pipeline.go

~~~
package pipefilter

type PipeLine struct {
	Name    string
	Filters *[]Filter
}

func NewPipeLine(name string, filters ...Filter) *PipeLine {
	return &PipeLine{
		Name:    name,
		Filters: &filters,
	}
}

func (pipe *PipeLine) Process(param Request) (Response, error) {
	var err error
	var data interface{}
	for _, filter := range *pipe.Filters {
		data, err = filter.Process(param)
		if err != nil {
			return data, err
		}
		param = data
	}
	return data, err
}
~~~

filter_test.go

~~~
package pipefilter

import "testing"

func TestStringSplitFilter(t *testing.T) {
	pipe := NewPipeLine("pipeline", NewSplitFilter(","), NewStringToIntFilter(), NewSumFilter())
	data := "1,3,5,7,9"
	result, err := pipe.Process(data)
	if err != nil {
		t.Error(err)
	} else {
		t.Log("result=>", result)
	}
}
~~~