Ich versuche, Spark zu verwenden, um Daten zu verarbeiten, die von HBase-Tabellen stammen. This blog post gibt ein Beispiel für die Verwendung von NewHadoopAPI
zum Lesen von Daten aus einem beliebigen Hadoop InputFormat
.Anreichern von SparkContext ohne Serialisierungsprobleme
Was ich
getan Da ich brauche dies viele Male zu tun, ich hatte versucht, implicits zu verwenden SparkContext
zu bereichern, so dass ich eine RDD aus einer gegebenen Menge von Spalten in HBase erhalten kann. Ich habe die folgenden Helfer geschrieben:
trait HBaseReadSupport {
implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)
implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}
final class HBaseSC(sc: SparkContext) extends Serializable {
def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
data map { case (cf, columns) =>
val content = columns map { column =>
val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)
column -> interpret(CellUtil.cloneValue(cell))
} toMap
cf -> content
}
def makeConf(table: String) = {
val conf = HBaseConfiguration.create()
conf.setBoolean("hbase.cluster.distributed", true)
conf.setInt("hbase.client.scanner.caching", 10000)
conf.set(TableInputFormat.INPUT_TABLE, table)
conf
}
def hbase[A](table: String, data: Map[String, List[String]])
(interpret: Array[Byte] => A) =
sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
Bytes.toString(key.get) -> extract(data, row, interpret)
}
}
Es können wir eine RDD bekommen von (String, Map[String, Map[String, String]])
In diesem Fall wie
val rdd = sc.hbase[String](table, Map(
"cf" -> List("col1", "col2")
))
verwendet werden, wobei die erste Komponente der RowKey ist und die zweite ist eine Karte, deren Schlüssel sind Spaltenfamilien und die Werte sind Zuordnungen, deren Schlüssel Spalten sind und deren Inhalt die Zellenwerte sind.
Wo es
Leider versagt, so scheint es, dass meine Arbeit einen Verweis auf sc
wird, die sich durch Design nicht serialisierbar ist. Was bekomme ich, wenn ich den Job ausgeführt ist
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
kann ich die Hilfsklassen entfernen und die gleiche Logik inline in meinem Job verwenden und alles läuft gut. Aber ich möchte etwas bekommen, das ich wiederverwenden kann, anstatt immer wieder das gleiche Muster zu schreiben.
Übrigens ist das Problem nicht spezifisch für implizit, auch wenn eine Funktion von sc
das gleiche Problem aufweist.
Zum Vergleich wurden die folgenden Helfer TSV-Dateien zu lesen (ich weiß, es funktioniert nicht, da es nicht zu zitieren unterstützt und so weiter, egal) scheint gut zu funktionieren:
trait TsvReadSupport {
implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}
final class TsvRDD(val sc: SparkContext) extends Serializable {
def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
val contents = line.split(separator).toList
(fields, contents).zipped.toMap
}
}
Wie kann ich kapseln die Logik, Zeilen aus HBase zu lesen, ohne den SparkContext unbeabsichtigt zu erfassen?
Ich habe eine Antwort hier: http://stackoverflow.com/questions/23619775/spark-serialization- Fehler/23628930 # 23628930 – samthebest