Skip to main content

PipelineAgg介绍

Pipeline Aggregation Overvew

管道聚合:让上一步聚合的结果作为下一个聚合的输入,类似stream流的操作,有许多不同类型的管道聚合,每个聚合计算的信息与其他聚合不同,但这些类型可以分为两类:

Parent (父级)

父级聚合的输出提供了一组管道聚合,它可以计算新的存储桶或新的聚合以添加到现有存储桶中

Sibling(同级)

同级聚合的输出提供管道,并且能够计算与该同级聚合处于同一级别的新聚合。

info

由于管道聚合仅添加到输出中,因此在链接管道聚合时,每个管道聚合的输出都将包含在最终输出中

buckets_path 语法

大多数管道聚合需要另一个聚合作为其输入。输入聚合通过buckets_path参数定义,该参数遵循特定格式:

AGG_SEPARATOR       =  '>' ;
METRIC_SEPARATOR = '.' ;
AGG_NAME = <the name of the aggregation> ;
METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ;
PATH = <AGG_NAME> [ <AGG_SEPARATOR>, <AGG_NAME> ]* [ <METRIC_SEPARATOR>, <METRIC> ] ;

例如,路径“my_bucket>my_stats.avg”将指向“my_stats”metric中的avg值,该metric包含在“my_bBucket”桶聚合中。

路径与管道聚合的位置是相对的;它们不是绝对路径,路径不能返回到聚合树的“上方”。例如,此移动平均值嵌入到date_histogram中,并引用sibling metric the_sum

POST /_search
{
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"timestamp",
"interval":"day"
},
"aggs":{
"the_sum":{
"sum":{ "field": "lemmings" }
},
"the_movavg":{
"moving_avg":{ "buckets_path": "the_sum" }
}
}
}
}
}
  1. metric为“the_sum”
  2. buckets_path通过相对路径“the_sum”引用metric

buckets_path也用于Sibling管道聚合,其中聚合是一系列桶的“next”,而不是嵌入在它内部。例如,max_bucket聚合使用buckets_path指定嵌入同级聚合中的metric:

POST /_search
{
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"max_monthly_sales": {
"max_bucket": {
"buckets_path": "sales_per_month>sales"
}
}
}
}

bucket_path指示此max_bucket聚合,我们希望取出sales_permonth日期直方图中的销售聚合的最大值。

Special Paths

buckets_path可以使用一个特殊的“_count”路径,而不是指向metric。这指示管道聚合使用文档计数作为其输入。例如,可以根据每个bucket的文档计数计算移动平均值。

POST /_search
{
"aggs": {
"my_date_histo": {
"date_histogram": {
"field":"timestamp",
"interval":"day"
},
"aggs": {
"the_movavg": {
"moving_avg": { "buckets_path": "_count" }
}
}
}
}
}

通过使用_count而不是metric名称,我们可以计算直方图中文档计数的移动平均值

buckets_path还可以使用“_bucket_count”和多桶聚合的路径来使用该聚合在管道聚合中返回的桶数,而不是metric。例如,这里可以使用bucket_selector来过滤不包含用于内部项聚合的桶的桶:

POST /sales/_search
{
"size": 0,
"aggs": {
"histo": {
"date_histogram": {
"field": "date",
"interval": "day"
},
"aggs": {
"categories": {
"terms": {
"field": "category"
}
},
"min_bucket_selector": {
"bucket_selector": {
"buckets_path": {
"count": "categories._bucket_count"
},
"script": {
"source": "params.count != 0"
}
}
}
}
}
}
}

通过使用_bucket_count而不是metric名称,我们可以过滤掉不包含类别聚合桶的历史桶

Dealing with dots in agg names

支持另一种语法来处理名称中有点的聚合或metric,例如99.9%。该指标可称为:

"buckets_path": "my_percentile[99.9]"

Dealing with gaps in the data

现实世界中的数据常常是嘈杂的,有时还存在差距 — 数据根本不存在的地方。出现这种情况的原因多种多样,最常见的是:

  • 落入bucket的文档不包含必填字段
  • 没有与一个或多个bucket的查询匹配的文档
  • 正在计算的metric值无法生成值,可能是因为另一个从属bucket缺少值。某些管道聚合具有必须满足的特定要求(例如,导数无法计算第一个值的metric,因为没有以前的值,HoltWinters移动平均值需要“预热”数据才能开始计算,等等)

Gap策略是一种机制,用于在遇到“gap”或丢失数据时通知管道聚合所需的行为。所有管道聚合都接受gap_policy参数。目前有两种差距政策可供选择:

skip

此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。

insert_zeros

此选项将用零(0)替换缺失的值,管道聚合计算将照常进行。