2017-04-06 7 views
0

Dies ist einfach "wie" Frage :: Wir können Daten in Spark-Umgebung durch com.databricks.spark.csv bringen. Ich weiß, wie HBase-Tabelle durch Funke erstellen und Daten manuell in die HBase-Tabellen schreiben. Aber ist das überhaupt möglich, um einen Text/csv/jason Dateien direkt über Spark zu HBase zu laden? Ich kann niemanden darüber reden sehen. Also, einfach nachschauen. Wenn möglich, leite mich bitte auf eine gute Webseite, die den Scala-Code im Detail erklärt, um es zu erledigen.laden CSV-Datei zu HBase durch Spark

Danke,

Antwort

0

Es gibt mehrere Möglichkeiten, wie Sie das tun können.

  1. Funken Hbase Stecker:

https://github.com/hortonworks-spark/shc

Sie können viele Beispiele auf den Link sehen.

  1. Sie können auch den SPark-Core verwenden, um die Daten mithilfe von HbaseConfiguration in Hbase zu laden.

Codebeispiel:

val fileRDD = sc.textFile(args(0), 2) 
    val transformedRDD = fileRDD.map { line => convertToKeyValuePairs(line) } 

    val conf = HBaseConfiguration.create() 
    conf.set(TableOutputFormat.OUTPUT_TABLE, "tableName") 
    conf.set("hbase.zookeeper.quorum", "localhost:2181") 
    conf.set("hbase.master", "localhost:60000") 
    conf.set("fs.default.name", "hdfs://localhost:8020") 
    conf.set("hbase.rootdir", "/hbase") 

    val jobConf = new Configuration(conf) 
    jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName) 
    jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName) 
    jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName) 

    transformedRDD.saveAsNewAPIHadoopDataset(jobConf) 



def convertToKeyValuePairs(line: String): (ImmutableBytesWritable, Put) = { 

    val cfDataBytes = Bytes.toBytes("cf") 
    val rowkey = Bytes.toBytes(line.split("\\|")(1)) 
    val put = new Put(rowkey) 

    put.add(cfDataBytes, Bytes.toBytes("PaymentDate"), Bytes.toBytes(line.split("|")(0))) 
    put.add(cfDataBytes, Bytes.toBytes("PaymentNumber"), Bytes.toBytes(line.split("|")(1))) 
    put.add(cfDataBytes, Bytes.toBytes("VendorName"), Bytes.toBytes(line.split("|")(2))) 
    put.add(cfDataBytes, Bytes.toBytes("Category"), Bytes.toBytes(line.split("|")(3))) 
    put.add(cfDataBytes, Bytes.toBytes("Amount"), Bytes.toBytes(line.split("|")(4))) 
    return (new ImmutableBytesWritable(rowkey), put) 
    } 
  1. Sie können auch diese eine

https://github.com/nerdammer/spark-hbase-connector

verwenden