故事背景,珍爱网数据爬取
数据爬取注册为RPC服务
1.建立服务端(启动RPC服务)
~~~
package main
import (
"fmt"
"gopkg.in/olivere/elastic.v5"
"log"
"spider/spider_distribution/config"
"spider/spider_distribution/persist"
"spider/spider_distribution/rpcsupport"
)
func main() {
log.Fatal(serverRpc(fmt.Sprintf(":%d",config.ItemSaverPort), config.ElasticIndex))
}
// serverRpc creates an Elasticsearch client (with sniffing disabled) and
// exposes an ItemSaverService backed by it as an RPC service on host.
// index is the Elasticsearch index handed to the service.
// It returns the client-creation error, or whatever rpcsupport.ServerRpc returns.
func serverRpc(host, index string) error {
	esClient, err := elastic.NewClient(elastic.SetSniff(false))
	if err != nil {
		return err
	}

	saver := &persist.ItemSaverService{
		Client: esClient,
		Index:  index,
	}
	return rpcsupport.ServerRpc(host, saver)
}
~~~
2.rpc服务封装
~~~
package rpcsupport
import (
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
// ServerRpc registers service on the default net/rpc server and serves
// JSON-RPC over TCP on host, handling each accepted connection in its own
// goroutine.
//
// It returns an error if the service cannot be registered or if the listener
// cannot be created. Once listening it never returns: accept errors are
// logged and the loop continues.
func ServerRpc(host string, service interface{}) error {
	// A rejected service (no suitable exported methods, duplicate name, ...)
	// would otherwise be served silently and fail on every call.
	if err := rpc.Register(service); err != nil {
		return err
	}

	listener, err := net.Listen("tcp", host)
	if err != nil {
		// Report the failure through the declared error return instead of
		// panicking; the caller decides how to react.
		return err
	}
	log.Printf("listening on %s", host)

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("accept error :%v", err)
			continue
		}
		go jsonrpc.ServeConn(conn)
	}
}
// NewClient dials host over TCP and wraps the connection in a JSON-RPC
// client. If the dial fails it returns a nil client and the dial error.
func NewClient(host string) (*rpc.Client, error) {
	conn, dialErr := net.Dial("tcp", host)
	if dialErr == nil {
		return jsonrpc.NewClient(conn), nil
	}
	return nil, dialErr
}
~~~
3.客户端调用
~~~
package client
import (
"log"
"spider/engine"
"spider/spider_distribution/config"
"spider/spider_distribution/rpcsupport"
)
// ItemSaver connects to the item-saver RPC service at host and returns a
// channel on which callers submit items to be saved.
//
// A background goroutine receives each item, logs it together with a running
// count, and invokes the config.ItemSaverRpc method on the remote service.
// A failed call is logged and the item is dropped; processing continues with
// the next item. Closing the returned channel stops the goroutine and closes
// the RPC connection.
//
// It returns a nil channel and the error if the RPC client cannot be created.
func ItemSaver(host string) (chan engine.Item, error) {
	client, err := rpcsupport.NewClient(host)
	if err != nil {
		return nil, err
	}

	out := make(chan engine.Item)
	go func() {
		// Release the connection once the producer closes the channel.
		defer client.Close()

		itemCount := 0
		// Ranging ends when out is closed; a bare receive in an endless loop
		// would instead keep yielding zero-value items and send them over RPC.
		for item := range out {
			log.Printf("item saver:got item # %d : %v", itemCount, item)
			itemCount++

			// Call the remote service to save the item.
			result := ""
			if err := client.Call(config.ItemSaverRpc, item, &result); err != nil {
				log.Printf("Item Saver :error saving item %v : %v", item, err)
			}
		}
	}()
	return out, nil
}
~~~
4.数据保存
~~~
package persist
import (
"gopkg.in/olivere/elastic.v5"
"log"
"spider/engine"
"spider/persist"
)
// ItemSaverService is the receiver registered as an RPC service; its Save
// method stores items through the embedded Elasticsearch client.
type ItemSaverService struct {
	// Client is the Elasticsearch client passed to persist.Save.
	Client *elastic.Client
	// Index is the index name passed to persist.Save.
	Index string
}
// Save is the RPC method that stores item via persist.Save using the
// service's client and index.
//
// On success it sets *result to "ok" and returns nil. On failure it logs the
// error, leaves *result untouched, and returns the error to the RPC caller.
func (s *ItemSaverService) Save(item engine.Item, result *string) error {
	if err := persist.Save(s.Client, s.Index, item); err != nil {
		log.Printf("Error saving item %v: %v", item, err)
		return err
	}

	// Only report the item as saved once persist.Save has actually succeeded.
	log.Printf("Item %v saved.", item)
	*result = "ok"
	return nil
}
~~~