2014-05-19 6 views
9

Ich versuche, Spark zu verwenden, um Daten zu verarbeiten, die von HBase-Tabellen stammen. This blog post gibt ein Beispiel für die Verwendung von NewHadoopAPI zum Lesen von Daten aus einem beliebigen Hadoop InputFormat.Anreichern von SparkContext ohne Serialisierungsprobleme

Was ich

getan Da ich brauche dies viele Male zu tun, ich hatte versucht, implicits zu verwenden SparkContext zu bereichern, so dass ich eine RDD aus einer gegebenen Menge von Spalten in HBase erhalten kann. Ich habe die folgenden Helfer geschrieben:

trait HBaseReadSupport { 
    implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc) 

    implicit def bytes2string(bytes: Array[Byte]) = new String(bytes) 
} 


final class HBaseSC(sc: SparkContext) extends Serializable { 
    def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) = 
    data map { case (cf, columns) => 
     val content = columns map { column => 
     val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes) 

     column -> interpret(CellUtil.cloneValue(cell)) 
     } toMap 

     cf -> content 
    } 

    def makeConf(table: String) = { 
    val conf = HBaseConfiguration.create() 

    conf.setBoolean("hbase.cluster.distributed", true) 
    conf.setInt("hbase.client.scanner.caching", 10000) 
    conf.set(TableInputFormat.INPUT_TABLE, table) 

    conf 
    } 

    def hbase[A](table: String, data: Map[String, List[String]]) 
    (interpret: Array[Byte] => A) = 

    sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat], 
     classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) => 
     Bytes.toString(key.get) -> extract(data, row, interpret) 
     } 

} 

Es können wir eine RDD bekommen von (String, Map[String, Map[String, String]]) In diesem Fall wie

val rdd = sc.hbase[String](table, Map(
    "cf" -> List("col1", "col2") 
)) 

verwendet werden, wobei die erste Komponente der RowKey ist und die zweite ist eine Karte, deren Schlüssel sind Spaltenfamilien und die Werte sind Zuordnungen, deren Schlüssel Spalten sind und deren Inhalt die Zellenwerte sind.

Wo es

Leider versagt, so scheint es, dass meine Arbeit einen Verweis auf sc wird, die sich durch Design nicht serialisierbar ist. Was bekomme ich, wenn ich den Job ausgeführt ist

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) 

kann ich die Hilfsklassen entfernen und die gleiche Logik inline in meinem Job verwenden und alles läuft gut. Aber ich möchte etwas bekommen, das ich wiederverwenden kann, anstatt immer wieder das gleiche Muster zu schreiben.

Übrigens ist das Problem nicht spezifisch für implizit, auch wenn eine Funktion von sc das gleiche Problem aufweist.

Zum Vergleich wurden die folgenden Helfer TSV-Dateien zu lesen (ich weiß, es funktioniert nicht, da es nicht zu zitieren unterstützt und so weiter, egal) scheint gut zu funktionieren:

trait TsvReadSupport { 
    implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc) 
} 

final class TsvRDD(val sc: SparkContext) extends Serializable { 
    def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line => 
    val contents = line.split(separator).toList 

    (fields, contents).zipped.toMap 
    } 
} 

Wie kann ich kapseln die Logik, Zeilen aus HBase zu lesen, ohne den SparkContext unbeabsichtigt zu erfassen?

+0

Ich habe eine Antwort hier: http://stackoverflow.com/questions/23619775/spark-serialization- Fehler/23628930 # 23628930 – samthebest

Antwort

12

Fügen Sie einfach @transient Anmerkung zu sc Variable:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable { 
    ... 
} 

und stellen Sie sicher, sc nicht innerhalb extract Funktion verwendet wird, da es nicht auf die Arbeitnehmer zur Verfügung stehen wird.

Wenn es notwendig ist, von der Berechnung innerhalb der verteilten Spark-Kontext zuzugreifen, kann rdd.context Funktion verwendet werden:

val rdd = sc.newAPIHadoopRDD(...) 
rdd map { 
    case (k, v) => 
    val ctx = rdd.context 
    .... 
} 
+0

Vielen Dank !!! Ich hätte es nie erfahren! – Andrea

+1

Wenn ich auf Kontext innerhalb der Schließung referenziere, bekomme ich eine NotSerilizableException? Ich muss einen Kontext innerhalb einer Schließung verwenden, um auf eine Datei auf HDFS zuzugreifen. Wie kann dies erreicht werden? – VishAmdi

Verwandte Themen