2017-03-13 3 views
0

Ich habe eine Reihe von Filtern, die ich auf einen Datenrahmen in Spark anwenden muss, aber es ist zuerst zur Laufzeit ich weiß, welche Filter für Benutzer. Ich füge sie zur Zeit in einzelnen filter Funktionen, aber das schlägt fehl, wenn eine des filtes nichtBeliebig viele Filter auf Dataframe

myDataFrame 
    .filter(_filter1) 
    .filter(_filter2) 
    .filter(_filter3)... 

definiert ist, kann ich wirklich nicht herausfinden, wie man zur Laufzeit dynamisch ausschließen fx _filter2 wenn das nicht benötigt wird?

Soll ich es tun, indem ein großer Filter erstellen:

var filter = _filter1 
if (_filter2 != null) 
    filter = filter.and(_filter2) 
... 

Oder gibt es ein gutes Muster für das Spark, die ich gefunden habe, nicht wahr?

Antwort

1

Eine mögliche Lösung ist es, alle filters-lit(true) auf Standard:

import org.apache.spark.sql.functions._ 

val df = Seq(1, 2, 3).toDF("x") 

val filter_1 = lit(true) 
val filter_2 = col("x") > 1 
val filter_3 = lit(true) 

val filtered = df.filter(filter_1).filter(filter_2).filter(filter_3) 

Dies wird null aus Ihrem Code behalten und trivial wahr Prädikate werden zurückgeschnitten werden d von dem Ausführungsplan:

filtered.explain 
== Physical Plan == 
*Project [value#1 AS x#3] 
+- *Filter (value#1 > 1) 
    +- LocalTableScan [value#1] 

Sie können natürlich machen es noch einfacher, und eine Folge von Prädikaten:

import org.apache.spark.sql.Column 

val preds: Seq[Column] = Seq(lit(true), col("x") > 1, lit(true)) 
df.where(preds.foldLeft(lit(true))(_ and _)) 

und, wenn es richtig umgesetzt, überspringen Platzhalter vollständig.

0

Warum nicht:

var df = // load 
if (_filter2 != null) { 
    df = df.filter(_filter2) 
} 
etc 

Alternativ können Sie eine Liste der Filter erstellen:

var df = // load 
val filters = Seq (filter1, filter2, filter3, ...) 
filters.filter(_ != null).foreach (x => df = df.filter(x)) 

Leider // wenn es einige Fehler im Code, es ist mehr eine Idee - zur Zeit kann ich nicht Testcode

1

zuerst würde ich von null Filter loszuwerden:

val filters:List[A => Boolean] = nullableFilters.filter(_!=null) 

Dann Funktion Ketten Filter definieren:

def chainFilters[A](filters:List[A => Boolean])(v:A) = filters.forall(f => f(v)) 

Jetzt können Sie einfach Filter anwenden, um Ihre df:

df.filter(chainFilters(nullableFilters.filter(_!=null)) 
Verwandte Themen