So Throught lerne ich Apache nehmen Daten von Elasticsearch Spark. Angenommen, ich habe eine Verbindung zu ElasticSearch mit dem Benutzerindex hergestellt.Spark: Stark optimierte verbinden mit Elasticsearch Index
sqlContext = SQLContext(sc)
usersES=sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes','mynode').load('users/user')
erklären (usersES) zeigt mir diesen:
== == Physical-Plan
Scan ElasticsearchRelation (Karte (es.nodes -> mynode, es.resource -> Benutzer/Anwender), org.apache.spark.sql.SQLContext @ 6c78e806, None) [über # 145, # 146 Aktivitäten, bdate 147 #, uid # 148]
Wenn ich Filter:
usersES.filter(usersES.uid==1566324).explain()
== Physical-Plan == Filter (uid # 203L = 1.566.324) + - Scan ElasticsearchRelation (Map (es.nodes -> mynode, es.resource -> Benutzer/Benutzer), org.apache.spark.sql.SQLContext @ 6c78e806, Keine) [ca. # 145, Aktivitäten # 146, Datum # 147, uid # 148] PushedFilters: [EqualTo (UID, 1566324)]
Wie Sie sehen, Funken elegant schiebt den Filter Elasticsearch, so dass der Index-Suche ein schnell d gemütlich.
aber wenn ich versuche usersES mit einem anderen Datenrahmen verbinden, bekomme ich das gleiche Problem die ganze Zeit: Spark-Scans durch den gesamten Elasticsearch Index, keine Filter drängen ich es geben. Zum Beispiel:
a = sc.parallelize([1566324,1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid==a.id).explain()
zeigt:
SortMergeJoin [id # 210L], [uid 203L #]: - Sortierung [id # 210L ASC], falsch, 0: + - TungstenExchange hashpartitioning (id # 210L, 200), None: + - ConvertToUnsafe: + - Scan ExistingRDD [id # 210L] + - Sortierung [uid # 203L ASC], false, 0 + - TungstenExchange hashpartitioning (uid # 203L, 200), keine + - ConvertToUnsafe + - Scan ElasticsearchRelation (Map (es.nodes -> mynode, es.resource -> Benutzer s/user), org.apache.spark.sql.SQLContext @ 6c78e806, None) [etwa 145 #, Aktivitäten # 146, # 147 bdate, uid # 148]
Bitte, sagen Sie mir, das ist möglich Filter innerhalb Elasticsearch innerhalb des Joins drücken?
Dank! Scheint so, als ob dies die einzige Möglichkeit wäre, obwohl sie nicht meinen Bedürfnissen entspricht (tausende von Aufzeichnungen in df zu py innerhalb des IN) –
Nun, der einzige Weg besteht darin, funke anstelle der In-Klausel zu verwenden, oder Sie können sogar eine Broadcast-Variable verwenden . – eliasah
Können Sie bitte die Antwort zumindest akzeptieren, um das Problem zu schließen? – eliasah