2017-06-13 1 views
0

Ich habe Liste der org.apache.avro.generic.GenericRecord, avro schema diese nutzen wir brauchen dataframe mit Hilfe von SQLContext API erstellen, zu erstellen dataframe es braucht RDD von org.apache.spark.sql.Row und avro schema. Voraussetzung für die Erstellung von DF ist, dass wir RDD von org.apache.spark.sql.Row haben sollten, und es kann erreicht werden, indem man den folgenden Code verwendet, aber einige, wie es nicht funktioniert und Fehler, Beispielcode gibt.org.apache.avro.generic.GenericRecord org.apache.spark.sql.Row

1. Convert GenericRecord to Row 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 
    import org.apache.avro.Schema 
    import org.apache.spark.sql.types.StructType 
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] = 
    { 
     val fields = avroSchema.getFields 
     var rows = new Seq[Row] 
     for (avroRecord <- genericRecords) { 
     var avroFieldsSeq = Seq[Any](); 
     for (i <- 0 to fields.size - 1) { 
      avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name) 
     } 
     val avroFieldArr = avroFieldsSeq.toArray 
     val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType) 
     rows = rows :+ genericRow 
     } 
     return rows; 
    } 

2. Convert `Avro schema` to `Structtype` 
    Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType 

3. Create `Dataframe` using `SQLContext` 
    val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType) 
    val rowRdd = sc.parallelize(rowSeq, 1) 
    val finalDF =sqlContext.createDataFrame(rowRDD,structType) 

aber es ist ein Fehler bei der Erstellung von DataFrame werfen. Kann mir bitte jemand helfen, was oben im Code falsch ist. Abgesehen davon, wenn jemand eine andere Logik zum Konvertieren und Erstellen von dataframe hat.

Immer, wenn ich jede Aktion auf Datenrahmen aufrufen, wird es DAG ausführen und versuchen DF-Objekt zu erstellen, aber in dieser es mit unter Ausnahme als

ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1 
         at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) 
         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) 

Danach versagt Ich versuche, richtige Version Glas zu geben, im Glas Parameter von Funken vorlegen und mit anderen Parametern als --conf spark.driver.userClassPathFirst = true aber jetzt ist es mit MapR als

ERROR CLDBRpcCommonUtils: Exception during init 
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;) 
        at com.mapr.security.JNISecurity.SetClusterOption(Native Method) 
        at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163) 
        at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73) 
        at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63) 
        at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69) 
        at java.lang.Class.forName0(Native Method) 

Versagen Wir MapR Verteilung und nach dem Unterricht Pfadänderung verwenden in Funken- reichen Sie es ein scheitert mit der obigen Ausnahme.

Kann jemand bitte hier helfen oder meine grundlegende Notwendigkeit es Avro GenericRecord in Spark Row zu konvertieren, damit ich Dataframe mit ihm erstellen kann, bitte helfen Sie
Danke.

+1

Was ist der genaue Fehler auf Zeile? und aktualisieren Sie die Frage bitte mit genericRecords sample, avroSchema. –

+0

@RameshMaharjan Treiber stacktrace: org.apache.spark.SparkException: Job aufgrund von Stage-Fehler abgebrochen: Task 0 in Stufe 0.0 ist 4 Mal fehlgeschlagen, letzter Fehler: Task 0.3 in Stufe 0.0 verloren (TID 3, hdpoc-c01-r03 -01, Executor 2): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; lokale Klasse inkompatibel: stream classdesc serialVersionUID = 2, lokale Klasse serialVersionUID = 1 bei java.io.ObjectStreamClass.initNonProxy (ObjectStreamClass.java:617) bei java.io.ObjectInputStream.readNonProxyDesc (ObjectInputStream.java:1622) –

+0

der Fehler sieht aus wie Versionsfehler zwischen der Quelle von Streaming-Daten und dem Konvertierungscode in Ihrem lokalen. Sie müssen dieselbe Version des FastDateFormat-Pakets verwenden, das die Quelle verwendet. Und bitte aktualisieren Sie den Fehler in der Frage, damit auch andere Ihnen helfen können. –

Antwort

Verwandte Themen