
Second MR job does not finish in Hadoop

I had a use case where I wanted the output of the first MR job to be the input of the second MR job. I achieved this with ControlledJob in Hadoop, but at the end of the job I get the exception mentioned below.

java.lang.IllegalStateException: Job in state RUNNING instead of DEFINE 
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:294) 
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1288) 
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:335) 
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:240) 
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.runMRJobs(JoinClickImpressionDetailJob.java:353) 
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.run(JoinClickImpressionDetailJob.java:421) 
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) 
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.main(JoinClickImpressionDetailJob.java:309) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221) 
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136) 

Source Code

public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new JoinClickImpressionDetailJob(), args);
    System.exit(1);
}

private static int runMRJobs(String[] args) {
    int result = -1;
    Configuration conf = new Configuration();
    conf.set("mapreduce.output.fileoutputformat.compress", "true");
    conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

    ControlledJob mrJob1 = null;
    Job firstJob = null;
    try {
        deleteDirectory(args[2], conf);
        mrJob1 = new ControlledJob(conf);
        mrJob1.setJobName("IMPRESSION_CLICK_COMBINE_JOB");
        firstJob = mrJob1.getJob();
        // firstMapReduceJob() calls waitForCompletion(), so the job is submitted and run here
        result += firstMapReduceJob(args, firstJob);
    } catch (Exception e) {
        e.printStackTrace();
    }

    System.out.println("First Job Finished=============");

    System.out.println("Second Job Started=============");

    ControlledJob mrJob2 = null;
    try {
        mrJob2 = new ControlledJob(conf);
        deleteDirectory(args[3], conf);
        mrJob2.addDependingJob(mrJob1);
        mrJob2.setJobName("IMPRESSION_CLICK_COMBINE_JOB1");
        Job job2 = mrJob2.getJob();
        // secondMapReduceJob() also calls waitForCompletion()
        result += secondMapReduceJob(args, job2);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("Second Job Finished=============");

    JobControl jobControl = new JobControl("Click-Impression-aggregator");
    jobControl.addJob(mrJob1);
    jobControl.addJob(mrJob2);
    // both jobs were already submitted above; submitting them again here is
    // what throws IllegalStateException: Job in state RUNNING instead of DEFINE
    jobControl.run();
    return result;
}

private static int secondMapReduceJob(String[] args, Job job2) throws IOException, InterruptedException, ClassNotFoundException {
    long startTime = System.currentTimeMillis();

    job2.setMapOutputKeyClass(Text.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setJarByClass(JoinClickImpressionDetailJob.class);

    job2.setInputFormatClass(TextInputFormat.class);
    job2.setOutputFormatClass(TextOutputFormat.class);

    job2.setReducerClass(ImpressionAndClickReducer.class);

    FileInputFormat.setInputDirRecursive(job2, true);
    FileInputFormat.addInputPath(job2, new Path(args[2]));
    job2.setMapperClass(ImpressionClickMapper.class);

    FileOutputFormat.setOutputPath(job2, new Path(args[3]));
    job2.setNumReduceTasks(8);
    job2.setPartitionerClass(ClickNonClickPartitioner.class);
    System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
    return job2.waitForCompletion(true) ? 1 : 0;
}

private static int firstMapReduceJob(String[] args, Job job) throws IOException, InterruptedException, ClassNotFoundException {
    long startTime = System.currentTimeMillis();
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setJarByClass(JoinClickImpressionDetailJob.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setReducerClass(ImpressionClickReducer.class);

    FileInputFormat.setInputDirRecursive(job, true);
    /**
     * Here directory of impressions will be present
     */
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
    /**
     * Here directory of clicks will be present
     */
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setNumReduceTasks(10);

    job.setPartitionerClass(TrackerPartitioner.class);

    System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
    return job.waitForCompletion(true) ? 1 : 0;
}

private static void deleteDirectory(String args, Configuration conf) throws IOException {
    Path p = new Path(args);
    FileSystem fs = FileSystem.get(conf);
    fs.exists(p);
    fs.delete(p, true);
}

@Override
public int run(String[] args) throws Exception {
    return runMRJobs(args);
}

Complete code: https://github.com/ragnar-lothbrok/hadoop-demo/blob/master/src/main/java/com/hadoop/intellipaat/JoinClickImpressionDetailJob.java

Answer


Update your code as shown below; you can achieve this without JobControl. The root cause is that firstMapReduceJob() and secondMapReduceJob() already submit and run the jobs via waitForCompletion(), so the later jobControl.run() tries to submit the same Job instances a second time. That is exactly what raises the IllegalStateException: the jobs are already in state RUNNING instead of DEFINE.

private int runMRJobs(String[] args) { // no longer static, so getConf() from Configured is available
    int result = -1;
    Configuration conf = getConf();

    conf.set("mapreduce.output.fileoutputformat.compress", "true");
    conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

    Job job1 = new Job(conf, "IMPRESSION_CLICK_COMBINE_JOB");
    try {
        deleteDirectory(args[2], conf);
        result += firstMapReduceJob(args, job1);
    } catch (Exception e) {
        e.printStackTrace();
    }

    System.out.println("First Job Finished=============");

    System.out.println("Second Job Started=============");

    Job job2 = new Job(conf, "IMPRESSION_CLICK_COMBINE_JOB1");
    try {
        deleteDirectory(args[3], conf);
        result += secondMapReduceJob(args, job2);
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("Second Job Finished=============");
    return result;
}

In your first job, return as shown below:

return job1.waitForCompletion(true) ? 1 : 0; 

In your second job, return as shown below:

return job2.waitForCompletion(true) ? 0 : 1; 
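
For completeness: if you want to keep the JobControl approach from the question instead, the job-setup methods must not call waitForCompletion() at all, because JobControl does the submitting itself. Below is a minimal sketch of that variant, using the conf from the question and hypothetical helpers configureFirstJob()/configureSecondJob() that only configure the passed Job (mappers, reducers, paths) and never submit it; it assumes an enclosing method declared to throw Exception.

ControlledJob mrJob1 = new ControlledJob(conf); 
mrJob1.setJobName("IMPRESSION_CLICK_COMBINE_JOB"); 
configureFirstJob(args, mrJob1.getJob());   // hypothetical helper: configure only, no waitForCompletion() 

ControlledJob mrJob2 = new ControlledJob(conf); 
mrJob2.setJobName("IMPRESSION_CLICK_COMBINE_JOB1"); 
mrJob2.addDependingJob(mrJob1);             // job 2 starts only after job 1 succeeded 
configureSecondJob(args, mrJob2.getJob());  // hypothetical helper: configure only, no waitForCompletion() 

JobControl jobControl = new JobControl("Click-Impression-aggregator"); 
jobControl.addJob(mrJob1); 
jobControl.addJob(mrJob2); 

// JobControl.run() loops until stopped, so run it on its own thread and poll 
Thread controlThread = new Thread(jobControl); 
controlThread.setDaemon(true); 
controlThread.start(); 
while (!jobControl.allFinished()) { 
    Thread.sleep(1000); 
} 
jobControl.stop(); 
return jobControl.getFailedJobList().isEmpty() ? 0 : 1; 

This way each Job is submitted exactly once (by JobControl), so the DEFINE-state check in Job.submit() is never violated.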
Yes, I have done the same in both MR jobs. – cody123

That looks different from your post; there you have ? 1 : 0 –

Yes, I also checked with "job.waitForCompletion(true) ? 0 : 1", but it did not work. – cody123