企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 概述 除去情怀需求,互联网上有大量珍贵的开放资料,近年来深度学习如雨后春笋一般火热起来,但机器学习很多时候并不是苦于我的模型是否建立得合适,我的参数是否调整得正确,而是苦于最初的起步阶段:没有数据。 作为收集数据的前置工作,有能力去写一个简单的或者复杂的爬虫,对于我们来说依然非常重要。 ## 基于colly的单机爬虫 <details> <summary>main.go</summary> ``` package main import ( "fmt" "regexp" "time" "github.com/gocolly/colly" ) var visited = map[string]bool{} func main() { // Instantiate default collector c := colly.NewCollector( colly.AllowedDomains("www.abcdefg.com"), colly.MaxDepth(1), ) // 我们认为匹配该模式的是该网站的详情页 detailRegex, _ := regexp.Compile(`/go/go\?p=\d+$`) // 匹配下面模式的是该网站的列表页 listRegex, _ := regexp.Compile(`/t/\d+#\w+`) // 所有a标签,上设置回调函数 c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Attr("href") // 已访问过的详情页或列表页,跳过 if visited[link] && (detailRegex.Match([]byte(link)) || listRegex.Match([]byte(link))) { return } // 既不是列表页,也不是详情页 // 那么不是我们关心的内容,要跳过 if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) { println("not match", link) return } // 因为大多数网站有反爬虫策略 // 所以爬虫逻辑中应该有 sleep 逻辑以避免被封杀 time.Sleep(time.Second) println("match", link) visited[link] = true time.Sleep(time.Millisecond * 2) c.Visit(e.Request.AbsoluteURL(link)) }) err := c.Visit("https://www.abcdefg.com/go/go") if err != nil {fmt.Println(err)} } ``` </details> ## 分布式爬虫 在很多场景下,速度是有意义的: 1. 对于价格战期间的电商们来说,希望能够在对手价格变动后第一时间获取到其最新价格,再靠机器自动调整本家的商品价格。 2. 对于类似头条之类的Feed流业务,信息的时效性也非常重要。如果我们慢吞吞地爬到的新闻是昨天的新闻,那对于用户来说就没有任何意义。 ![](https://img.kancloud.cn/e2/3e/e23e089cd03457c45a9d23435aa6a768_394x604.png) 列表页的html内容中会包含有所有详情页的链接。详情页的数量一般是列表页的10到100倍,所以我们将这些详情页链接作为“任务”内容,通过消息队列分发出去。 针对页面爬取来说,在执行时是否偶尔会有重复其实不太重要,因为任务结果是幂等的(这里我们只爬页面内容,不考虑评论部分)。 ### nats [nats](../../../%E6%B6%88%E6%81%AF%E4%B8%AD%E9%97%B4%E4%BB%B6/nats.md) ### 结合nats和colly结合 我们为每一个网站定制一个对应的collector,并设置相应的规则,比如abcdefg,hijklmn(虚构的),再用简单的工厂方法来将该collector和其host对应起来,每个站点爬到列表页之后,需要在当前程序中把所有链接解析出来,并把落地页的URL发往消息队列 <details> <summary>消息生产 main.go</summary> ``` package main import ( "fmt" "net/url" "github.com/gocolly/colly" ) var domain2Collector = map[string]*colly.Collector{} var nc *nats.Conn var maxDepth = 10 var natsURL = "nats://localhost:4222" func factory(urlStr string) *colly.Collector { u, _ := url.Parse(urlStr) return domain2Collector[u.Host] } func initABCDECollector() *colly.Collector { c := colly.NewCollector( colly.AllowedDomains("www.abcdefg.com"), colly.MaxDepth(maxDepth), ) c.OnResponse(func(resp *colly.Response) { // 做一些爬完之后的善后工作 // 比如页面已爬完的确认存进 MySQL }) c.OnHTML("a[href]", func(e *colly.HTMLElement) { // 基本的反爬虫策略 link := e.Attr("href") time.Sleep(time.Second * 2) // 正则 match 列表页的话,就 visit if listRegex.Match([]byte(link)) { c.Visit(e.Request.AbsoluteURL(link)) } // 正则 match 落地页的话,就发消息队列 if detailRegex.Match([]byte(link)) { err = nc.Publish("tasks", []byte(link)) nc.Flush() } }) return c } func initHIJKLCollector() *colly.Collector { c := colly.NewCollector( colly.AllowedDomains("www.hijklmn.com"), colly.MaxDepth(maxDepth), ) c.OnHTML("a[href]", func(e *colly.HTMLElement) { }) return c } func init() { domain2Collector["www.abcdefg.com"] = initABCDECollector() domain2Collector["www.hijklmn.com"] = initHIJKLCollector() var err error nc, err = nats.Connect(natsURL) if err != nil {os.Exit(1)} } func main() { urls := []string{"https://www.abcdefg.com", "https://www.hijklmn.com"} for _, url := range urls { instance := factory(url) instance.Visit(url) } } ``` </details> <details> <summary>消息消费 main.go</summary> ``` package main import ( "fmt" "net/url" "github.com/gocolly/colly" ) var domain2Collector = map[string]*colly.Collector{} var nc *nats.Conn var maxDepth = 10 var natsURL = "nats://localhost:4222" func factory(urlStr string) *colly.Collector { u, _ := url.Parse(urlStr) return domain2Collector[u.Host] } func initV2exCollector() *colly.Collector { c := colly.NewCollector( colly.AllowedDomains("www.abcdefg.com"), colly.MaxDepth(maxDepth), ) return c } func initV2fxCollector() *colly.Collector { c := colly.NewCollector( colly.AllowedDomains("www.hijklmn.com"), colly.MaxDepth(maxDepth), ) return c } func init() { domain2Collector["www.abcdefg.com"] = initV2exCollector() domain2Collector["www.hijklmn.com"] = initV2fxCollector() var err error nc, err = nats.Connect(natsURL) if err != nil {os.Exit(1)} } func startConsumer() { nc, err := nats.Connect(nats.DefaultURL) if err != nil {return} sub, err := nc.QueueSubscribeSync("tasks", "workers") if err != nil {return} var msg *nats.Msg for { msg, err = sub.NextMsg(time.Hour * 10000) if err != nil {break} urlStr := string(msg.Data) ins := factory(urlStr) // 因为最下游拿到的一定是对应网站的落地页 // 所以不用进行多余的判断了,直接爬内容即可 ins.Visit(urlStr) // 防止被封杀 time.Sleep(time.Second) } } func main() { startConsumer() } ``` </details>