segments概念

在Lucene⾥⾯有很多⼩的segment,我们可以把它们看成Lucene内部的mini-index,即为存储的最⼩管理单位。
重要特性:不可变性。

segments结构

  • Inverted Index
  • Stored Fields
  • Document Values
  • Cache

Inverted Index

倒排索引(Inverted Index)也叫反向索引,有反向索引必有正向索引。通俗地来讲,正向索引是通过key找value,反向索引则是通过value找key。

  • Term(单词):⼀段⽂本经过分析器分析以后就会输出⼀串单词,这⼀个⼀个的就叫做Term(直译为:单词)
  • Term Dictionary(单词字典):顾名思义,它⾥⾯维护的是Term,可以理解为Term的集合
  • Term Index(单词索引):为了更快的找到某个单词,我们为单词建⽴索引
  • Posting List(倒排列表):倒排列表记录了出现过某个单词的所有⽂档的⽂档列表及单词在该⽂档中出现的位置信息,每条记录称为⼀个倒排项(Posting)。根据倒排列表,即可获知哪些⽂档包含某个单词。

实际的倒排列表中并不只是存了⽂档ID这么简单,还有⼀些其它的信息,⽐如:词频(Term出现的次数)、偏移量(offset)等,可以想象成是Java中的对象

WX20210203-105228

WX20210203-105242

Term Dictionary

Elasticsearch为了能快速找到某个term,将所有的term排个序,⼆分法查找term,logN的查找效率,就像通过字典查找⼀样,这就是Term Dictionary。

Term Index

mysql 的B-Tree数据结构通过减少磁盘寻道次数来提⾼查询性能,Elasticsearch也是采⽤同样的思路,直接通过内存查找term,不读磁盘,但是如果term太多,term dictionary也会很⼤,放内存不现实,于是有了Term Index,就像字典⾥的索引⻚⼀样,A开头的有哪些term,分别在哪⻚,可以理解term index是⼀颗树。

这棵树不会包含所有的term,它包含的是term的⼀些前缀。通过term index可以快速地定位到term dictionary的某个offset,然后从这个位置再往后顺序查找。 Es4.0之后采⽤了FST结构。
WX20210203-105726

Posting List压缩存储

在 lucene 中,要求 postings lists 都要是有序的整形数组。这样就带来了一个很好的好处,可以通过 增量编码(delta-encode)这种方式进行压缩。
比如现在有 id 列表 [73, 300, 302, 332, 343, 372],转化成每一个 id 相对于前一个 id 的增量值(第一个 id 的前一个 id 默认是 0,增量就是它自己)列表是[73, 227, 2, 30, 11, 29]。在这个新的列表里面,所有的 id 都是小于 255 的,所以每个 id 只需要一个字节存储。

实际上 ES 会做的更加精细,

它会把所有的文档分成很多个 block,每个 block 正好包含 256 个文档,然后单独对每个文档进行增量编码,计算出存储在这个 block 里面所有文档的最大的id增量值需要多少位保存,并且把这个位数作为头信息(header)放在每个 block 的前面。这个技术叫 Frame of Reference。

es设计

es分片

为了⽀持对海量数据的存储和查询,Elasticsearch引⼊分⽚的概念,⼀个索引被分成、多个分⽚,每个分⽚可以有⼀个主分⽚和多个副本分⽚,每个分⽚副本都是⼀个具有完整功能的lucene实例。分⽚可以分配在不同的服务器上,同⼀个分⽚的不同副本不能分配在相同的服务器上。

在进⾏写操作时,ES会根据传⼊的routing参数(或mapping中设置的routing, 如果参数和设置中都没有则默认使⽤_id), 按照公式 shard_num = hash(\routing) % num_primary_shards ,计算出⽂档要分配到的分⽚,在从集群元数据中找出对应主分⽚的位置,将请求路由到该分⽚进⾏⽂档写操作。

近实时性-refresh操作

当⼀个⽂档写⼊Lucene后是不能被⽴即查询到的,Elasticsearch提供了⼀个refresh操作,会定时地调⽤lucene的reopen(新版本为openIfChanged)为内存中新写⼊的数据⽣成⼀个新的segment,此时被处理的⽂档均可以被检索到。

refresh操作的时间间隔由 refresh_interval 参数控制,默认为1s, 当然还可以在写⼊请求中带上refresh表示写⼊后⽴即refresh,另外还可以调⽤refresh API显式refresh。

数据可靠性

  • 引⼊translog

当⼀个⽂档写⼊Lucence后是存储在内存中的,即使执⾏了refresh操作仍然是在⽂件系统缓存中,如果此时服务器宕机,那么这部分数据将会丢失。为此ES增加了translog, 当进⾏⽂档写操作时会先将⽂档写⼊Lucene,然后写⼊⼀份到translog,写⼊translog是落盘的(如果对可靠性要求不是很⾼,也可以设置异步落盘,可以提⾼性能,由配置 index.translog.durabilityindex.translog.sync_interval 控制),这样就可以防⽌服务器宕机后数据的丢失。

由于translog是追加写⼊,因此性能⽐较好。与传统的分布式系统不同,这⾥是先写⼊Lucene再写⼊translog,原因是写⼊Lucene可能会失败,为了减少写⼊失败回滚的复杂度,因此先写⼊Lucene.

  • flush操作

另外每30分钟或当translog达到⼀定⼤⼩(由 index.translog.flush_threshold_size 控制,默认512mb), ES会触发⼀次flush操作,此时ES会先执⾏refresh操作将buffer中的数据⽣成segment,然后调⽤lucene的commit⽅法将所有内存中的segment fsync到磁盘。此时lucene中的数据就完成了持久化,会清空translog中的数据。

  • merge操作

由于refresh默认间隔为1s中,因此会产⽣⼤量的⼩segment,为此ES会运⾏⼀个任务检测当前磁盘中的segment,对符合条件的segment进⾏合并操作,减少lucene中的segment个数,提⾼查询速度,降低负载。不仅如此,merge过程也是⽂档删除和更新操作后,旧的doc真正被删除的时候。

⽤户还可以⼿动调⽤_forcemerge API来主动触发merge,以减少集群的segment个数和清理已删除或更新的⽂档。

  • 多副本机制

另外ES有多副本机制,⼀个分⽚的主副分⽚不能分⽚在同⼀个节点上,进⼀步
保证数据的可靠性。

部分更新

lucene⽀持对⽂档的整体更新,ES为了⽀持局部更新,在Lucene的Store索引中存储了⼀个source字段,该字段的key值是⽂档ID, 内容是⽂档的原⽂。

当进⾏更新操作时先从source中获取原⽂,与更新部分合并后,再调⽤lucene API进⾏全量更新, 对于写⼊了ES但是还没有refresh的⽂档,可以从translog中获取。另外为了防⽌读取⽂档过程后执⾏更新前有其他线程修改了⽂档,ES增加了版本机制,当执⾏更新操作时发现当前⽂档的版本与预期不符,则会重新获取⽂档再更新。

写入文档

ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进⾏⼀系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发至primary shard所在的node上,然后该node将数据保存至primary shard, primary shard完成写⼊后,再将写⼊并发发送给各replica shard, raplica shard写⼊操作后返回给primary shard所在node,primary shard 所在node 再将请求返回给协调节点。⼤致流程如下图:
WX20210203-135627

coordinating节点

ES中接收并转发请求的节点称为coordinating节点,ES中所有节点都可以接
受并转发请求。当⼀个节点接受到写请求或更新请求后,会执⾏如下操作:

  • ingest pipeline
    查看该请求是否符合某个ingest pipeline的pattern, 如果符合则执⾏pipeline中的逻辑,⼀般是对⽂档进⾏各种预处理,如格式调整,增加字段等。如果当前节点没有ingest⻆⾊,则需要将请求转发给有ingest⻆⾊的节点执⾏。
  • ⾃动创建索引
    判断索引是否存在,如果开启了⾃动创建则⾃动创建,否则报错
  • 设置routing
    获取请求URL或mapping中的routing,如果没有则使⽤id, 如果没有指定id则ES会⾃动⽣成⼀个全局唯⼀ID。该routing字段⽤于决定⽂档分配在索引的哪个shard上。
  • 构建BulkShardRequest
    由于Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执⾏,因此协调节点,会将请求按照shard分开,同⼀个shard上的请求聚合到⼀起,构建BulkShardRequest
  • 将请求发送给primary shard所在node
    因为当前执⾏的是写操作,因此只能在primary上完成,所以需要把请求路由到primary shard所在节点
  • 等待primary shard返回

primary shard

Primary请求的⼊⼝是PrimaryOperationTransportHandler的MessageReceived, 当接收到请求时,执⾏的逻辑如下:

  • 判断操作类型
    遍历bulk请求中的各⼦请求,根据不同的操作类型跳转到不同的处理逻辑
  • 将update操作转换为Index和Delete操作
    获取⽂档的当前内容,与update内容合并⽣成新⽂档,然后将update请求转换成index请求,此处⽂档设置⼀个version v1
  • Parse Doc
    解析⽂档的各字段,并添加如_uid等ES相关的⼀些系统字段
  • 更新mapping
    对于新增字段会根据dynamic mapping或dynamic template⽣成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常
  • 获取sequence Id和Version
    从SequcenceNumberService获取⼀个sequenceID和Version。SequcenID⽤于初始化LocalCheckPoint, verion是根据当前Versoin+1⽤于防⽌并发写导致数据不⼀致。
  • 写⼊lucene
    这⼀步开始会对⽂档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否⼀致,不⼀致则返回第⼆步重新执⾏。 如果version⼀致,如果同id的doc已经存在,则调⽤lucene的updateDocument接⼝,如果是新⽂档则调⽤lucene的addDoucument. 这⾥有个问题,如何保证Delete-Then-Add的原⼦性,ES是通过在Delete之前会加上已refresh 锁,禁⽌被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原⼦性。
  • 写⼊translog
    写⼊Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满⾜nosql场景的实时性。
  • 重构bulk request
    因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进⾏调整,只包含index或delete操作,不需要再进⾏update的处理操作。
  • flush translog
    默认情况下,translog要在此处落盘完成,如果对可靠性要求不⾼,可以设置translog异步,那么translog的fsync将会异步执⾏,但是落盘前的数据有丢失⻛险。
  • 发送请求给replicas
    将构造好的bulkrequest并发发送给各replicas,等待replica返回,这⾥需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执⾏失败,则primary会给master发请求remove该shard。这⾥会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
  • 等待replica响应
    当所有的replica返回请求时,更细primary shard的LocalCheckPoint。

replica shard

Replica 请求的⼊⼝是在ReplicaOperationTransportHandler的messageReceived,当replica shard接收到请求时执⾏如下流程:

  • 判断操作类型
    replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执⾏对应的操作
  • Parse Doc
  • 更新mapping
  • 获取sequenceId和Version 直接使⽤primary shard发送过来的请求中的内容即可
  • 写如lucene
  • write Translog
  • Flush translog

读取文档

索引数据

索引数据,就是按照id查询:

  • 客户端发送请求到任意⼀个node,称为coordinate node , coordinate node对document进⾏路由,将请求转发到对应的node,此时会使⽤round-robin随机轮询算法,在primary shard以及其所有replica中随机选择⼀个,让读请求负载均衡接收请求的node返回document给coordinate node
    coordinate node返回document给客户端

搜索数据

search⼀个⽐较复杂的执⾏模式,因为我们不知道那些document会被匹配到哪个shard,任何⼀个shard上都有可能,所以⼀个search请求必须查询⼀个索引或多个索引⾥⾯的所有shard才能完整的查询到我们想要的结果。

找到所有匹配的结果是查询的第⼀步,来⾃多个shard上的数据集在分⻚返回到客户端的之前会被合并到⼀个排序后的list列表,由于需要经过⼀步取top N的操作,所以search需要进过两个阶段才能完成,分别是query和fetch。

query(查询阶段)

当⼀个search请求发出的时候,这个query会被⼴播到索引⾥⾯的每⼀个shard(主shard或副本shard),每个shard会在本地执⾏查询请求后会⽣成⼀个命中⽂档的有序队列。

这队列是⼀个排序好的top N数据的列表,它的size等于from+size的和,也就是说如果你的from是10,size是10,那么这个队列的size就是20,所以这也是为什么深度分⻚不能⽤from+size这种⽅式,因为from越⼤,性能就越低。

query流程

  • 客户端发送⼀个search请求到某个coordinating Node上,然后这个Node会创建⼀个优先级队列它的⼤⼩=from+size
  • 接着这个Node转发这个search请求到索引⾥⾯每⼀个主shard或者副本shard上,每个shard会在本地查询然后添加结果到本地的排序好的优先级队列⾥⾯。
  • 每个shard返回docId和所有参与排序字段的值例如_score到优先级队列⾥⾯,然后再返回给coordinating节点也就是这个Node,然后这个Node负责将所有shard⾥⾯的数据给合并到⼀个全局的排序的列表。

注意这个结果集仅仅包含docId和所有排序的字段值

fetch(读取阶段)

query阶段标识了那些⽂档满⾜了该次的search请求,但是我们仍然需要检索回document整条数据,这个阶段称为fetch.

fetch流程:

  • coordinating 节点标识了那些document需要被拉取出来,并发送⼀个批量的mutil get请求到相关的shard上
  • 每个shard加载相关document,如果需要他们将会被返回到coordinating 节点上
  • ⼀旦所有的document被拉取回来,coordinating节点将会返回结果集到客户端上。

这⾥需要注意,coordinating节点拉取的时候只拉取需要被拉取的数据,⽐如from=90,size=10,那么fetch只会读取需要被读取的10条数据,这10条数据可能在⼀个shard上,也可能在多个shard上所以 coordinating节点会构建⼀个multi-get请求并发送到每⼀个shard上,每个shard会根据需要从_source字段⾥⾯获取数据,⼀旦所有的数据返回,coordinating节点会组装数据进⼊单个response⾥⾯然后将其返回给最终的client。

联合索引

查询过滤条件 age=18 的过程就是先从 term index 找到 18 在 term dictionary 的⼤概位置,然后再从 term dictionary ⾥精确地找到 18 这个 term,然后得到⼀个 posting list 或者⼀个指向posting list 位置的指针。

然后再查询 gender= ⼥ 的过程也是类似的。最后得出 age=18 AND gender=⼥ 就是把两个 posting list 做⼀个“与”的合并。

合并有两种⽅法,如果查询的filter缓存到了内存中(以bitset的形式),那么合并就是两个bitset的AND。如果查询的filter没有缓存,那么就用skip list的方式去遍历两个on disk的posting list

删除文档的操作

索引的每个文档都是版本化的。
删除文档时,可以指定版本以确保我们试图删除的相关文档实际上被删除,并且在此期间没有更改。

每个在文档上执行的写操作,包括删除,都会使其版本增加。

真正的删除时机:

deleting a document doesn’t immediately remove the document from disk; it just marks it as deleted. Elasticsearch will clean up deleted documents in the background as you continue to index more data.

删除索引和删除文档的区别?

  • 删除索引是会立即释放空间的,不存在所谓的“标记”逻辑。

  • 删除文档的时候,是将新文档写入,同时将旧文档标记为已删除。 磁盘空间是否释放取决于新旧文档是否在同一个segment file里面,因此ES后台的segment merge在合并segment file的过程中有可能触发旧文档的物理删除。

但因为一个shard可能会有上百个segment file,还是有很大几率新旧文档存在于不同的segment里而无法物理删除。想要手动释放空间,只能是定期做一下force merge,并且将max_num_segments设置为1。

POST /_forcemerge

数据较大的话forcemerge执行时间较长,30G-50G index merge时间大概是半小时,之后才返回响应。

merge操作

curl -XPOST 'http://192.168.1.101:9200/_forcemerge?
only_expunge_deletes=true&max_num_segments=1'

参数

  • max_num_segments 期望merge到多少个segments,1的意思是强行merge到1个segment
  • only_expunge_deletes 只做清理有deleted的segments,即瘦身
  • flush 清理完执行一下flush,默认是true

lucene

lucene基本概念

  • segment

lucene内部的数据是由⼀个个segment组成的,写⼊lucene的数据并不直接落盘,⽽是先写在内存中,经过了refresh间隔,lucene才将该时间段写⼊的全部数据refresh成⼀个segment,segment多了之后会进⾏merge成更⼤的segment。

lucene查询时会遍历每个segment完成。由于lucene* 写⼊的数据是在内存中完成,所以写⼊效率⾮常⾼。但是也存在丢失数据的⻛险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才
会删除对应的translog。

  • doc : doc表示lucene中的⼀条记录
  • field :field表示记录中的字段概念,⼀个doc由若⼲个field组成。
  • term :term是lucene中索引的最⼩单位,某个field对应的内容如果是全⽂检索类型,会将内容进⾏分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是⼀个term。
  • 倒排索引(inverted index): lucene索引的通⽤叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通⽤叫法,即原始数据,可以理解为⼀个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了⼀份docvalues,⽤作分析和排序。

lucene文件

lucene包的⽂件是由很多segment⽂件组成的,segments_xxx⽂件记录了lucene包下⾯的segment⽂件数量。每个segment会包含如下的⽂件。
WX20210203-144700