2017-02-24 2 views
0

Der Mapper von Benutzerdatei von zwei Stellen 1) Artikel besucht Lesen (nach Land Sortieren) 2) Statistik des Landes (Land weise)von HDFS Lesen und Schreiben zu Hbase

Der Ausgang der beiden Mapper ist Text, Text

ich betreibe Programm von Amazon Cluster

Mein Ziel ist lesen von Daten aus zwei unterschiedlichen Satz und das Ergebnis kombinieren und speichern sie in hbase.

HDFS zu HDFS funktioniert. als

17/02/24 10:45:31 INFO mapreduce.Job: map 0% reduce 0% 
17/02/24 10:45:37 INFO mapreduce.Job: map 100% reduce 0% 
17/02/24 10:45:49 INFO mapreduce.Job: map 100% reduce 67% 
17/02/24 10:46:00 INFO mapreduce.Job: Task Id : attempt_1487926412544_0016_r_000000_0, Status : FAILED 
Error: java.lang.IllegalArgumentException: Row length is 0 
     at org.apache.hadoop.hbase.client.Mutation.checkRow(Mutation.java:565) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:110) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:68) 
     at org.apache.hadoop.hbase.client.Put.<init>(Put.java:58) 
     at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:45) 
     at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:1) 
     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635) 
     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390) 
     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at javax.security.auth.Subject.doAs(Subject.java:422) 
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) 
     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

Treiberklasse

package com.happiestminds.hadoop; 



import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hbase.HBaseConfiguration; 
import org.apache.hadoop.hbase.MasterNotRunningException; 
import org.apache.hadoop.hbase.client.HBaseAdmin; 
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class Main extends Configured implements Tool { 

    /** 
    * @param args 
    * @throws Exception 
    */ 
    public static String outputTable = "mapreduceoutput"; 

    public static void main(String[] args) throws Exception { 
     int exitCode = ToolRunner.run(new Main(), args); 
     System.exit(exitCode); 
    } 

    @Override 
    public int run(String[] args) throws Exception { 


     Configuration config = HBaseConfiguration.create(); 

     try{ 
      HBaseAdmin.checkHBaseAvailable(config); 
     } 
     catch(MasterNotRunningException e){ 
      System.out.println("Master not running"); 
      System.exit(1); 
     } 

     Job job = Job.getInstance(config, "Hbase Test"); 

     job.setJarByClass(Main.class); 

     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 



     MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ArticleMapper.class); 
     MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, StatisticsMapper.class); 

     TableMapReduceUtil.addDependencyJars(job); 
     TableMapReduceUtil.initTableReducerJob(outputTable, CounterReducer.class, job); 

     //job.setReducerClass(CounterReducer.class); 

     job.setNumReduceTasks(1); 


     return job.waitForCompletion(true) ? 0 : 1; 
    } 

} 

Reducer Klasse ist

package com.happiestminds.hadoop; 

import java.io.IOException; 

import org.apache.hadoop.hbase.client.Mutation; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 
import org.apache.hadoop.hbase.mapreduce.TableReducer; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 


public class CounterReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { 

    public static final byte[] CF = "counter".getBytes(); 
    public static final byte[] COUNT = "combined".getBytes(); 


    @Override 
    protected void reduce(Text key, Iterable<Text> values, 
      Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) 
      throws IOException, InterruptedException { 

     String vals = values.toString(); 
     int counter = 0; 

     StringBuilder sbr = new StringBuilder(); 
     System.out.println(key.toString()); 
     for (Text val : values) { 
      String stat = val.toString(); 
      if (stat.equals("***")) { 
       counter++; 
      } else { 
       sbr.append(stat + ","); 
      } 

     } 
     sbr.append("Article count : " + counter); 


     Put put = new Put(Bytes.toBytes(key.toString())); 
     put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString())); 
     if (counter != 0) { 
      context.write(null, put); 
     } 

    } 



} 

Abhängigkeiten

Der Code wird auf die Reduzierung 67% und gibt Fehler stecken
<dependencies> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.7.3</version> 
     </dependency> 



     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-client</artifactId> 
      <version>1.2.2</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-common</artifactId> 
      <version>1.2.2</version> 
     </dependency> 


     <dependency> 
      <groupId>org.apache.hbase</groupId> 
      <artifactId>hbase-server</artifactId> 
      <version>1.2.2</version> 
     </dependency> 



    </dependencies> 

Antwort

0

Können Sie versuchen zu überprüfen, ob Sie Nullwerte einfügen oder nicht?

Das HBase-Datenmodell erlaubt keinen Null-Zeilenschlüssel, es sollte mindestens 1 Byte sein.

Bitte überprüfen Sie Ihren Reducer-Code , bevor Sie den Put-Befehl ausführen, ob einige der Werte auf null gesetzt sind oder nicht.

1

Eine gute Praxis ist, Ihre Werte zu validieren, bevor Sie sie irgendwo einreichen. In Ihrem speziellen Fall können Sie Ihre Schlüssel und sbr validieren oder sie in Try-Catch-Abschnitt mit angemessenen Benachrichtigungsrichtlinien einwickeln. Sie sollten sie in eine Log-Ausgabe, wenn sie nicht korrekt sind und Sie Unit-Tests mit neuen Testfälle aktualisieren:

try 
{ 
    Put put = new Put(Bytes.toBytes(key.toString())); 
    put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString())); 
    if (counter != 0) { 
     context.write(null, put); 
    } 
} 
catch (IllegalArgumentException ex) 
{ 
     System.err.println("Error processing record - Key: "+ key.toString() +", values: " +sbr.ToString()); 
} 
0

Der Fehler, den Sie erhalten, ist ziemlich selbsterklärend. Zeilenschlüssel in HBase können nicht leer sein (obwohl Werte sein können).

@Override 
protected void reduce(Text key, Iterable<Text> values, 
     Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) 
     throws IOException, InterruptedException { 
    if (key == null || key.getLength() == 0) { 
     // Log a warning about the empty key. 
     return; 
    } 
    // Rest of your reducer follows. 
} 
+0

Jetzt Reducer wird bei 100% stecken. –

1

Nach der durch das Programm geworfen Ausnahme ist es klar, dass die Schlüssellänge ist 0 so, bevor sie in hbase setzen Sie prüfen, ob die Schlüssellänge 0 ist oder nicht, können nur Sie die hbase versetzt.

Weitere Klarheit warum Schlüssellänge 0 ist nicht von hbase unterstützt

Becuase HBase Datenmodell nicht 0-length Zeilenschlüssel erlaubt, es sollte wenigstens 1 Byte sein. Der 0-Byte-Zeilenschlüssel ist für die interne Verwendung reserviert (um leere Start- und Ende-Schlüssel anzugeben).