2016-05-25 17 views
2

Verbindung I den Code unten bin mit zu schreiben, um HbaseSaveAsHadoopDataset schließt nie Zookeeper

jsonDStream.foreachRDD(new Function<JavaRDD<String>, Void>() { 

     @Override 
     public Void call(JavaRDD<String> rdd) throws Exception { 

      DataFrame jsonFrame = sqlContext.jsonRDD(rdd); 
      DataFrame selecteFieldFrame = jsonFrame.select("id_str","created_at","text"); 

      Configuration config = HBaseConfiguration.create(); 
      config.set("hbase.zookeeper.quorum", "d-9543"); 
      config.set("zookeeper.znode.parent","/hbase-unsecure"); 
      config.set("hbase.zookeeper.property.clientPort", "2181"); 
      final JobConf jobConfig=new JobConf(config,SveAsHadoopDataSetExample.class); 

      jobConfig.setOutputFormat(TableOutputFormat.class); 
      jobConfig.set(TableOutputFormat.OUTPUT_TABLE,"tableName"); 
      selecteFieldFrame.javaRDD().mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() { 

       @Override 
       public Tuple2<ImmutableBytesWritable, Put> call(Row row) throws Exception { 
        // TODO Auto-generated method stub 
        return convertToPut(row); 
       } 
      }).saveAsHadoopDataset(jobConfig); 


      return null; 
     } 
    }); 

Aber wenn ich zkDump in zookeeper sehen hält die Verbindungen auf die Erhöhung

jede Anregung/Zeiger wird von a tolle Hilfe!

Antwort

4

ich das gleiche Problem haben, ist es ein hbase Fehler ist, ich es beheben:

Änderung org.apache.hadoop.hbase.mapred.TableOutputFormat zu org.apache.hadoop.hbase.mapreduce. TableOutputFormat, und verwenden org.apache.hadoop.mapreduce.Job, nicht org.apache.hadoop.mapred.JobConf

dies ist ein Beispiel:

import org.apache.hadoop.mapreduce.Job 
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat 

val conf = HBaseConfiguration.create() 
conf.set("hbase.zookeeper.quorum", zk_hosts) 
conf.set("hbase.zookeeper.property.clientPort", zk_port) 

conf.set(TableOutputFormat.OUTPUT_TABLE, "TABLE_NAME") 
val job = Job.getInstance(conf) 
job.setOutputFormatClass(classOf[TableOutputFormat[String]]) 

formatedLines.map{ 
    case (a,b, c) => { 
    val row = Bytes.toBytes(a) 

    val put = new Put(row) 
    put.setDurability(Durability.SKIP_WAL) 

    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("node"), Bytes.toBytes(b)) 
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("topic"), Bytes.toBytes(c)) 

    (new ImmutableBytesWritable(row), put) 
    } 
}.saveAsNewAPIHadoopDataset(job.getConfiguration) 

das kann Ihnen helfen!

https://github.com/hortonworks-spark/shc/pull/20/commits/2074067c42c5a454fa4cdeec18c462b5367f23b9

+2

Während dies kann theoretisch die Frage beantworten, [es wäre vorzuziehen] (// meta.stackoverflow.com/q/8259) hier, die die wesentlichen Teile der Antwort auf umfassen und Stellen Sie den Link als Referenz zur Verfügung. –

+0

ja, danke für den Vorschlag – leocook

+0

Danke Mann! Dennoch haben die meisten Handbücher die alte Art, die den Bug hat. Ich hatte erwartet, dass ich stg. job.schließen(). Mein Problem war genau dasselbe. – zorkaya