
MapReduce with a rewritten FileInputFormat cannot output the result

I have only just started learning Hadoop (I am using Hadoop 2.7.3 and Java 1.7.0_89). I am writing some code to analyse the play counts and comments of various TV shows on different websites, and for that I override the FileInputFormat class. But when I run my code in Eclipse it throws a lot of exceptions. I have tried debugging in Eclipse and cannot see any obvious problem in the Mapper or the Reducer, so I am not sure what went wrong.

Here is the sample data; the second field identifies the website the record comes from (the mapping is given in the code comment below):

truelove 3 3678 0 0 0 1 
truelove 2 39155 0 0 173 438 
truelove 1 142208 1 2 1 1 
truelove 1 142208 1 2 1 1 
truelove 1 142208 1 2 1 1 
frink 2 950 0 0 0 0 
frink 2 800 0 0 0 0 
daughter 4 4489850 0 0 0 0 
daughter 4 1161 0 0 0 0 
princess 2 33593 0 0 0 3 
princess 2 36118 0 0 0 2 
princess 2 38608 0 0 0 1 
princess 3 2542 0 0 0 0 
princess 1 35322 2 4 0 1 

Then I rewrote the InputFormat.

The custom data type:

package com.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/* 1-5 means: 1 youku, 2 souhu, 3 tudou, 4 aiqiyi, 5 xunlei
   princess 2 33593 0 0 0 3
   princess 2 36118 0 0 0 2
   princess 2 38608 0 0 0 1
   princess 3 2542 0 0 0 0
   princess 1 35322 2 4 0 1 */
public class TVplaydata implements WritableComparable<Object> {

    private int tvplaynum;
    private int tvfavorite;
    private int tvcomment;
    private int tvdown;
    private int tvvote;

    public TVplaydata() {}

    public void set(int tvplaynum, int tvfavorite, int tvcomment, int tvdown, int tvvote) {
        this.tvplaynum = tvplaynum;
        this.tvfavorite = tvfavorite;
        this.tvcomment = tvcomment;
        this.tvdown = tvdown;
        this.tvvote = tvvote;
    }

    // getters and setters
    public void setTvpalynum(int tvplaynum) {
        this.tvplaynum = tvplaynum;
    }
    public int getTvpalynum() {
        return tvplaynum;
    }
    public int getTvfavorite() {
        return tvfavorite;
    }
    public void setTvfavorite(int tvfavorite) {
        this.tvfavorite = tvfavorite;
    }
    public int getTvcomment() {
        return tvcomment;
    }
    public void setTvcomment(int tvcomment) {
        this.tvcomment = tvcomment;
    }
    public int getTvdown() {
        return tvdown;
    }
    public void setTvdown(int tvdown) {
        this.tvdown = tvdown;
    }
    public int getTvvote() {
        return tvvote;
    }
    public void setTvvote(int tvvote) {
        this.tvvote = tvvote;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialize the fields in the same order they were written
        tvplaynum = in.readInt();
        tvfavorite = in.readInt();
        tvcomment = in.readInt();
        tvdown = in.readInt();
        tvvote = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(tvplaynum);
        out.writeInt(tvfavorite);
        out.writeInt(tvcomment);
        out.writeInt(tvdown);
        out.writeInt(tvvote);
    }

    @Override
    public int compareTo(Object o) {
        // the type is only used as a value, so no real ordering is needed
        return 0;
    }
}

Then the InputFormat itself:

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

class PlayinputFormat extends FileInputFormat<Text, TVplaydata> {

    @Override
    public RecordReader<Text, TVplaydata> createRecordReader(InputSplit input, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new TvplayRecordReader();
    }

    class TvplayRecordReader extends RecordReader<Text, TVplaydata> {

        public LineReader in;
        public Text lineKey;
        public TVplaydata lineValue;
        public Text line;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return lineKey;
        }

        @Override
        public TVplaydata getCurrentValue() throws IOException, InterruptedException {
            return lineValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
            // open the split's file and wrap it in a LineReader
            FileSplit split = (FileSplit) input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);

            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
            line = new Text();
            lineKey = new Text();
            lineValue = new TVplaydata();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            int linesize = in.readLine(line);
            if (linesize == 0) return false;

            // expected layout: name \t site \t playnum \t favorite \t comment \t down \t vote
            String[] pieces = line.toString().split("\t");
            if (pieces.length != 7) {
                throw new IOException("Invalid record received");
            }

            lineKey.set(pieces[0] + "\t" + pieces[1]);
            lineValue.set(Integer.parseInt(pieces[2]), Integer.parseInt(pieces[3]), Integer.parseInt(pieces[4]),
                    Integer.parseInt(pieces[5]), Integer.parseInt(pieces[6]));
            return true;
        }
    }
}

Finally, the main class with the Mapper and Reducer:

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TVPlay extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        String[] paths = {"hdfs://wang:9000/mapreduce/tvplay.txt", "hdfs://wang:9000/mapreduce/tvout"};
        int ec = ToolRunner.run(new Configuration(), new TVPlay(), paths);
        System.exit(ec);
    }

    // mapper
    public class TVmapper extends Mapper<Text, TVplaydata, Text, TVplaydata> {
        public void map(Text key, TVplaydata value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // reducer
    public class TVreducer extends Reducer<Text, TVplaydata, Text, Text> {
        private Text m_key = new Text();
        private Text m_value = new Text();
        private MultipleOutputs<Text, Text> mos;

        protected void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        public void reduce(Text key, Iterable<TVplaydata> values, Context context) throws IOException, InterruptedException {
            int tvplaynum = 0;
            int tvfavorite = 0;
            int tvcomment = 0;
            int tvvote = 0;
            int tvdown = 0;
            for (TVplaydata tv : values) {
                tvplaynum += tv.getTvpalynum();
                tvfavorite += tv.getTvfavorite();
                tvcomment += tv.getTvcomment();
                tvvote += tv.getTvvote();
                tvdown += tv.getTvdown();
            }

            String[] records = key.toString().split("\t");

            String source = records[1];
            m_key.set(records[0]);
            m_value.set(tvplaynum + "\t" + tvfavorite + "\t" + tvcomment + "\t" + tvdown + "\t" + tvvote);

            if (source.equals("1")) {
                mos.write("youku", m_key, m_value);
            } else if (source.equals("2")) {
                mos.write("souhu", m_key, m_value);
            } else if (source.equals("3")) {
                mos.write("tudou", m_key, m_value);
            } else if (source.equals("4")) {
                mos.write("aiqiyi", m_key, m_value);
            } else if (source.equals("5")) {
                mos.write("xunlei", m_key, m_value);
            } else {
                mos.write("other", m_key, m_value);
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = new Configuration();

        // delete the output directory if it already exists
        Path path = new Path(arg0[1]);
        FileSystem hdfs = path.getFileSystem(conf);
        if (hdfs.isDirectory(path)) {
            hdfs.delete(path, true);
        }
        Job job = new Job(conf, "tvplay");
        job.setJarByClass(TVPlay.class);
        // set InputFormatClass
        job.setInputFormatClass(PlayinputFormat.class);
        // set mapper
        job.setMapperClass(TVmapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TVplaydata.class);
        // set reducer
        job.setReducerClass(TVreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, Text.class, Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Here is the exception:

2017-06-20 23:03:26,848 INFO [org.apache.hadoop.conf.Configuration.deprecation] - session.id is deprecated. Instead, use dfs.metrics.session-id 
2017-06-20 23:03:26,854 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId= 
2017-06-20 23:03:27,874 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
2017-06-20 23:03:28,186 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2017-06-20 23:03:28,236 INFO [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] - Total input paths to process : 1 
2017-06-20 23:03:28,639 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1 
2017-06-20 23:03:29,389 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - Submitting tokens for job: job_local622257889_0001 
2017-06-20 23:03:30,552 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/ 
2017-06-20 23:03:30,556 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local622257889_0001 
2017-06-20 23:03:30,607 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter set in config null 
2017-06-20 23:03:30,630 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1 
2017-06-20 23:03:30,670 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
2017-06-20 23:03:31,562 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 running in uber mode : false 
2017-06-20 23:03:31,567 INFO [org.apache.hadoop.mapreduce.Job] - map 0% reduce 0% 
2017-06-20 23:03:31,569 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for map tasks 
2017-06-20 23:03:31,571 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local622257889_0001_m_000000_0 
2017-06-20 23:03:31,667 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1 
2017-06-20 23:03:31,691 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux. 
2017-06-20 23:03:34,256 INFO [org.apache.hadoop.mapred.Task] - Using ResourceCalculatorProcessTree : [email protected] 
2017-06-20 23:03:34,259 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map task executor complete. 
2017-06-20 23:03:34,485 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local622257889_0001 
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:134) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:745) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
    at java.util.concurrent.FutureTask.run(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at java.lang.Class.getConstructor0(Unknown Source) 
    at java.lang.Class.getDeclaredConstructor(Unknown Source) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:128) 
    ... 8 more 
2017-06-20 23:03:34,574 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 failed with state FAILED due to: NA 
2017-06-20 23:03:34,598 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 0 

I have no idea what this means. Thanks for your help!


Could you also show the command you used to run the MR job? –

Answer


At last I found that the Mapper and Reducer were not declared static. Once I changed them to static nested classes (see the sketch below), everything works fine.

Why declaring Mapper and Reducer classes as static?

Hadoop uses reflection to create an instance of the class for each map or reduce task it runs. The newly created instance is expected to have a no-argument constructor (otherwise how would Hadoop know what to pass in?).

By declaring the inner Mapper or Reducer class without the static keyword, the Java compiler actually generates a constructor that expects an instance of the enclosing class to be passed in at construction time.

You should be able to see this by running the javap command against the generated class file.

Also note that the static keyword is not valid on a top-level class declaration (which is why you never see it at the top level, only on nested classes).
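
As a minimal sketch of the fix, using the class and type names from the question (only the static keyword is added; the map() body is unchanged, and TVreducer would be changed the same way):

package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TVPlay /* extends Configured implements Tool, as in the question */ {

    // "static" gives the nested class a no-argument constructor, which is what
    // ReflectionUtils.newInstance() needs when Hadoop instantiates the mapper.
    public static class TVmapper extends Mapper<Text, TVplaydata, Text, TVplaydata> {
        public void map(Text key, TVplaydata value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // TVreducer should likewise be declared "public static class".
}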


You have put the job configuration in the run() method of the Reducer, which is wrong. The simplest solution is to move all of that code into the main() method of TVPlay.

Unless you know exactly what you are doing, you generally should not override the run() method, so I would remove it from the Reducer class completely.
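
A minimal sketch of what that restructuring might look like, reusing the paths and class names from the question (this main() would replace the one in TVPlay, which already has the necessary imports; Job.getInstance() is the non-deprecated replacement for new Job(conf, ...) in Hadoop 2.x, and the named-output registrations are left out for brevity):

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "tvplay");
    job.setJarByClass(TVPlay.class);
    job.setInputFormatClass(PlayinputFormat.class);

    job.setMapperClass(TVmapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TVplaydata.class);

    job.setReducerClass(TVreducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path("hdfs://wang:9000/mapreduce/tvplay.txt"));
    FileOutputFormat.setOutputPath(job, new Path("hdfs://wang:9000/mapreduce/tvout"));
    // MultipleOutputs.addNamedOutput(...) calls as in the question

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}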
