2017-12-29 18 views
1

Ich habe ein Problem bei der Verwendung von Lambda-Funktionen auf Filtern und Karten von typisierten Datasets in Java Spark-Anwendungen.Spark CSV - Keine anwendbare Konstruktor/Methode für die tatsächlichen Parameter gefunden

Ich erhalte dieses Laufzeitfehler

ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 130, Column 126: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public static java.sql.Date org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(int)" 

ich die unten Klasse verwende und 2.2.0 entfachen. Voll Beispiel mit Beispieldaten ist in https://gitlab.com/opencell/test-bigdata

Dataset<CDR> cdr = spark 
      .read() 
      .format("csv") 
      .option("header", "true") 
      .option("inferSchema", "true") 
      .option("delimiter", ";") 
      .csv("CDR_SAMPLE.csv") 
      .as(Encoders.bean(CDR.class)); 

    long v = cdr.filter(x -> (x.timestamp != null && x.getAccess().length()>0)).count(); 

    System.out.println("validated entries :" + v); 

CDR-Datei Definition verfügbar ist gitlab link

EDIT

val cdrCSVSchema = StructType(Array(
    StructField("timestamp", DataTypes.TimestampType), 
    StructField("quantity", DataTypes.DoubleType), 
    StructField("access", DataTypes.StringType), 
    StructField("param1", DataTypes.StringType), 
    StructField("param2", DataTypes.StringType), 
    StructField("param3", DataTypes.StringType), 
    StructField("param4", DataTypes.StringType), 
    StructField("param5", DataTypes.StringType), 
    StructField("param6", DataTypes.StringType), 
    StructField("param7", DataTypes.StringType), 
    StructField("param8", DataTypes.StringType), 
    StructField("param9", DataTypes.StringType), 
    StructField("dateParam1", DataTypes.TimestampType), 
    StructField("dateParam2", DataTypes.TimestampType), 
    StructField("dateParam3", DataTypes.TimestampType), 
    StructField("dateParam4", DataTypes.TimestampType), 
    StructField("dateParam5", DataTypes.TimestampType), 
    StructField("decimalParam1", DataTypes.DoubleType), 
    StructField("decimalParam2", DataTypes.DoubleType), 
    StructField("decimalParam3", DataTypes.DoubleType), 
    StructField("decimalParam4", DataTypes.DoubleType), 
    StructField("decimalParam5", DataTypes.DoubleType), 
    StructField("extraParam", DataTypes.StringType))) 

und ich verwenden, um dieses Kommando das CSV-Dokument

zu laden
val cdr = spark.read.format("csv").option("header", "true").option("delimiter", ";").schema(cdrCSVSchema).csv("CDR_SAMPLE.csv") 

und dann versucht, diesen Befehl zu codieren und laufen Lambda-Funktion, aber ich bin immer noch Fehler

cdr.as[CDR].filter(c => c.timestamp != null).show 

Antwort

0

TL bekommen; DR das Schema explizit definieren, da die Eingabedatei keine Werte müssen Typen von ableiten (für java.sql.Date Felder).

Für Ihren Fall nicht typisierten Dataset API könnte eine Lösung sein (vielleicht eine Abhilfe und ehrlich würde ich empfehlen, es nicht notwendig Deserialisierung von internen Zeilenformat zu vermeiden):

cdr.filter(!$"timestamp".isNull).filter(length($"access") > 0).count 

(Es ist Scala und ich Ich überlasse es, es als Heimübung nach Java zu übersetzen.

Das Problem ist, dass Sie inferSchema Option mit den meisten nicht verfügbaren Felder in der Eingabe CDR_SAMPLE.csv-Datei verwenden, die die meisten Felder vom Typ String (das ist der Standardtyp, wenn keine Werte verfügbar sind, um spezifischeren Typ zu schließen).

Das macht die Felder vom Typ java.sql.Date, d. H. dateParam1 bis zu dateParam5, vom Typ String.

import org.opencell.spark.model.CDR 
import org.apache.spark.sql.Encoders 
implicit val cdrEnc = Encoders.bean(classOf[CDR]) 
val cdrs = spark.read. 
    option("inferSchema", "true"). 
    option("delimiter", ";"). 
    option("header", true). 
    csv("/Users/jacek/dev/sandbox/test-bigdata/CDR_SAMPLE.csv") 
scala> cdrs.printSchema 
root 
|-- timestamp: timestamp (nullable = true) 
|-- quantity: integer (nullable = true) 
|-- access: string (nullable = true) 
|-- param1: string (nullable = true) 
|-- param2: string (nullable = true) 
|-- param3: string (nullable = true) 
|-- param4: string (nullable = true) 
|-- param5: string (nullable = true) 
|-- param6: string (nullable = true) 
|-- param7: string (nullable = true) 
|-- param8: string (nullable = true) 
|-- param9: string (nullable = true) 
|-- dateParam1: string (nullable = true) 
|-- dateParam2: string (nullable = true) 
|-- dateParam3: string (nullable = true) 
|-- dateParam4: string (nullable = true) 
|-- dateParam5: string (nullable = true) 
|-- decimalParam1: string (nullable = true) 
|-- decimalParam2: string (nullable = true) 
|-- decimalParam3: string (nullable = true) 
|-- decimalParam4: string (nullable = true) 
|-- decimalParam5: string (nullable = true) 
|-- extraParam: string (nullable = true) 

Beachten Sie, dass die Felder von Interesse, das heißt dateParam1 zu dateParam5 sind alle Saiten.

|-- dateParam1: string (nullable = true) 
|-- dateParam2: string (nullable = true) 
|-- dateParam3: string (nullable = true) 
|-- dateParam4: string (nullable = true) 
|-- dateParam5: string (nullable = true) 

Die Ausgabe Flächen, wenn Sie „so tun, als“ die Art der Felder unterscheidet sich durch den Encoder, wie in CDR Klasse definiert, die sagt:

private Date dateParam1; 
private Date dateParam2; 
private Date dateParam3; 
private Date dateParam4; 
private Date dateParam5; 

, dass die Ursache des Problems ist. Es gibt einen Unterschied zwischen dem, was Spark aus der Klasse entnehmen konnte. Ohne die Umwandlung hätte der Code gearbeitet, aber da Sie bestand darauf, ...

cdrs.as[CDR]. // <-- HERE is the issue = types don't match 
    filter(cdr => cdr.timestamp != null). 
    show // <-- trigger conversion 

Es ist wirklich egal auf welchem ​​Gebiet Sie in filter Operator zugreifen. Das Problem ist, dass die Konvertierung stattfindet, die zu einer falschen Ausführung (und einer vollständigen Java-Code-Generierung) führt.

Ich bezweifle, Spark kann viel dagegen tun, seit Sie angefordert inferSchema mit einem Dataset ohne Werte für den Typ Inferenz verwenden. Am besten ist es, das Schema explizit zu definieren und den Operator schema(...) zum Festlegen zu verwenden.

+1

Bitte überprüfen Sie meine Bearbeitung, ich bekomme den gleichen Fehler ... –

Verwandte Themen