2017-01-11 3 views
4

Ich versuche, Daten von HBase zu bekommen, Für alle Tuto finde ich, dass die Daten von Hbase ich durch Kafka gehen muss, ist es möglich, eine Integration zwischen Spark-Streaming und HBase direkt ohne Einbeziehung Kafka in der Kette Danke.Spark Streaming mit Hbase

+0

ja seine möglich, da wir das gleiche getan haben, ohne kafka zu verwenden. siehe unten Beispiel –

Antwort

3

ist es möglich, eine Integration zwischen Funken Streaming und hbase direkt ohne Kafka einschließlich

Ja .. its möglich, da wir die gleiche kafka ohne getan haben. siehe Beispiel unten JavaHBaseStreamingBulkPutExample

package org.apache.hadoop.hbase.spark.example.hbasecontext; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.TableName; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.spark.JavaHBaseContext; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

/** 
* This is a simple example of BulkPut with Spark Streaming 
*/ 
final public class JavaHBaseStreamingBulkPutExample { 

    private JavaHBaseStreamingBulkPutExample() {} 

    public static void main(String[] args) { 
    if (args.length < 4) { 
     System.out.println("JavaHBaseBulkPutExample " + 
       "{host} {port} {tableName}"); 
     return; 
    } 

    String host = args[0]; 
    String port = args[1]; 
    String tableName = args[2]; 

    SparkConf sparkConf = 
      new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " + 
        tableName + ":" + port + ":" + tableName); 

    JavaSparkContext jsc = new JavaSparkContext(sparkConf); 

    try { 
     JavaStreamingContext jssc = 
       new JavaStreamingContext(jsc, new Duration(1000)); 

     JavaReceiverInputDStream<String> javaDstream = 
       jssc.socketTextStream(host, Integer.parseInt(port)); 

     Configuration conf = HBaseConfiguration.create(); 

     JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); 

     hbaseContext.streamBulkPut(javaDstream, 
       TableName.valueOf(tableName), 
       new PutFunction()); 
    } finally { 
     jsc.stop(); 
    } 
    } 

    public static class PutFunction implements Function<String, Put> { 

    private static final long serialVersionUID = 1L; 

    public Put call(String v) throws Exception { 
     String[] part = v.split(","); 
     Put put = new Put(Bytes.toBytes(part[0])); 

     put.addColumn(Bytes.toBytes(part[1]), 
       Bytes.toBytes(part[2]), 
       Bytes.toBytes(part[3])); 
     return put; 
    } 

    } 
} 
+0

Danke Ram es funktioniert für mich das ist alles was ich brauche :) :) –