2016-08-13 2 views
1

Ich versuche, eine große Menge an Daten von Spark zu HBase zu laden. Ich verwende saveAsNewAPIHadoopDataset-Methode.Problem mit Objekt nicht serialisierbar Klasse: org.apache.hadoop.hbase.io.ImmutableBytesWritable Fehler

Ich erstelle ImmutableWritable und Put und speichern, die wie folgt erforderlich ist.

dataframe.mapPartitions { rows => 
     { 
     rows.map { eachRow => 
      { 
      val rowKey = Seq(eachRow.getAs[String]("uniqueId"), eachRow.getAs[String]("authTime")).mkString(",") 
      val put = new Put(Bytes.toBytes(rowKey)); 
      val fields = eachRow.schema.fields; 

      for (i <- 0 until fields.length) { 
       put.addColumn(userCF, Bytes.toBytes(fields(i).name), Bytes.toBytes(String.valueOf(eachRow.get(i)))) 
      } 

      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put) 
      } 
     } 
     } 
    }.saveAsNewAPIHadoopDataset(job.getConfiguration) 

Meine Daten sind 30 GB wert und es ist in HDFS in 60 Dateien vorhanden.

Wenn ich den gleichen Job mit 10 Dateien gleichzeitig einreichen, ging alles gut.

Aber, wenn ich alles auf einmal einreiche, gibt es diesen Fehler. Der Fehler ist wirklich frustrierend und ich habe alles Mögliche versucht. Aber ich frage mich wirklich, was es geschafft hat, erfolgreich zu laufen, wenn die Daten 5 GB groß sind und was es zu einem Fehler führte, wenn es 30 GB ist.

Hat jemand mit dieser Art von Problemen konfrontiert.?

+0

Können Sie den kompletten Stack-Trace hier einfügen ...? –

+0

Wenn ich 40G Speicher pro Executor zugeteilt habe, wird der Job nah, aber wenn ich weniger Speicher zugewiesen habe, bin ich mit diesem Fehler konfrontiert. Das bedeutet, dass bei jedem Shuffle dieser Fehler auftritt. – Srini

Antwort

0

Das liegt daran, dass ImmutableBytesWritable nicht serialisierbar ist. Wenn es einen Shuffle gibt, versucht Apache Funke, es zu serialisieren, um es an einen anderen Knoten zu senden. Das gleiche würde passieren, wenn Sie versuchen würden, etwas zu nehmen oder es auf Fahrer zu sammeln.

Es gibt eigentlich nur zwei Ansätze.

  • Verwenden Sie es nicht, wenn Sie shuffling. Wenn Sie nur jeden Datensatz von der Festplatte in eine Datenbank einfügen müssen, ist ein Shuffling nicht erforderlich. Stellen Sie sicher, dass es ist. Wenn Sie Ihre Daten vorverarbeiten müssen, bevor sie in die Datenbank gelangen, bewahren Sie sie in einem anderen serialisierbaren Format auf und konvertieren Sie sie beim Speichern in das erforderliche Format.
  • Verwenden Sie einen anderen Serializer. Apache spark kommt mit Kryo (stelle sicher, dass du spark 2.0.0 verwendest - Kryo wurde dort aktualisiert und es behebt einige böse Nebenläufigkeitsfehler). Um es zu benutzen, müssen Sie es konfigurieren. Es ist nicht schwer, erfordert aber ein wenig Code.
+0

Hallo evgeni. Ja, ich habe Kryo probiert und einfach die Klasse serialisierbar gemacht, indem ich sie in die Liste der Klassen gestellt habe. Aber Kryo gab andere Serialisierungsprobleme für mich. Pricely, Index außerhalb der Grenzen Problem. Also habe ich keine andere Wahl, als es fallen zu lassen. Ich hoffe, dass die neue Spark-API die kryo-API verbessert hat. – Srini

+0

@Srini, welche Spark-Version verwenden Sie? Wenn vor 2.0.0, dann hat Kryo ein Problem, das Sie nicht umgehen können. Das könnte es sein. – evgenii

Verwandte Themen