哈哈好久不见啦 今天课程的安排: 调度任务讲解 (这个是全部课程的精华,大家认真练习啊) ***** ### 调度任务 应用场景: 我们现在有一个业务就是用户注册返送短信验证码 用户注册->执行短信验证码方法 一个用户注册就会发送短信验证码,这是同步操作的,中间就会发生阻塞,一个人注册没有什么感觉,但是10w人同时注册这个服务器就吃不消了 如果解决这个问题就 我们在这里用到了一个水库模型 想想: 现在有一条大河,突然有一天涨水了,然后呢?就淹没城市了哈哈 然后怎么解决呢?就是不止一条河排水 ![](https://box.kancloud.cn/2bc4911bf884bb3365530592f9b60bdf_910x496.png) ![](https://box.kancloud.cn/db0e4dcd2c66bc5611c4d6adccbb0bd9_1134x682.jpg) 一个用户注册,我们把发送短信这个放到一个队列里面,开另一个服务他来消费这个请求 这个就是简单的一个流量削峰 - 必备基础 channel和goroutine并发编程基础 不如不会去看看官方文档吧 那个go指南这个还是挺简单的 ### 现在就开始上半部分的课程了 Scheduler - task runner 生产/消费 模型 - timer 定时器 新建taskrunner文件夹 ``` 并建立一下文件 . ├── defs.go 定义 ├── runner.go 任务调用核心 ├── task.go 生产者消费者定义 └── trmain.go ``` defs.go ``` package taskrunner const ( READY_TO_DISPATCH = "d" // 开始生产 通知 READY_TO_EXECUTE = "e" // 开始消费 通知 CLOSE = "c" // 停止 没有任务可做,或者出错 ) type controllerChan chan string // 通知 通知处理 type dataChan chan interface{} // 存放处理数据 type fn func (dc dataChan) error // 处理方法 生产者消费者方法 ``` 这样讲的有点多,就大概的讲讲实现思路吧, 用户发送消息这个任务扔到MQ里面去,然后那边就消费就可以了 - 讲讲定时任务 ``` package taskrunner import "time" /** timer setup start{trigger->task->runner} */ type Worker struct { ticker *time.Ticker // 定时器 runner *Runner } func NewWorker(interval time.Duration,r *Runner) *Worker { return &Worker{ ticker:time.NewTicker(interval * time.Second), runner: r, } } func (w *Worker) startWorker() { for { select { case <- w.ticker.C://这个是阻塞的,当时间到了这里就放行了 go w.runner.StartAll() } } } func Start() { // Start //r :NewWor ..... } ``` 定时任务就是通过<- w.ticker.C的阻塞实现的 我还是吧调度器的代码给大家敲一遍,能够理解的就理解下吧 ``` package taskrunner type Runner struct { Controller controllerChan // 控制信息 Error controllerChan // 错误信息 Data dataChan // 数据 dataSize int // 数据大小 longlived bool // 是否长期存货 tree不会回收资源 Dispatcher fn // 生产者 Executor fn // 消费者 } func NewRunner(size int,longlived bool,dispatcher fn,executor fn) *Runner { return &Runner{ Controller:make(chan string,1),// 我们这个需求是非阻塞的 所以用带buffer的chan Error:make(chan string,1), Data:make(chan interface{},size), longlived:longlived, Dispatcher:dispatcher, Executor:executor, } } func (r *Runner) startDispatch() { // 资源回收 defer func() { if !r.longlived { close(r.Controller) close(r.Data) close(r.Error) } }() forloop: for { select { case c := <-r.Controller: if c == READY_TO_DISPATCH { err := r.Dispatcher(r.Data) if err != nil { r.Error <- CLOSE }else{ r.Controller <- READY_TO_EXECUTE } } if c == READY_TO_EXECUTE { err := r.Executor(r.Data) if err != nil{ r.Error <- CLOSE }else{ r.Controller <- READY_TO_DISPATCH } } case e := <-r.Error: if e == CLOSE { return } default: //如果执行这个 说明消费完了 break forloop } } } func (r *Runner) StartAll() { r.Controller <- READY_TO_DISPATCH r.startDispatch() } ``` 今日课程代码 [https://github.com/dollarkillerx/GolangWebCourseware/tree/%E8%B0%83%E5%BA%A6%E5%99%A8](https://github.com/dollarkillerx/GolangWebCourseware/tree/%E8%B0%83%E5%BA%A6%E5%99%A8)