Skip to main content

写入索引

同步方式

BulkRequest request = new BulkRequest();
request.add(new IndexRequest(index).type("_doc").source(XContentType.JSON, "message", UUID.randomUUID().toString()));
request.add(new IndexRequest(index).type("_doc").source(XContentType.JSON, "message", UUID.randomUUID().toString()));
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

BulkRequest 可选的参数

  • 超时时间

提供字符串或TimeValue方式设置bulk的等待超时时间

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");
  • 刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
策略含义默认
NONE不刷新默认
IMMEDIATE立即强制刷新,此刷新策略不适用于生产环境高频率写入,常用于测试
WAIT_UNTIL写入请求在刷新完成前等待,刷新完成后结束。
  • 设置pipeline
request.pipeline("pipelineId"); 
  • 设置routing
request.routing("routingId");

异步方式

client.bulkAsync(request, RequestOptions.DEFAULT, listener); 

异步方法不会阻塞并立即返回。一旦完成,如果执行成功完成,将使用onResponse方法调用ActionListener;如果执行失败,则使用onFailure方法调用Action监听器。故障场景和预期异常与同步执行情况相同。

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {

}

@Override
public void onFailure(Exception e) {

}
};

Bulk 响应结果

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
DocWriteResponse itemResponse = bulkItemResponse.getResponse();

switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}

响应失败处理

  • 至少有一个失败
if (bulkResponse.hasFailures()) { 

}
  • 失败条目处理
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
}