2017-01-27 2 views
10

Ich versuche, mehrere Operationen in einer Zeile Code in pySpark, und nicht sicher, ob das für meinen Fall möglich ist.Aggregat Funktion Zähle Verwendung mit groupBy in Spark

Ich möchte die Ausgabe nicht als neuen Datenrahmen speichern.

Mein aktueller Code ist ziemlich einfach:

encodeUDF = udf(encode_time, StringType()) 
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME'))) 
    .groupBy('timePeriod') 
    .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"), 
    stddev('DOWNSTREAM_SIZE').alias("Stddev") 
) 
    .show(20, False) 

Und meine Absicht ist count() hinzuzufügen, nachdem groupBy verwenden, zu erhalten, na ja, die Anzahl der Datensätze jeden Wert von timePeriod Spalt übereinstimmt, gedruckt \ gezeigt als Ausgabe.

Beim Versuch, groupBy(..).count().agg(..) zu verwenden, erhalte ich Ausnahmen.

Gibt es eine Möglichkeit, beide count() und agg() .show() druckt zu erreichen, ohne Splitting Code auf zwei Zeilen von Befehlen, z.B. :

new_log_df.withColumn(..).groupBy(..).count() 
new_log_df.withColumn(..).groupBy(..).agg(..).show() 

Oder noch besser, für eine fusionierte Ausgabe agg.show() Ausgang bekommen - eine zusätzliche Spalte, die die gezählte Anzahl von Aufzeichnungen heißt es in der Zeile Wert entsprechen. z.B .:

timePeriod | Mean | Stddev | Num Of Records 
    X  | 10 | 20 | 315 
+0

'new_log_df.withColumn (..). GroupBy (..). Agg (Anzahl (1)). Show()'? – mrsrinivas

+0

Wofür steht '1' in' count (1) '? und kann ich den count() innerhalb von agg() zusammen mit anderen Begriffen verwenden, wie in meinem Code erwähnt? – Adiel

+0

Beim Versuch, 'agg (count (1), mean (..), stddev (..)) zu verwenden. Show()' i get ** NameError: Name 'count' ist nicht definiert ** – Adiel

Antwort

18

count() kann innerhalb agg() als groupBy Ausdruck derselben verwendet werden.

Mit Python

import pyspark.sql.functions as func 

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
    .groupBy("timePeriod") 
    .agg(
    func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
    func.stddev("DOWNSTREAM_SIZE").alias("Stddev"), 
    func.count(func.lit(1)).alias("Num Of Records") 
    ) 
    .show(20, False) 

pySpark SQL functions doc

Mit Scala

import org.apache.spark.sql.functions._ //for count() 

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
    .groupBy("timePeriod") 
    .agg(
    mean("DOWNSTREAM_SIZE").alias("Mean"), 
    stddev("DOWNSTREAM_SIZE").alias("Stddev"), 
    count(lit(1)).alias("Num Of Records") 
    ) 
    .show(20, false) 

count(1) werden die Datensätze nach ersten Spalte zählen, die zu count("timePeriod")

Mit Java gleich

Verwandte Themen