写入索引
同步方式
BulkRequest request = new BulkRequest();
request.add(new IndexRequest(index).type("_doc").source(XContentType.JSON, "message", UUID.randomUUID().toString()));
request.add(new IndexRequest(index).type("_doc").source(XContentType.JSON, "message", UUID.randomUUID().toString()));
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
BulkRequest 可选的参数
- 超时时间
提供字符串或TimeValue
方式设置bulk的等待超时时间
request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
- 刷新策略
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
策略 | 含义 | 默认 |
---|---|---|
NONE | 不刷新 | 默认 |
IMMEDIATE | 立即强制刷新,此刷新策略不适用于生产环境高频率写入,常用于测试 | |
WAIT_UNTIL | 写入请求在刷新完成前等待,刷新完成后结束。 |
- 设置pipeline
request.pipeline("pipelineId");
- 设置routing
request.routing("routingId");
异步方式
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
异步方法不会阻塞并立即返回。一旦完成,如果执行成功完成,将使用onResponse方法调用ActionListener;如果执行失败,则使用onFailure方法调用Action监听器。故障场景和预期异常与同步执行情况相同。
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
Bulk 响应结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
响应失败处理
- 至少有一个失败
if (bulkResponse.hasFailures()) {
}
- 失败条目处理
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
}