2016-08-04 9 views
3

So Throught lerne ich Apache nehmen Daten von Elasticsearch Spark. Angenommen, ich habe eine Verbindung zu ElasticSearch mit dem Benutzerindex hergestellt.Spark: Stark optimierte verbinden mit Elasticsearch Index

sqlContext = SQLContext(sc) 
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user') 

erklären (usersES) zeigt mir diesen:

== == Physical-Plan

Scan ElasticsearchRelation (Karte (es.nodes -> mynode, es.resource -> Benutzer/Anwender), org.apache.spark.sql.SQLContext @ 6c78e806, None) [über # 145, # 146 Aktivitäten, bdate 147 #, uid # 148]

Wenn ich Filter:

usersES.filter(usersES.uid==1566324).explain() 

== Physical-Plan == Filter (uid # 203L = 1.566.324) + - Scan ElasticsearchRelation (Map (es.nodes -> mynode, es.resource -> Benutzer/Benutzer), org.apache.spark.sql.SQLContext @ 6c78e806, Keine) [ca. # 145, Aktivitäten # 146, Datum # 147, uid # 148] PushedFilters: [EqualTo (UID, 1566324)]

Wie Sie sehen, Funken elegant schiebt den Filter Elasticsearch, so dass der Index-Suche ein schnell d gemütlich.

aber wenn ich versuche usersES mit einem anderen Datenrahmen verbinden, bekomme ich das gleiche Problem die ganze Zeit: Spark-Scans durch den gesamten Elasticsearch Index, keine Filter drängen ich es geben. Zum Beispiel:

a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF() 
a.join(usersES, usersES.uid==a.id).explain() 

zeigt:

SortMergeJoin [id # 210L], [uid 203L #]: - Sortierung [id # 210L ASC], falsch, 0: + - TungstenExchange hashpartitioning (id # 210L, 200), None: + - ConvertToUnsafe: + - Scan ExistingRDD [id # 210L] + - Sortierung [uid # 203L ASC], false, 0 + - TungstenExchange hashpartitioning (uid # 203L, 200), keine + - ConvertToUnsafe + - Scan ElasticsearchRelation (Map (es.nodes -> mynode, es.resource -> Benutzer s/user), org.apache.spark.sql.SQLContext @ 6c78e806, None) [etwa 145 #, Aktivitäten # 146, # 147 bdate, uid # 148]

Bitte, sagen Sie mir, das ist möglich Filter innerhalb Elasticsearch innerhalb des Joins drücken?

Antwort

2

Dies ist ein erwartetes Verhalten, unterstützt ja elaticsearch-hadoop Anschluss Pushdown- Prädikat aber es gibt keinen Push, wenn Sie beitreten. Diese

ist, weil der Betrieb verbinden weiß nichts darüber, wie die Schlüssel aufgeteilt werden in Ihrem Datenrahmen.

Standardmäßig werden bei dieser Operation alle Schlüssel beider Datagramme gehackt, wobei alle Elemente mit demselben Schlüsselhash über das Netzwerk an dieselbe Maschine gesendet werden. Anschließend werden die Elemente mit demselben Schlüssel auf dieser Maschine zusammengefügt.

Und deshalb erhalten Sie diesen Ausführungsplan ohne das Prädikat gedrückt zu werden.

EDIT: Es scheint, wie der Stecker seit der Version 2.1 derIN Klausel unterstützt. Sie sollten das verwenden, wenn Ihr DataFrame ein nicht groß ist.

+0

Dank! Scheint so, als ob dies die einzige Möglichkeit wäre, obwohl sie nicht meinen Bedürfnissen entspricht (tausende von Aufzeichnungen in df zu py innerhalb des IN) –

+0

Nun, der einzige Weg besteht darin, funke anstelle der In-Klausel zu verwenden, oder Sie können sogar eine Broadcast-Variable verwenden . – eliasah

+0

Können Sie bitte die Antwort zumindest akzeptieren, um das Problem zu schließen? – eliasah

Verwandte Themen