企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 搜索引擎 ### Elasticsearch Elasticsearch是开源分布式搜索引擎的霸主,其依赖于Lucene实现,在部署和运维方面做了很多优化。当今搭建一个分布式搜索引擎比起Sphinx的时代已经是容易很多很多了。只要简单配置客户端IP和端口就可以了 #### go sdk ``` // 选用 elastic 版本时 // 注意与自己使用的 elasticsearch 要对应 import ( elastic "gopkg.in/olivere/elastic.v3" ) var esClient *elastic.Client func initElasticsearchClient(host string, port string) { var err error esClient, err = elastic.NewClient( elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)), elastic.SetMaxRetries(3), ) if err != nil { // log error } } ``` 插入: ``` func insertDocument(db string, table string, obj map[string]interface{}) { id := obj["id"] var indexName, typeName string // 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type // 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段 // 所以单个 index 内容过多会导致性能问题 // 在新版本中 type 已经废弃 // 为了让不同表的数据落入不同的 index,这里我们用 table+name 作为 index 的名字 indexName = fmt.Sprintf("%v_%v", db, table) typeName = table // 正常情况 res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do() if err != nil { // handle error } else { // insert success } } ``` 获取: ``` func query(indexName string, typeName string) (*elastic.SearchResult, error) { // 通过 bool must 和 bool should 添加 bool 查询条件 q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1), elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m"))) q = q.Should( elastic.NewMatchPhraseQuery("name", "alex"), elastic.NewMatchPhraseQuery("name", "xargin"), ) searchService := esClient.Search(indexName).Type(typeName) res, err := searchService.Query(q).Do() if err != nil { // log error return nil, err } return res, nil } ``` 删除: ``` func deleteDocument( indexName string, typeName string, obj map[string]interface{}, ) { id := obj["id"] res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do() if err != nil { // handle error } else { // delete success } } ``` #### 将 sql 转换为 DSL ``` github.com/cch123/elasticsql ``` demo ``` package main import ( "fmt" "github.com/cch123/elasticsql" ) var sql = ` select * from aaa where a=1 and x = '三个男人' and create_time between '2015-01-01T00:00:00+0800' and '2016-01-01T00:00:00+0800' and process_id > 1 order by id desc limit 100,10 ` func main() { dsl, esType, _ := elasticsql.Convert(sql) fmt.Println(dsl) fmt.Println(esType) } ``` ## 异构数据同步 在实际应用中,我们很少直接向搜索引擎中写入数据。更为常见的方式是,将MySQL或其它关系型数据中的数据同步到搜索引擎中。而搜索引擎的使用方只能对数据进行查询,无法进行修改和删除 ### 通过时间戳进行增量数据同步 把最近十分钟创建的所有出库单取出,批量存入es中,取数据的操作需要执行的逻辑可以表达为下面的SQL: ``` select * from wms_orders where update_time >= date_sub(now(), interval 10 minute); ``` 当然,考虑到边界情况,我们可以让这个时间段的数据与前一次的有一些重叠: ``` select * from wms_orders where update_time >= date_sub( now(), interval 11 minute ); ``` ### 通过 binlog 进行数据同步 ![](https://img.kancloud.cn/51/ac/51acae5e6be3d3cdce72d922d103db5c_281x461.png) 业界使用较多的是阿里开源的Canal,来进行binlog解析与同步。**canal会伪装成MySQL的从库**,然后解析好行格式的binlog,再以更容易解析的格式(例如json)发送到消息队列