[TOC]

> * Retrieval options:
>   * Full-text data is first tokenized by an analyzer into an inverted index, and the documents are stored so they can be searched in full text.
>   1. Structured query: exact, structured matching on a field (similar to a relational database)
>   2. Full-text search: match a keyword against the document's fields and return results ranked by relevance
>   3. A combination of the two approaches above
> * Search API styles:
>   1. Lightweight: a query string, with all parameters passed in the URL
>   2. Structured query DSL: the full request body expressed as JSON
> * Elasticsearch data types:
>   1. Exact values: unambiguous words or phrases such as names and dates (numbers, dates, ...)
>   2. Full text: natural-language content such as articles or email bodies; to make it searchable, Elasticsearch analyzes (tokenizes) the text into an inverted index (strings)

### 1. Chinese analyzer (IK)

1. Install the plugin:

~~~
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v5.5.2/elasticsearch-analysis-ik-5.5.2.zip
~~~

2. Create an index:

`curl -XPUT http://192.168.56.130:9200/index`

3. Set the mapping:

~~~
curl -XPOST http://192.168.56.130:9200/index/fulltext/_mapping -d'
{
    "properties": {
        "content": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        }
    }
}'
~~~

4. Insert documents:

~~~
curl -XPOST http://192.168.56.130:9200/index/fulltext/1 -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}'

curl -XPOST http://192.168.56.130:9200/index/fulltext/2 -d'
{"content":"公安部:各地校车将享最高路权"}'

curl -XPOST http://192.168.56.130:9200/index/fulltext/3 -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'

curl -XPOST http://192.168.56.130:9200/index/fulltext/4 -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'
~~~

5. Query with highlighting:

~~~
curl -XPOST http://192.168.56.130:9200/index/fulltext/_search?pretty -d'
{
    "query" : { "match" : { "content" : "中国" }},
    "highlight" : {
        "pre_tags" : ["<tag1>", "<tag2>"],
        "post_tags" : ["</tag1>", "</tag2>"],
        "fields" : {
            "content" : {}
        }
    }
}'
~~~

6. Query types:

![](https://box.kancloud.cn/415c688031f65288c4fc27664550697b_1482x609.png)

### 2. Java client
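Before turning to the client itself, the inverted-index idea from section 1 — each token maps back to the ids of the documents that contain it — can be sketched in a few lines of plain Java. This is an illustrative toy with whitespace tokenization; a real analyzer such as ik_max_word additionally does dictionary-based segmentation and normalization:

```java
import java.util.*;

/** Toy inverted index: term -> sorted set of document ids. */
public class InvertedIndex {
    private final Map<String, Set<Integer>> index = new HashMap<>();

    /** Tokenize on whitespace and record a term -> docId posting for each token. */
    public void add(int docId, String text) {
        for (String token : text.toLowerCase().split("\\s+")) {
            index.computeIfAbsent(token, k -> new TreeSet<>()).add(docId);
        }
    }

    /** Ids of documents containing the term (empty set if none). */
    public Set<Integer> search(String term) {
        return index.getOrDefault(term.toLowerCase(), Collections.emptySet());
    }

    public static void main(String[] args) {
        InvertedIndex idx = new InvertedIndex();
        idx.add(1, "school bus road");
        idx.add(2, "fishing boat");
        idx.add(3, "school fishing");
        System.out.println(idx.search("school"));  // [1, 3]
    }
}
```

A term query is then a single map lookup, which is why exact-value matches are cheap, while full-text relevance ranking layers scoring on top of these postings.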
1. pom.xml

~~~
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aixin.elasticsearch</groupId>
    <artifactId>elasticearchclient</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>5.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>5.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>rest</artifactId>
            <version>5.5.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
~~~

### 2.1 Automatic node discovery (sniffing)

1. Automatic cluster discovery by the client

> 1. By default the client round-robins requests over the nodes we list manually, as in the code below, where several nodes are added by hand:

~~~
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost2"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost3"), 9300));
~~~

> 2. But what if the cluster has hundreds or thousands of nodes? Do we really have to add them all by hand just so client requests are load-balanced?
> 3. The ES client offers automatic cluster-node discovery (sniffing). With sniffing enabled, the client connects to the seed nodes we specify, reads the cluster state to `automatically fetch` all current `data nodes`, and replaces its internal node list with that complete list. By default the node list is refreshed every 5 seconds.
> Note that the client never adds master nodes to its node list, to avoid sending search and similar requests to a master.
> In practice we can therefore specify just a few seed nodes — even a single one — and the client discovers the whole cluster on its own, refreshing every 5 seconds.

~~~
Settings settings = Settings.builder()
        .put("client.transport.sniff", true)  // enable sniffing
        .build();
TransportClient client = new PreBuiltTransportClient(settings);
~~~

* With the settings above, setting client.transport.sniff to true turns on automatic cluster-node discovery.
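The sniffing behaviour just described can be sketched without a live cluster: start from seed nodes, refresh the node list from cluster state keeping only data nodes, and round-robin over the result. The class below is an illustrative stand-in (the cluster is mocked as a name → isDataNode map), not an Elasticsearch type:

```java
import java.util.*;

/** Toy model of a sniffing transport client. */
public class SniffingClientSketch {
    private final Map<String, Boolean> cluster; // nodeName -> true if data node
    private List<String> nodeList;              // nodes requests are sent to
    private int next = 0;

    public SniffingClientSketch(Map<String, Boolean> cluster, List<String> seeds) {
        this.cluster = cluster;
        this.nodeList = new ArrayList<>(seeds); // before sniffing: seeds only
    }

    /** Refresh the node list from cluster state, excluding master-only nodes,
     *  as the real client does every 5 seconds when sniffing is on. */
    public void sniff() {
        List<String> dataNodes = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : cluster.entrySet()) {
            if (e.getValue()) {
                dataNodes.add(e.getKey());
            }
        }
        Collections.sort(dataNodes);
        nodeList = dataNodes;
        next = 0;
    }

    /** Round-robin over the current node list. */
    public String pickNode() {
        String node = nodeList.get(next % nodeList.size());
        next++;
        return node;
    }
}
```

Before `sniff()` runs, requests can only go to the seeds; afterwards they rotate over every data node, which is exactly why listing one or two seed nodes is enough.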
2. Code

~~~
package com.aixin.elasticsearch.client;

import org.apache.lucene.index.Fields;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.termvectors.TermVectorsRequest;
import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
// the aggregation imports below were missing in the original listing;
// they are required by doubleAggSearch()
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;

import java.io.IOException;
import java.net.InetAddress;
import java.util.*;

import static org.elasticsearch.index.query.QueryBuilders.matchQuery;

/**
 * Created by dailin on 2017/8/31.
 */
public class ElasticsearchClient {

    private static ElasticsearchClient elasticsearchClient = null;
    private static TransportClient client = null;

    private ElasticsearchClient(String host, String user, String password) {
        try {
            Settings settings = Settings.builder()
                    .put("xpack.security.user", user + ":" + password)
                    .put("cluster.name", "e-cluster").build();
            client = new PreBuiltXPackTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // double-checked locking singleton
    public static ElasticsearchClient getInstance(String host, String user, String password) {
        if (elasticsearchClient == null) {
            synchronized (ElasticsearchClient.class) {
                if (elasticsearchClient == null) {
                    elasticsearchClient = new ElasticsearchClient(host, user, password);
                }
            }
        }
        return elasticsearchClient;
    }

    /**
     * Fetch a document by id.
     * Document is a small POJO (index/type/id plus a field map) defined
     * elsewhere in this project.
     * @param index index name
     * @param type  type name
     * @param id    document id
     */
    public Document get(String index, String type, String id) {
        Document document = new Document(index, type, id);
        GetResponse response = client.prepareGet(index, type, id).get();
        Map<String, Object> source = response.getSource();
        for (Map.Entry<String, Object> map : source.entrySet()) {
            document.puToFile(map.getKey(), map.getValue().toString());
        }
        return document;
    }

    /**
     * Delete a document by id.
     * @param index index name
     * @param type  type name
     * @param id    document id
     */
    public void deleteById(String index, String type, String id) {
        DeleteResponse dr = client.prepareDelete(index, type, id).get();
    }

    /**
     * Full-text match query returning the raw field values of each hit.
     */
    public List<String> matchs(String index, String field, String text) {
        List<String> result = new ArrayList<String>();
        QueryBuilder qb = matchQuery(field, text);
        SearchResponse response = client.prepareSearch(index)
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(qb) // field and value to match
                .get();
        SearchHits hits = response.getHits();
        for (SearchHit sh : hits.getHits()) {
            for (Map.Entry<String, Object> map : sh.getSource().entrySet()) {
                result.add(map.getValue().toString());
            }
        }
        return result;
    }

    /**
     * Term-frequency statistics for one field of a document.
     * @return Map of term -> frequency
     */
    public Map<String, Integer> termVectos(String index, String type, String id, String field) throws IOException {
        TermVectorsRequest.FilterSettings filterSettings = new TermVectorsRequest.FilterSettings();
        filterSettings.minWordLength = 2;
        TermVectorsResponse resp = client.prepareTermVectors(index, type, id)
                .setFilterSettings(filterSettings)
                .setSelectedFields(field)
                .execute().actionGet();
        Fields fields = resp.getFields(); // the selected fields
        Iterator<String> iterator = fields.iterator();
        Map<String, Integer> result = new HashMap<String, Integer>();
        while (iterator.hasNext()) {
            String dfield = iterator.next();
            Terms terms = fields.terms(dfield);     // terms of this field
            TermsEnum termsEnum = terms.iterator(); // termsEnum carries per-term statistics
            while (termsEnum.next() != null) {
                String word = termsEnum.term().utf8ToString();
                // PostingsEnum.ALL (== 120, the magic number in the original)
                int freq = termsEnum.postings(null, PostingsEnum.ALL).freq();
                result.put(word, freq);
            }
        }
        return result;
    }

    /**
     * Exact (term) query on a single field.
     */
    public List<String> termQuery(String index, String type, String field, String text) {
        TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery(field, text);
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(termQueryBuilder);
        SearchResponse searchResponse = searchRequestBuilder.get();
        SearchHit[] hits = searchResponse.getHits().getHits();
        List<String> values = new ArrayList<String>();
        for (SearchHit hit : hits) {
            for (Map.Entry<String, Object> map : hit.getSource().entrySet()) {
                values.add(map.getValue().toString());
            }
        }
        return values;
    }

    /**
     * Nested aggregation; corresponds to item 7, "aggregation queries
     * (multi-level grouping)", in the operations chapter.
     */
    public void doubleAggSearch() {
        SearchResponse searchResponse = client.prepareSearch("company")
                .addAggregation(AggregationBuilders.terms("group_by_country").field("country")
                        .subAggregation(AggregationBuilders
                                .dateHistogram("group_by_join_date")
                                .field("join_date")
                                .dateHistogramInterval(DateHistogramInterval.YEAR)
                                .subAggregation(AggregationBuilders.avg("avg_age").field("age"))))
                .execute().actionGet();

        Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
        StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
        // country buckets
        Iterator<StringTerms.Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
        while (groupByCountryBucketIterator.hasNext()) {
            StringTerms.Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
            System.out.println(groupByCountryBucket.getKey() + ":" + groupByCountryBucket.getDocCount());
            // join-date buckets inside each country bucket
            Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
            Iterator<? extends Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
            while (groupByJoinDateBucketIterator.hasNext()) {
                Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
                System.out.println(groupByJoinDateBucket.getKey() + ":" + groupByJoinDateBucket.getDocCount());
                // average age within each date bucket
                Avg avg = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_age");
                System.out.println(avg.getValue());
            }
        }
    }

    /**
     * Full-text search.
     * @param index index name
     * @param field field to match against
     * @param text  query text
     * @return Document objects
     */
    public List<Document> match(String index, String field, String text) {
        List<Document> result = new ArrayList<Document>();
        QueryBuilder qb = matchQuery(field, text);
        SearchResponse response = client.prepareSearch(index)
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(qb) // field and value to match
                .get();
        SearchHits hits = response.getHits();
        for (SearchHit hist : hits.getHits()) {
            Document document = new Document(hist.getIndex(), hist.getType(), hist.getId());
            result.add(document);
            Map<String, Object> source = hist.getSource();
            for (Map.Entry<String, Object> v : source.entrySet()) {
                document.puToFile(v.getKey(), v.getValue().toString());
            }
        }
        return result;
    }

    /**
     * Mark up the terms that matched the query.
     * Note: the original listing had the pre/post tags swapped
     * (postTags("<mark>") / preTags("</mark>")); fixed here.
     * @param index index name
     * @param field field to match against
     * @param text  query text
     * @return Document objects whose values contain highlight markup
     */
    public List<Document> matchHighLight(String index, String field, String text) {
        List<Document> result = new ArrayList<Document>();
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags("<mark>");
        highlightBuilder.postTags("</mark>");
        highlightBuilder.field(field);
        QueryBuilder qb = matchQuery(field, text);
        SearchResponse response = client.prepareSearch(index)
                .highlighter(highlightBuilder)
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(qb)
                .get();
        SearchHits hits = response.getHits();
        for (SearchHit hist : hits.getHits()) {
            Map<String, HighlightField> highlightFields = hist.getHighlightFields();
            for (Map.Entry<String, HighlightField> v : highlightFields.entrySet()) {
                Document document = new Document(hist.getIndex(), hist.getType(), hist.getId());
                document.puToFile(v.getKey(), v.getValue().toString());
                result.add(document);
            }
        }
        return result;
    }
}
~~~

### 2.2 Updating a field

~~~
package com.roncoo.es.senior;

import java.net.InetAddress;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class UpsertCarInfoApp {

    @SuppressWarnings({ "unchecked", "resource" })
    public static void main(String[] args) throws Exception {
        Settings settings =
                Settings.builder()
                        .put("cluster.name", "e-cluster")
                        .put("client.transport.sniff", true)
                        .build();
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.56.130"), 9300));

        IndexRequest indexRequest = new IndexRequest("car_shop", "cars", "1")
                .source(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("brand", "宝马")
                        .field("name", "宝马320")
                        .field("price", 310000)
                        .field("produce_date", "2017-01-01")
                        .endObject());

        UpdateRequest updateRequest = new UpdateRequest("car_shop", "cars", "1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("price", 320000)
                        .endObject())
                .upsert(indexRequest); // update if the document exists, otherwise insert indexRequest

        UpdateResponse updateResponse = client.update(updateRequest).get();
        System.out.println(updateResponse.getVersion());

        client.close();
    }
}
~~~
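The upsert semantics above can be summarized without a cluster: if the document id is absent, the full upsert document is indexed; if it already exists, only the partial doc is merged in. The `UpsertSketch` class below is an illustrative in-memory model, not an Elasticsearch API:

```java
import java.util.*;

/** In-memory model of UpdateRequest.doc(...).upsert(...) semantics. */
public class UpsertSketch {
    private final Map<String, Map<String, Object>> store = new HashMap<>();

    /** Merge partialDoc into the existing doc, or index upsertDoc if absent. */
    public void upsert(String id, Map<String, Object> partialDoc,
                       Map<String, Object> upsertDoc) {
        Map<String, Object> existing = store.get(id);
        if (existing == null) {
            store.put(id, new HashMap<>(upsertDoc)); // first write: full insert
        } else {
            existing.putAll(partialDoc);             // later writes: partial update
        }
    }

    public Map<String, Object> get(String id) {
        return store.get(id);
    }
}
```

Run twice with the document from section 2.2: the first call inserts the full car document (price 310000), the second applies only the partial doc, bumping the price to 320000 while leaving the other fields untouched.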