2016-09-05 2 views
0

Ist es möglich, Multi-Level-Aggregationsabfragen dynamisch mithilfe der elastic4s-DSL durchzuführen? Dynamische Aggregationsabfragen für Elasticsearch mit dem elastic4s-Client erstellen

Mit dem HTTP-Client ist das unkompliziert:

 
multiLevelAggregation 

Input: Fields[0..N] 
Output: Data grouped by field tuple 

Steps: 
1. Build multilevel elasticsearch aggregation query (JSON) 
2. Execute query on elasticsearch server 
3. Flatten result and return 

Aber wie lässt sich das mit elastic4s oder dem Java-Client umsetzen?

Antwort

1

Nachdem ich mein Problem genauer analysiert hatte, gelang es mir, eine Lösung zu finden. Zunächst dachte ich, dies wäre eine Einschränkung von elastic4s, aber das ist nicht der Fall: Es ist einfach, Aggregationsabfragen über mehrere Felder mit dem elastic4s-Client zu erstellen. Hier ist meine Lösung:

/**
 * Builds a nested aggregation: one terms aggregation per entry of `groups`,
 * each nested inside the previous one, with the aggregation produced by
 * `leafAggBuilder` at the innermost level.
 *
 * Every terms level is named "termAgg". When `groups` is empty the result is
 * just the leaf aggregation.
 *
 * @param groups         field names to group by, outermost first
 * @param leafAggBuilder factory for the innermost aggregation
 * @return the root of the nested aggregation definition
 */
def buildAgg(groups: Seq[String])(leafAggBuilder:() => AbstractAggregationDefinition): AbstractAggregationDefinition = {
  groups match {
    // `+:` deconstructs any Seq implementation; `::` would only match a List
    // and throw a MatchError for e.g. a Vector.
    case field +: remaining =>
      aggregation.terms("termAgg").field(field).aggregations(buildAgg(remaining)(leafAggBuilder))
    // No grouping fields left: emit the leaf aggregation.
    case _ => leafAggBuilder()
  }
}

/**
 * Example leaf aggregation builder.
 *
 * Selects the metric aggregation by the function name in the first tuple
 * element and applies it to the field named by the second element. The
 * aggregation is always named "aggFunc". Recognized function names are
 * "avg", "sum" and "cardinality"; any other name falls back to a count
 * aggregation.
 *
 * @param aggFuncInfo (function name, field name)
 * @return the metric aggregation definition
 */
def buildLeafAgg(aggFuncInfo: (String, String))(): AbstractAggregationDefinition = {
  // Plain tuple syntax replaces the deprecated `Pair` alias (same type).
  val (funcName, fieldName) = aggFuncInfo
  funcName match {
    case "avg" => aggregation.avg("aggFunc").field(fieldName)
    case "sum" => aggregation.sum("aggFunc").field(fieldName)
    case "cardinality" => aggregation.cardinality("aggFunc").field(fieldName)
    case _ => aggregation.count("aggFunc").field(fieldName)
  }
}

/**
 * Flattens a nested terms-aggregation result into a sequence of leaf results.
 *
 * For every remaining entry in `groups` the current aggregation is read as a
 * terms aggregation; each bucket's key is appended to `allGroups` and the
 * bucket's first sub-aggregation is parsed recursively. Once `groups` is
 * exhausted, `leafAggParser` is called with the accumulated keys and the
 * current aggregation.
 *
 * @param groups        grouping fields still to be descended, outermost first
 * @param agg           aggregation result at the current level
 * @param allGroups     bucket keys collected on the path so far
 * @param leafAggParser converts (bucket keys, leaf aggregation) into results
 * @return the concatenated results of all leaves below `agg`
 */
def parseAgg[T](groups: Seq[String], agg: Aggregation, allGroups: Seq[String])(leafAggParser: (Seq[String], Aggregation) => Seq[T]): Seq[T] = {
  groups match {
    // `+:` deconstructs any Seq implementation; `::` would only match a List.
    case _ +: remaining =>
      // Cast to the Terms interface rather than StringTerms so that terms
      // aggregations over numeric fields (long/double terms) parse as well.
      val buckets = agg
        .asInstanceOf[org.elasticsearch.search.aggregations.bucket.terms.Terms]
        .getBuckets.asScala.toList
      buckets.flatMap { bucket =>
        // Each bucket carries the next nesting level as its first sub-aggregation.
        val child = bucket.getAggregations.asList.asScala.head
        parseAgg(remaining, child, allGroups :+ bucket.getKeyAsString)(leafAggParser)
      }

    // All grouping levels consumed: `agg` is the leaf aggregation.
    case _ => leafAggParser(allGroups, agg)
  }
}

/**
 * Example leaf aggregation parser.
 *
 * Reads the single numeric value of the leaf aggregation and pairs it with a
 * group id made of the bucket keys joined by ".".
 *
 * @param allGroups bucket keys leading to this leaf
 * @param agg       leaf aggregation, read as a single-value numeric metric
 * @return a one-element sequence holding the stats for this group
 */
def parseSimpleLeafAgg(allGroups: Seq[String], agg: Aggregation): Seq[GroupStats] = {
  val metric = agg.asInstanceOf[InternalNumericMetricsAggregation.SingleValue]
  val metricValue = metric.value()
  Seq(GroupStats(allGroups.mkString("."), metricValue))
}

/**
 * Usage example: builds the nested aggregation query for `groupFields`,
 * executes it and flattens the response.
 *
 * The leaf aggregation is the "count" function over field "V1"; the search
 * runs against `esIndex` with size 0 and the call blocks on the response.
 *
 * @param groupFields fields to group by, outermost first
 * @return one GroupStats entry per leaf of the aggregation result
 */
def groupStats(groupFields: Seq[String]): Seq[GroupStats] = {
  // Leaf metric builder, partially applied to its function/field pair.
  val leafBuilder: () => AbstractAggregationDefinition = buildLeafAgg(("count", "V1")) _
  val query = search(esIndex).size(0).aggregations(buildAgg(groupFields)(leafBuilder))
  val response = client.execute(query).await
  // The query defines exactly one top-level aggregation; take it as the root.
  val rootAgg = response.aggregations.asList().asScala.head
  parseAgg(groupFields, rootAgg, Seq())(parseSimpleLeafAgg _)
}

Verwandte Themen