2016-07-25 9 views
0

Betrachten Sie das folgende BeispielPySpark: Filter basierend auf resultierende Abfrage ohne zusätzlichen Datenrahmen

>>> l = [("US","City1",125),("US","City2",123),("Europe","CityX",23),("Europe","CityY",17)] 
>>> print l 
[('US', 'City1', 125), ('US', 'City2', 123), ('Europe', 'CityX', 23), ('Europe', 'CityY', 17)] 

>>> sc = SparkContext(appName="N") 
>>> sqlsc = SQLContext(sc) 
>>> df = sqlsc.createDataFrame(l) 
>>> df.printSchema() 
root 
|-- _1: string (nullable = true) 
|-- _2: string (nullable = true) 
|-- _3: long (nullable = true) 
>>> df.registerTempTable("t1") 
>>> rdf=sqlsc.sql("Select _1,sum(_3) from t1 group by _1").show() 
+------+---+ 
| _1|_c1| 
+------+---+ 
| US|248| 
|Europe| 40| 
+------+---+ 
>>> rdf.printSchema() 
root 
|-- _1: string (nullable = true) 
|-- _c1: long (nullable = true) 
>>> rdf.registerTempTable("t2") 
>>> sqlsc.sql("Select * from t2 where _c1 > 200").show() 
+---+---+ 
| _1|_c1| 
+---+---+ 
| US|248| 
+---+---+ 

Also im Grunde versuche ich, alle zu finden, die _3 (die Bevölkerung zu einem gewissen Dienst abonniert werden kann), die in über die Schwelle ist Jedes Land. In der obigen Tabelle, gibt es einen zusätzlichen Datenrahmen erstellt wird (rdf)

Nun Wie beseitigen wir die rdf Datenrahmen und betten die komplette Abfrage innerhalb df Datenrahmen selbst.

habe ich versucht, aber pyspark wirft Fehler

>>> sqlsc.sql("Select _1,sum(_3) from t1 group by _1").show() 
+------+---+ 
| _1|_c1| 
+------+---+ 
| US|248| 
|Europe| 40| 
+------+---+ 
>>> sqlsc.sql("Select _1,sum(_3) from t1 group by _1 where _c1 > 200").show() 
Traceback (most recent call last): 
File "/ghostcache/kimanjun/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o28.sql. 
: java.lang.RuntimeException: [1.39] failure: ``union'' expected but `where' found 

Antwort

1

Hier ist eine Lösung ohne Art von temporären Tabellen:

#Do this to don't have conflict with sum in built-in spark functions 
from pyspark.sql import sum as _sum 

gDf = df.groupBy(df._1).agg(_sum(df._3).alias('sum')) 
gDf.filter(gDf.sum > 200).show() 

Diese Lösung wir einen Weg Gruppe und Aggregat mit einer Summe haben. Um sicherzustellen, dass Sie keine Probleme mit der Summe haben. Ist besser, den Filter in einem anderen Objekt.

Ich empfehle Ihnen diese link zu sehen, einige nützliche Möglichkeiten viel mächtiger als die Verwendung von Direct SQL im Datenrahmen.

Verwandte Themen