6

Ich versuche, Spark 1.4 window functions in PYSPARK 1.4.1Warum scheitern Fensterfunktionen mit "Fensterfunktion X nimmt keine Rahmenspezifikation"?

zu verwenden, aber vor allem Fehler oder unerwartete Ergebnisse. Hier ist ein sehr einfaches Beispiel die ich denke, funktionieren soll:

from pyspark.sql.window import Window 
import pyspark.sql.functions as func 

l = [(1,101),(2,202),(3,303),(4,404),(5,505)] 
df = sqlContext.createDataFrame(l,["a","b"]) 

wSpec = Window.orderBy(df.a).rowsBetween(-1,1) 

df.select(df.a, func.rank().over(wSpec).alias("rank")) 
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification. 

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")) 
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.; 


wSpec = Window.orderBy(df.a) 

df.select(df.a, func.rank().over(wSpec).alias("rank")) 
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected. 

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect() 

    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)] 

Wie Sie sehen können, wenn ich rowsBetween Rahmenspezifikation hinzufügen, weder rank() noch lag/lead() Fensterfunktionen erkennt es: „Window-Funktion übernimmt keine Rahmenspezifikation ".

Wenn ich die rowsBetween Rahmenspezifikation bei Leas lag/lead() weglassen, werfen Sie keine Ausnahmen zurück, sondern unerwartet (für mich) Ergebnis: immer None. Und die rank() funktioniert immer noch nicht mit anderer Ausnahme.

Kann mir jemand helfen, meine Fensterfunktionen richtig zu machen?

UPDATE

In Ordnung, das als ein pyspark Fehler zu suchen beginnt. ich den gleichen Test in reinen Funken vorbereitet haben (Scala, funken Shell):

import sqlContext.implicits._ 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505)) 
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2)) 
val schemaString = "a b" 
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true))) 
val df = sqlContext.createDataFrame(rdd, schema) 

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 

val wSpec = Window.orderBy("a").rowsBetween(-1,1) 
df.select(df("a"), rank().over(wSpec).alias("rank")) 
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.; 

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")) 
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.; 


val wSpec = Window.orderBy("a") 
df.select(df("a"), rank().over(wSpec).alias("rank")).collect() 
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5]) 

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")) 
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null]) 

Auch wenn die rowsBetween nicht in Scala angewendet werden, sowohl rank() und lag()/lead() Arbeit, wie ich erwarten, wenn rowsBetween weggelassen.

Antwort

3

Soweit ich es sagen kann gibt es zwei verschiedene Probleme. Die Fensterrahmendefinition wird von Hive GenericUDAFRank, GenericUDAFLag und GenericUDAFLead einfach nicht unterstützt. Daher sind Fehler, die Sie sehen, ein erwartetes Verhalten.

Bezüglich Problems mit dem folgenden Code PySpark

wSpec = Window.orderBy(df.a) 
df.select(df.a, func.rank().over(wSpec).alias("rank")) 

es aussieht wie es auf meine Frage https://stackoverflow.com/q/31948194/1560062 verwendet und soll durch SPARK-9978 angesprochen werden. So weit jetzt können Sie es funktionieren lassen, indem Sie die Fensterdefinition folgendermaßen ändern:

wSpec = Window.partitionBy().orderBy(df.a) 
Verwandte Themen