2017-01-06 3 views
4

Ich bin in Schwierigkeiten versucht, Zeilen aus Dataframe basierend auf zweispaltige Liste der Elemente zu filtern zu entfernen. Zum Beispiel für diesen Datenrahmen:Kann Array-Literal in spark/pyspark nicht erstellen

df = spark.createDataFrame([(100, 'A', 304), (200, 'B', 305), (300, 'C', 306)], ['number', 'letter', 'id']) 
df.show() 
+------+------+---+ 
|number|letter| id| 
+------+------+---+ 
| 100|  A|304| 
| 200|  B|305| 
| 300|  C|306| 
+------+------+---+ 

ich leicht Reihen entfernen isin auf einer Spalte:

df.where(~col('number').isin([100, 200])).show() 
+------+------+---+ 
|number|letter| id| 
+------+------+---+ 
| 300|  C|306| 
+------+------+---+ 

Aber wenn ich versuche, sie durch zwei Spalten zu entfernen erhalte ich eine Ausnahme:

df.where(~array('number', 'letter').isin([(100, 'A'), (200, 'B')])).show() 

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit. 
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [100, A] 
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57) 
    at org.apache.spark.sql.functions$.lit(functions.scala:101) 
    at org.apache.spark.sql.functions.lit(functions.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 

Nach einigen Untersuchungen erkannte ich, dass die Ursache des Problems Literale von nicht-primitiven Typen erstellt. Ich habe versucht, den folgenden Code in pyspark:

lit((100, 'A')) 
lit([100, 'A']) 

und folgende Unterlagen in scala-Funke:

lit((100, "A")) 
lit(List(100, "A")) 
lit(Seq(100, "A")) 
lit(Array(100, "A")) 

aber ohne Glück ... Kennt jemand die Art und Weise Arrayliteral in Funken/pyspark zu erstellen ? Oder gibt es eine andere Methode, um Datenrahmen um zwei Spalten zu filtern?

Antwort

1

Zunächst einmal werden Sie wahrscheinlich struct nicht arrays wollen. Denken Sie daran, dass Spark SQL keine heterogenen Arrays unterstützt, daher wird array(1, 'a') an array<string> übergeben.

So Abfrage könnte wie folgt aussehen:

choices = [(100, 'A'), (200, 'B')] 

target = [ 
    struct(
     lit(number).alias("number").cast("long"), 
     lit(letter).alias("letter").cast("string")) 
    for number, letter in choices] 

query = struct("number", "letter").isin(target) 

Diese gültigen Ausdruck zu erzeugen scheint:

query 
Column<b'(named_struct(NamePlaceholder(), number, NamePlaceholder(), letter) IN (named_struct(col1, CAST(100 AS `number` AS BIGINT), col2, CAST(A AS `letter` AS STRING)), named_struct(col1, CAST(200 AS `number` AS BIGINT), col2, CAST(B AS `letter` AS STRING))))'> 

Aber aus irgendeinem Grunde nicht auf Analysator:

df.where(~query) 
AnalysisException       Traceback (most recent call last) 
... 
AnalysisException: "cannot resolve '(named_struct('number', `number`, 'letter', `letter`) IN (named_struct('col1', CAST(100 AS BIGINT), 'col2', CAST('A' AS STRING)), named_struct('col1', CAST(200 AS BIGINT), 'col2', CAST('B' AS STRING))))' due to data type mismatch: Arguments must be same type;;\n'Filter NOT named_struct(number, number#0L, letter, letter#1) IN (named_struct(col1, cast(100 as bigint), col2, cast(A as string)),named_struct(col1, cast(200 as bigint), col2, cast(B as string)))\n+- LogicalRDD [number#0L, letter#1, id#2L]\n" 

Merkwürdigerweise mit SQL folgenden versagt auch:

df.createOrReplaceTempView("df") 

spark.sql("SELECT * FROM df WHERE struct(letter, letter) IN (struct(CAST(1 AS bigint), 'a'))") 
AnalysisException: "cannot resolve '(named_struct('letter', df.`letter`, 'letter', df.`letter`) IN (named_struct('col1', CAST(1 AS BIGINT), 'col2', 'a')))' due to data type mismatch: Arguments must be same type; line 1 pos 46;\n'Project [*]\n+- 'Filter named_struct(letter, letter#1, letter, letter#1) IN (named_struct(col1, cast(1 as bigint), col2, a))\n +- SubqueryAlias df\n  +- LogicalRDD [number#0L, letter#1, id#2L]\n" 

aber wenn sie mit Literalen auf beiden Seiten ersetzt:

spark.sql("SELECT * FROM df WHERE struct(CAST(1 AS bigint), 'a') IN (struct(CAST(1 AS bigint), 'a'))") 
DataFrame[number: bigint, letter: string, id: bigint] 

funktioniert gut, so dass es wie ein Fehler aussieht.

Das heißt links anti wird kommen sollte hier gut funktionieren:

from pyspark.sql.functions import broadcast 

df.join(
    broadcast(spark.createDataFrame(choices, ("number", "letter"))), 
    ["number", "letter"], 
    "leftanti" 
) 
+------+------+---+ 
|number|letter| id| 
+------+------+---+ 
| 300|  C|306| 
+------+------+---+ 
1

Um eine Arrayliteral in Funken Sie einen Array aus einer Reihe von Spalten erstellen müssen schaffen, in dem eine Spalte aus der lit Funktion erstellt:

scala> array(lit(100), lit("A")) 
res1: org.apache.spark.sql.Column = array(100, A)