
I have multi-line unstructured data separated by a pipe (||), as shown below:

|| 
nokia,111,55000,uk,20160809,109 
100,online,credit, 
10%,90%,2015 
|| 
iphone,6,60000,USA,20160809,110, 
100,online,credit 
|| 
samsung,s7,60000,USA,20160809,110 
100,online,credit 
|| 
..... millions of records .... 

The first record is 3 lines and the next record is 2 lines, so the number of lines per record is not constant. Please tell me how I can process this data using MapReduce 2 with a custom input split & a custom record reader. Any links & blogs would also be appreciated.

Please also tell me about any errors in my custom record reader classes below:

public class CustomInputFormat extends FileInputFormat<LongWritable,Text>{ 
    @Override 
    public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,TaskAttemptContext context) { 
     return new CustomRecordReader(); 
    } 
} 

public class CustomRecordReader extends RecordReader<LongWritable, Text> { 
    private long start; 
    private long pos; 
    private long end; 
    private LineReader in; 
    private int maxLineLength; 
    private LongWritable key = new LongWritable(); 
    private Text value = new Text(); 

    private static final Log LOG = LogFactory.getLog(CustomRecordReader.class); 

    @Override 
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)  throws IOException { 

     FileSplit split = (FileSplit) genericSplit; 
     Configuration job = context.getConfiguration(); 
     this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 
       Integer.MAX_VALUE); 

     start = split.getStart(); 
     end = start + split.getLength(); 
     final Path file = split.getPath(); 
     FileSystem fs = file.getFileSystem(job); 
     FSDataInputStream fileIn = fs.open(split.getPath()); 
     boolean skipFirstLine = false; 

     if (start != 0) { 
      skipFirstLine = true; 
      --start; 
      fileIn.seek(start); 
     } 
     in = new LineReader(fileIn, job); 
     if (skipFirstLine) { 
      Text dummy = new Text(); 
      start += in.readLine(dummy, 0, 
        (int) Math.min((long) Integer.MAX_VALUE, end - start)); 
     } 
     this.pos = start; 
    } 

    @Override 
    public boolean nextKeyValue() throws IOException { 
     int newSize = 0; 
     while (pos < end) { 
      newSize = in.readLine(value); 
      if (newSize == 0) break; 
      pos += newSize; 
      key.set(pos); 
      if(value.toString().equals("||")) 
       LOG.info("Skipped line of size " + newSize + " at pos " 
       + (pos - newSize)); 
      else 
       break; 
     } 
     if (newSize == 0) 
      return false; 
     return true; 
    } 

    @Override 
    public LongWritable getCurrentKey() throws IOException, 
      InterruptedException { 
     return key; 
    } 

    @Override 
    public Text getCurrentValue() throws IOException, InterruptedException { 
     return value; 
    } 

    @Override 
    public float getProgress() throws IOException, InterruptedException { 
     if (start == end) { 
      return 0.0f; 
     } else { 
      return Math.min(1.0f, (pos - start)/(float) (end - start)); 
     } 
    } 

    @Override 
    public void close() throws IOException { 
     if (in != null) { 
      in.close(); 
     } 
    } 
} 


public class CustomMapper extends Mapper<LongWritable, CustomInputFormat, LongWritable, CustomInputFormat>{ 
    final static IntWritable one = new IntWritable(1); 
    @Override 
    protected void map(LongWritable key, CustomInputFormat value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException { 
     System.out.println(" *** Key is: "+key+" value is: "+value+" *** "); 
     if(null!=value){ 
        context.write(key, value); 
     } 
    } 
} 


public class CustomDriver extends Configured { 
    public static void main(String[] args) throws Exception {  
     if(args.length!=2){ 
      System.out.println("pls give i/p & o/p direc"); 
      System.exit(-1); 
     } 

     Job job = new Job();   
     job.setJarByClass(CustomDriver.class); 
     Configuration conf=job.getConfiguration();      
     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.setMapperClass(CustomMapper.class); 

     job.setInputFormatClass(CustomInputFormat.class); 

     job.setMapOutputKeyClass(LongWritable.class); 
     job.setMapOutputValueClass(CustomInputFormat.class);   

     System.exit(job.waitForCompletion(true)?0:-1); 
    } 
} 

I am getting the error below when running:

[[email protected] gous]$ hadoop jar c3.jar com.ownUnstruct.CustomDriver /user/mr/custom /user/mr/out 
16/04/18 23:15:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 
16/04/18 23:15:02 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
16/04/18 23:15:02 INFO input.FileInputFormat: Total input paths to process : 1 
16/04/18 23:15:02 INFO mapreduce.JobSubmitter: number of splits:1 
16/04/18 23:15:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1461045457615_0001 
16/04/18 23:15:03 INFO impl.YarnClientImpl: Submitted application application_1461045457615_0001 
16/04/18 23:15:03 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1461045457615_0001/ 
16/04/18 23:15:03 INFO mapreduce.Job: Running job: job_1461045457615_0001 
16/04/18 23:15:15 INFO mapreduce.Job: Job job_1461045457615_0001 running in uber mode : false 
16/04/18 23:15:15 INFO mapreduce.Job: map 0% reduce 0% 
16/04/18 23:15:22 INFO mapreduce.Job: Task Id : attempt_1461045457615_0001_m_000000_0, Status : FAILED 
Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null 
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414) 
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:415) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 
Caused by: java.lang.NullPointerException 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011) 
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402) 
    ... 9 more 

Answer


Assuming the pipe symbol is always preceded by a new line, I don't think a custom input split or a custom record reader is needed. You can write the Mapper code as follows:

public class MyMapper extends Mapper<LongWritable, Text, Text, Text> { 

    private Text textValue; 

    @Override 
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     //process the data 
    } 

    @Override 
    public void run(Mapper<LongWritable, Text, Text, Text>.Context context) 
      throws IOException, InterruptedException { 
     setup(context); 
     textValue = new Text(); 
     StringBuffer sb = new StringBuffer(); 
     while (context.nextKeyValue()) { 
      String line = context.getCurrentValue().toString(); 
      if(line.equals("||")){ 
       textValue.set(sb.toString()); 
       if(!("".equals(sb.toString()))) 
        map(context.getCurrentKey(), textValue, context); 
       sb = new StringBuffer(); 
      } else { 
       // join physical lines with a comma unless the previous line already ends with one 
       if (sb.length() > 0 && sb.charAt(sb.length() - 1) != ',') 
        sb.append(','); 
       sb.append(line); 
      } 
     } 
     // note: a final record that is not followed by a closing "||" would need to be flushed here 
     cleanup(context); 
    } 
} 

This should get you started with your processing.
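
For completeness, here is a minimal driver sketch for this Mapper-only approach. It assumes the default TextInputFormat (so no custom input format is registered) and runs map-only; the class name MyDriver and the job name are illustrative:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class MyDriver { 
    public static void main(String[] args) throws Exception { 
     Job job = Job.getInstance(new Configuration(), "pipe-delimited-records"); 
     job.setJarByClass(MyDriver.class); 
     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
     job.setMapperClass(MyMapper.class); 
     // TextInputFormat is the default, so each nextKeyValue() in run() above 
     // delivers exactly one physical line 
     job.setNumReduceTasks(0);            // map-only: records pass straight to output 
     job.setOutputKeyClass(Text.class);   // MyMapper emits Text keys and Text values 
     job.setOutputValueClass(Text.class); 
     System.exit(job.waitForCompletion(true) ? 0 : -1); 
    } 
} 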

UPDATE: Change your code in nextKeyValue() to something like this and try it:

@Override 
public boolean nextKeyValue() throws IOException { 
    int newSize = 0; 
    StringBuffer sb = new StringBuffer(); 
    while (pos < end) { 
     newSize = in.readLine(value); 
     if (newSize == 0) break; 
     pos += newSize; 
     key.set(pos); 
     if(value.toString().equals("||")){ 
      LOG.info("Skipped line of size " + newSize + " at pos " 
       + (pos - newSize)); 
      // a leading "||" means nothing has been accumulated yet; keep reading 
      if (sb.length() == 0) 
       continue; 
      break; 
     } else { 
      // join physical lines with a comma unless the previous line already ends with one 
      if (sb.length() > 0 && sb.charAt(sb.length() - 1) != ',') 
       sb.append(','); 
      sb.append(value.toString()); 
     } 
    } 
    value.set(sb.toString()); 
    // report end of split only when nothing was accumulated, so the final record is not lost 
    if (newSize == 0 && sb.length() == 0) 
     return false; 
    return true; 
} 
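
One caveat that applies to both versions: a logical record can still straddle an HDFS split boundary, because initialize() starts each non-first split after the next newline rather than after a || delimiter. A simple hedge, at the cost of parallelism within a single file, is to make the input format non-splittable; a sketch to add to CustomInputFormat:

@Override 
protected boolean isSplitable(JobContext context, Path file) { 
    // force one split per file so a multi-line record can never be cut 
    // at a split boundary; this trades away parallelism within a file 
    return false; 
} 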

UPDATE: Change your Mapper to something like this:

public class CustomMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ 
    @Override 
    protected void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException { 
     System.out.println(" *** Key is: "+key+" value is: "+value+" *** "); 
     if(null!=value){ 
      context.write(key, value); 
     } 
    } 
} 

And in the CustomDriver class, change the following line

job.setMapOutputValueClass(CustomInputFormat.class); 

to

job.setMapOutputValueClass(Text.class);  
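
CustomInputFormat is an InputFormat, not a Writable, so the map-output collector cannot find a serializer for it; that is most likely what causes the NullPointerException in MapOutputBuffer.init in your log. The log also warns that the driver should implement the Tool interface. Here is a minimal sketch of CustomDriver reworked with ToolRunner, assuming the corrected Mapper and the Text value class from above:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class CustomDriver extends Configured implements Tool { 
    @Override 
    public int run(String[] args) throws Exception { 
     if (args.length != 2) { 
      System.err.println("Usage: CustomDriver <input dir> <output dir>"); 
      return -1; 
     } 
     Job job = Job.getInstance(getConf()); 
     job.setJarByClass(CustomDriver.class); 
     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
     job.setMapperClass(CustomMapper.class); 
     job.setInputFormatClass(CustomInputFormat.class); 
     job.setMapOutputKeyClass(LongWritable.class); 
     job.setMapOutputValueClass(Text.class); // Text, not CustomInputFormat 
     return job.waitForCompletion(true) ? 0 : -1; 
    } 

    public static void main(String[] args) throws Exception { 
     System.exit(ToolRunner.run(new Configuration(), new CustomDriver(), args)); 
    } 
} 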

Hey Johnson Charles, thanks for your help, can you please go through my code and let me know any corrections? – user2192023


First of all, I would like to appreciate your efforts in providing me the solution. Yes, I expected output like: 65 nokia,111,55000,uk,20160809,109,100,online,credit,10%,90%,2015
116 iphone,6,60000,USA,20160809,110,100,online,credit – user2192023


Let me know if this solution works for you. –