2016-09-29 4 views
3

Ich schreibe ein Projekt, um Daten von Kafka zu erhalten und in Hbase-Tabelle schreiben. Da ich das Differential der Datensätze wissen möchte, muss ich zuerst mit dem gleichen Zeilenschlüssel in Hbase aufnehmen und dann eine Subtraktion mit dem empfangenen Datensatz durchführen und schließlich neue Datensätze in der HBase-Tabelle speichern.Hbase-Daten in Spark-Streaming lesen

Am Anfang versuchte ich newAPIHadoop zu verwenden, um Daten von HBase zu erhalten. Hier ist mein Versuch:

val conf = HBaseConfiguration.create() 
conf.set("zookeeper.znode.parent", "/hbase-secure") 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
conf.set("hbase.zookeeper.quorum", zkQuorum) 
conf.set("hbase.master", masterAddr) 
conf.set("hbase.zookeeper.property.clientPort", portNum) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName) 

val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf, 
     classOf[TableInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

Auf diese Weise, ich bin in der Lage, die Werte der Datensätze mit bestimmten Spalte Familie und Spaltennamen NUR EINMAL zu bekommen. Ich sage nur einmal, ich meine jedes Mal, wenn ich meine Spark-Streaming-Anwendung starte, wird dieser Codeschnipsel ausgeführt und ich kann einen Wert erhalten, aber er wird nicht mehr ausgeführt. Weil ich jedes Mal, wenn ich einen Datensatz von Kafka erhalte, meine Datensätze aus HBase mit cf und column lesen möchte, funktioniert das bei mir nicht.

Um das zu lösen, verschiebe ich die Logik auf foreachRDD(), aber leider scheint sparkContext nicht serialisierbar. Ich habe einen Fehler wie task is not serialzable.

Schließlich fand ich, dass es eine andere Möglichkeit gibt, Daten von HBase zu lesen, indem Sie hbase.clinet HTable verwenden. Also das ist meine letzte Arbeit:

def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = { 
    val conf = HBaseConfiguration.create() 
    conf.set("zookeeper.znode.parent", "/hbase-secure") 
    conf.set("hbase.zookeeper.quorum", "xxxxxx") 
    conf.set("hbase.master", "xxxx") 
    conf.set("hbase.zookeeper.property.clientPort", "xxx") 
    conf.set(TableInputFormat.INPUT_TABLE, "xx") 
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx") 

    val testTable = new HTable(conf, "testTable") 
    val scan = new Scan 
    scan.addColumn("cf1".getBytes, "test".getBytes) 
    val rs = testTable.getScanner(scan) 

    var r = rs.next() 
    val res = new StringBuilder 
    while(r != null){ 
     val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes)) 

     res.append(tmp) 
     r= rs.next() 
    } 
val res = res.toString 

//do the following manipulations and return object (ImmutableBytesWritable, Put) 
     .............................. 
     ....................... 
      } 

Im Haupt Methode, die ich oben Methode in foreachRDD und speichere in HBase durch Verfahren unter Verwendung von saveAsNewAPIHadoopDataset

streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)) 

Dies funktioniert für mich jetzt gut, aber ich habe Fragen zu diesem Prozess:

Auf diese Weise, denke ich, würde für jede Partition von RDD eine Verbindung zu HBase erstellt werden. Ich frage mich, ob es möglich ist, meine App zu erweitern. Sag, wenn ich mehr als 1000 Datensätze in 1 Sekunde habe, sieht es aus, als ob 1000 Verbindungen in meinem Spark-Streaming eingerichtet würden.

Ist dies der richtige Weg, Daten von HBase zu lesen? Was ist die beste Vorgehensweise, um Daten aus HBase in sparkStreaming zu lesen? Oder Spark-Streaming soll keine Daten lesen, es ist nur entworfen, um Daten in DB zu streamen.

Vielen Dank im Voraus.

Antwort

0

foreachRDD wird auf einzelnen Executoren jvm-Prozess ausgeführt. Zumindest können Sie die Singleton-Instanz von conf (das bedeutet, dass Sie einen Null-Check haben, bevor Sie das vorhandene set-conf von jvm-process oder new conf verwenden) in der transferToHBasePut-Methode erhalten. Dies reduziert die Anzahl der Hbase-Verbindungen auf die Anzahl der in Ihrem Spark-Cluster erstellten Executoren.

Hope this helps ...

+0

Vielen Dank für die Beantwortung meiner Frage. Ich habe Ihre Lösung versucht, indem Sie conf als Parameter in method transferToHasePut übergeben haben. Aber wie Sie sagten, dass foreach bei jedem einzelnen Executor-jvm-Prozess ausgeführt wird, kann der Singleton nicht vom Treiber zum Worker übertragen werden. Ich denke, das liegt daran, dass die Konfiguration nicht serailisierbar ist. Schließlich fand ich, dass es eine Methode namens foreachPartition gibt, die auf RDD verwendet werden kann. Diese Methode verspricht, dass die Verbindung nur einmal pro RDD-Partition eingerichtet wird. – Frankie

3

Nach einigen Lernen, erstelle ich eine Konfiguration für jede Partition von RDD. Überprüfen Sie das Designmuster für foreachRDD bei Spark Streaming official website. Eigentlich ist Configuration keine Verbindung, daher weiß ich nicht, wie ich eine Verbindung von einem bestehenden Verbindungspool herbekomme, um Hbase aufzunehmen und aufzunehmen.

+0

Haben Sie von HBase mit Spark-Streaming gelesen? Ich kann es nur beim Öffnen einer Verbindung für jede Daten lesen. Wie soll man das machen? – zorkaya