Ich versuche, Map-Side-Join mit CompositeTextInoutFormat
zu implementieren. Wie auch immer, ich bekomme unten Fehler in Map reduce job, die ich nicht beheben kann. 1. Im folgenden Code erhalte ich einen Fehler bei der Verwendung der Compose-Methode und beim Erstellen eines Eingabeformats mit einem Fehler. Der Fehler sagt wie folgt.Kann CompositetextinputFormat in Mapside nicht verwenden Join
Die Methode compose (String, Klasse, Pfad ...) in der Typ CompositeInputFormat für die Argumente nicht anwendbar ist (String, Klasse, Pfad [])
Kann jemand bitte helfen
package Hadoop.MR.Practice;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import org.apache.hadoop.mapred.join.CompositeInputFormat;
public class MapJoinJob implements Tool{
private Configuration conf;
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(getConf(), "MapSideJoinJob");
job.setJarByClass(this.getClass());
Path[] inputs = new Path[] { new Path(args[0]), new Path(args[1])};
String join = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, inputs);
job.getConfiguration().set("mapreduce.join.expr", join);
job.setInputFormatClass(CompositeInputFormat.class);
job.setMapperClass(MapJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//Configuring reducer
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(0);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
MapJoinJob mjJob = new MapJoinJob();
ToolRunner.run(conf, mjJob, args);
}
Vielen Dank .. werde versuchen, dies zu beheben .. –