Elasticsearch使用要点(二)-创建,更新,删除

创建

在es中, 文档是不可改变的, 更新文档可以直接对相同id的文档进行index 或者 update操作(相当于执行get,index操作). 在内部, es 将旧文档标记为已删除, 并增加一个全新的文档. 当被标记为删除的文档累计一定的数量或持续时间后, es会在后台清理这些文档.

  • index文档时可以加op_type=create参数, 来指定只有新增时才会创建成功, 否则报文档冲突异常.
  • 创建时可以指定索引的主分片数量, 复制分片数量.

    1
    2
    3
    4
    5
    6
    {
    "settings": {
    "number_of_shards" : 1,
    "number_of_replicas" : 0
    }
    }

更新

  • 更新或者创建时, 可以加version=?参数, 来使用乐观锁机制指定es中该文档当前的version值进行数据的更新. 类似redis事务的watch命令.(并发更新时, 可以使用时间戳作为version, 如:传入参数:version=10&version_type=external, 避免因网络等因素, 导致的新更新被旧更新覆盖).

  • 局部更新可以在update请求中指定doc属性, 来进行局部更新.

  • 批量操作可以使用bulk-api, 实现定时,定量刷新提交策略.整合多个原子操作于一个es请求中. 注意:bulk中每一个操作是原子的, 但是bulk整个不是原子的, 无法实现事务.

删除

旧的es版本可以支持条件删除操作.

5.0以后的版本不再支持条件删除, 只能通过文档id来删除.

逻辑可以通过scroll API查询出所有满足条件的id, 再使用id来删除.

java代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public int batchDeleteByQuery(String index, String type, QueryBuilder queryBuilder){
// collect going to delete ids
long ct = System.currentTimeMillis();
Set<String> ids = Sets.newHashSet();
SearchResponse response = this.getClient()
.prepareSearch(index)
.setTypes(type)
.setQuery(queryBuilder)
.setSize(300)
.setScroll(TimeValue.MINUS_ONE)
.setFetchSource(false)
.execute()
.actionGet();
log.debug("response:{} ", response);
ids.addAll(Arrays.stream(response.getHits().getHits()).map(searchHitFields -> searchHitFields.getId()).collect(
Collectors.toList()));
while (ArrayUtils.isNotEmpty(response.getHits().getHits())){
response = this.getClient()
.prepareSearchScroll(response.getScrollId())
.setScroll(TimeValue.MINUS_ONE)
.execute()
.actionGet();
log.debug("response:{} ", response);
ids.addAll(Arrays.stream(response.getHits().getHits()).map(searchHitFields -> searchHitFields.getId()).collect(Collectors.toList()));
}
// remove scroll
this.getClient().prepareClearScroll().addScrollId(response.getScrollId()).execute().actionGet();

// iterater to delete
for(String id : ids){
this.getDefaultBulkProcessor().add(new DeleteRequest(index, type, id));
}

log.info("delete doc. index:{}, type:{}, ids size:{}, costs:{}ms, query:{} ", index, type, ids.size(),
System.currentTimeMillis() - ct, queryBuilder);
log.debug("delete ids:{} ", ids);
return ids.size();
}