## 一、文档批量操作 ### 1、批量查询 ~~~ 1、批量查询的好处 就是一条一条的查询,比如说要查询100条数据,那么就要发送100次网络请求,这个开销还是很大的 如果进行批量查询的话,查询100条数据,就只要发送1次网络请求,网络请求的性能开销缩减100倍 2、mget的语法 (1)一条一条的查询 GET /test_index/test_type/1 GET /test_index/test_type/2 (2)mget批量查询 GET /_mget { "docs" : [ { "_index" : "test_index", "_type" : "test_type", "_id" : 1 }, { "_index" : "test_index", "_type" : "test_type", "_id" : 2 } ] } { "docs": [ { "_index": "test_index", "_type": "test_type", "_id": "1", "_version": 2, "found": true, "_source": { "test_field1": "test field1", "test_field2": "test field2" } }, { "_index": "test_index", "_type": "test_type", "_id": "2", "_version": 1, "found": true, "_source": { "test_content": "my test" } } ] } (3)如果查询的document是一个index下的不同type种的话 GET /test_index/_mget { "docs" : [ { "_type" : "test_type", "_id" : 1 }, { "_type" : "test_type", "_id" : 2 } ] } (4)如果查询的数据都在同一个index下的同一个type下,最简单了 GET /test_index/test_type/_mget { "ids": [1, 2] } 3、mget的重要性 可以说mget是很重要的,一般来说,在进行查询的时候,如果一次性要查询多条数据的话,那么一定要用batch批量操作的api 尽可能减少网络开销次数,可能可以将性能提升数倍,甚至数十倍,非常非常之重要 复制代码 ~~~ ### 2、批量增删改 ~~~ POST /_bulk { "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} { "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }} { "test_field": "test12" } { "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }} { "test_field": "replaced test2" } { "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} } 每一个操作要两个json串,语法如下: {"action": {"metadata"}} {"data"} 举例,比如你现在要创建一个文档,放bulk里面,看起来会是这样子的: {"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}} {"test_field1": "test1", "test_field2": "test2"} 有哪些类型的操作可以执行呢? (1)delete:删除一个文档,只要1个json串就可以了 (2)create:PUT /index/type/id/_create,强制创建 (3)index:普通的put操作,可以是创建文档,也可以是全量替换文档 (4)update:执行的partial update操作 bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行 { "error": { "root_cause": [ { "type": "json_e_o_f_exception", "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]" } ], "type": "json_e_o_f_exception", "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]" }, "status": 500 } { "took": 41, "errors": true, "items": [ { "delete": { "found": true, "_index": "test_index", "_type": "test_type", "_id": "10", "_version": 3, "result": "deleted", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } }, { "create": { "_index": "test_index", "_type": "test_type", "_id": "3", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true, "status": 201 } }, { "create": { "_index": "test_index", "_type": "test_type", "_id": "2", "status": 409, "error": { "type": "version_conflict_engine_exception", "reason": "[test_type][2]: version conflict, document already exists (current version [1])", "index_uuid": "6m0G7yx7R1KECWWGnfH1sw", "shard": "2", "index": "test_index" } } }, { "index": { "_index": "test_index", "_type": "test_type", "_id": "4", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true, "status": 201 } }, { "index": { "_index": "test_index", "_type": "test_type", "_id": "2", "_version": 2, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": false, "status": 200 } }, { "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_version": 3, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } } ] } bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志 POST /test_index/_bulk { "delete": { "_type": "test_type", "_id": "3" }} { "create": { "_type": "test_type", "_id": "12" }} { "test_field": "test12" } { "index": { "_type": "test_type" }} { "test_field": "auto-generate id test" } { "index": { "_type": "test_type", "_id": "2" }} { "test_field": "replaced test2" } { "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} } POST /test_index/test_type/_bulk { "delete": { "_id": "3" }} { "create": { "_id": "12" }} { "test_field": "test12" } { "index": { }} { "test_field": "auto-generate id test" } { "index": { "_id": "2" }} { "test_field": "replaced test2" } { "update": { "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} } 2、bulk size最佳大小 bulk request会加载到内存里,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从1000~5000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在5~15MB之间。 复制代码 ~~~ ## 二、数据路由 ### 1.document路由到shard上是什么意思? ~~~ 一个index的数据会被分为多片,每片都在一个shard中,所以每个document只能存在一个shard中。 当在客户端创建document的时候,es就要决定这个document是要放在哪个index的shard上。这个过程就叫做document数据路由 复制代码 ~~~ ### 2.路由算法 ~~~ shard = hash(routing) % number_of_primary_shards 举个例子,一个index有3个primary shard,P0,P1,P2 每次增删改查一个document的时候,都会带过来一个routing number,默认就是这个document的_id(可能是手动指定,也可能是自动生成) routing = _id,假设_id=1 会将这个routing值,传入一个hash函数中,产出一个routing值的hash值,hash(routing) = 21 然后将hash函数产出的值对这个index的primary shard的数量求余数,21 % 3 = 0 就决定了,这个document就放在P0上。 决定一个document在哪个shard上,最重要的一个值就是routing值,默认是_id,也可以手动指定,相同的routing值,每次过来,从hash函数中,产出的hash值一定是相同的 无论hash值是几,无论是什么数字,对number_of_primary_shards求余数,结果一定是在0~number_of_primary_shards-1之间这个范围内的。0,1,2。 复制代码 ~~~ ### 3.\_id or custom routing value ~~~ 默认的routing就是_id 也可以在发送请求的时候,手动指定一个routing value,比如说put /index/type/id?routing=user_id 手动指定routing value是很有用的,可以保证说,某一类document一定被路由到一个shard上去,那么在后续进行应用级别的负载均衡,以及提升批量读取的性能的时候,是很有帮助的 复制代码 ~~~ ### 4\. primary shard数量为什么不可变? ~~~ 就是为了计算document 的路由。 复制代码 ~~~ ## 三、document内部增删改原理 ~~~ (1)客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点) (2)coordinating node,对document进行路由,将请求转发给对应的node(有primary shard) (3)实际的node上的primary shard处理请求,然后将数据同步到replica node (4)coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端 复制代码 ~~~ ![](data:image/svg+xml;utf8,) ## 四、document 读请求内部原理(与上面大致差不多) ~~~ 1、客户端发送请求到任意一个node,成为coordinate node 2、coordinate node对document进行路由,将请求转发到对应的node,此时会使用round-robin随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡 3、接收请求的node返回document给coordinate node 4、coordinate node返回document给客户端 5、特殊情况:document如果还在建立索引过程中,可能只有primary shard有,任何一个replica shard都没有,此时可能会导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有了 复制代码 ~~~ ## 五、写一致性及 quorum机制 #### 1.写一致性 ~~~ 如果有3个primary shard 必须primary shard都是活跃状态 才能执行写入操作。 复制代码 ~~~ #### 2.quorum机制 ~~~ (1)consistency,one(primary shard),all(all shard),quorum(default) 我们在发送任何一个增删改操作的时候,比如说put /index/type/id,都可以带上一个consistency参数,指明我们想要的写一致性是什么? put /index/type/id?consistency=quorum one:要求我们这个写操作,只要有一个primary shard是active活跃可用的,就可以执行 all:要求我们这个写操作,必须所有的primary shard和replica shard都是活跃的,才可以执行这个写操作 quorum:默认的值,要求所有的shard中,必须是大部分的shard都是活跃的,可用的,才可以执行这个写操作 (2)quorum机制,写之前必须确保大多数shard都可用,int( (primary + number_of_replicas) / 2 ) + 1,当number_of_replicas>1时才生效 quroum = int( (primary + number_of_replicas) / 2 ) + 1 举个例子,3个primary shard,number_of_replicas=1,总共有3 + 3 * 1 = 6个shard quorum = int( (3 + 1) / 2 ) + 1 = 3 所以,要求6个shard中至少有3个shard是active状态的,才可以执行这个写操作 (3)如果节点数少于quorum数量,可能导致quorum不齐全,进而导致无法执行任何写操作 3个primary shard,replica=1,要求至少3个shard是active,3个shard按照之前学习的shard&replica机制,必须在不同的节点上,如果说只有1台机器的话,是不是有可能出现说,3个shard都没法分配齐全,此时就可能会出现写操作无法执行的情况 1个primary shard,replica=3,quorum=((1 + 3) / 2) + 1 = 3,要求1个primary shard + 3个replica shard = 4个shard,其中必须有3个shard是要处于active状态的。如果这个时候只有2台机器的话,会出现什么情况呢? es提供了一种特殊的处理场景,就是说当number_of_replicas>1时才生效,因为假如说,你就一个primary shard,replica=1,此时就2个shard (1 + 1 / 2) + 1 = 2,要求必须有2个shard是活跃的,但是可能就1个node,此时就1个shard是活跃的,如果你不特殊处理的话,导致我们的单节点集群就无法工作 (4)quorum不齐全时,wait,默认1分钟,timeout,100,30s 等待期间,期望活跃的shard数量可以增加,最后实在不行,就会timeout 我们其实可以在写操作的时候,加一个timeout参数,比如说put /index/type/id?timeout=30,这个就是说自己去设定quorum不齐全的时候,es的timeout时长,可以缩短,也可以增长 复制代码 ~~~ ## 六、bulk api 奇特格式与底层性能优化 #### 1.前面批量增删改 大致提了下关于 bulk 的用法 ~~~ bulk api奇特的json格式 {"action": {"meta"}}\n {"data"}\n {"action": {"meta"}}\n {"data"}\n [{ "action": { }, "data": { } }] 1、bulk中的每个操作都可能要转发到不同的node的shard去执行 2、如果采用比较良好的json数组格式 允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理 (1)将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象 (2)解析json数组里的每个json,对每个请求中的document进行路由 (3)为路由到同一个shard上的多个请求,创建一个请求数组 (4)将这个请求数组序列化 (5)将序列化后的请求数组发送到对应的节点上去 3、耗费更多内存,更多的jvm gc开销 我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。 占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降 另外的话,占用内存更多,就会导致java虚拟机的垃圾回收次数更多,跟频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多 4、现在的奇特格式 {"action": {"meta"}}\n {"data"}\n {"action": {"meta"}}\n {"data"}\n (1)不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json (2)对每两个一组的json,读取meta,进行document路由 (3)直接将对应的json发送到node上去 5、最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能 ~~~ 作者:Leo\_CX330 链接:https://juejin.cn/post/6926344149796913166 来源:掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。