Ich habe gerade erst in hadoop gelernt (ich benutze hadoop2.7.3 & java 1.7.0.89). Ich schreibe einige Codes zur Analyse verschiedener TV-Sendung & Kommentare auf verschiedenen Websites. Ich überschreibe die filesinputformat Klasse. aber wenn ich meine Codes auf eclipse laufen lasse, gibt es viele Ausnahmen. Ich versuche auf Eclipse zu debuggen. nur die Mapper finden finden oder Reduzierer irgendein Problem hat .. Aber ich bin nicht sicher, was falsch gelaufen ist ..MapReduce mit rewrited fileInputFormat kann das Ergebnis nicht ausgeben
hier ist die Datums Beispielen ist die Sekundärdaten der Spitzname der Website
truelove 3 3678 0 0 0 1
truelove 2 39155 0 0 173 438
truelove 1 142208 1 2 1 1
truelove 1 142208 1 2 1 1
truelove 1 142208 1 2 1 1
frink 2 950 0 0 0 0
frink 2 800 0 0 0 0
daughter 4 4489850 0 0 0 0
daughter 4 1161 0 0 0 0
princess 2 33593 0 0 0 3
princess 2 36118 0 0 0 2
princess 2 38608 0 0 0 1
princess 3 2542 0 0 0 0
princess 1 35322 2 4 0 1
und dann ich umschreiben das Inputformat
die benutzerdefinierten Datenformate
package com.hadoop.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/*1-5means:1youku2souhu3tudou4aiqiyi5xunlei
princess 2 33593 0 0 0 3
princess 2 36118 0 0 0 2
princess 2 38608 0 0 0 1
princess 3 2542 0 0 0 0
princess 1 35322 2 4 0 1*/
public class TVplaydata implements WritableComparable<Object>{
//private String tvname;
private int tvplaynum;
private int tvfavorite;
private int tvcomment;
private int tvdown;
private int tvvote;
public TVplaydata(){}
public void set(int tvplaynum,int tvfavorite,int tvcomment,int tvdown,int tvvote){
this.tvplaynum = tvplaynum;
this.tvfavorite = tvfavorite;
this.tvcomment = tvcomment;
this.tvdown = tvdown;
this.tvvote = tvvote;
}
//source get set
public void setTvpalynum(int tvplaynum) {
this.tvplaynum = tvplaynum;
}
public int getTvpalynum() {
return tvplaynum;
}
public int getTvfavorite() {
return tvfavorite;
}
public void setTvfavorite(int tvfavorite) {
this.tvfavorite = tvfavorite;
}
public int getTvcomment() {
return tvcomment;
}
public void setTvcomment(int tvcomment) {
this.tvcomment = tvcomment;
}
public int getTvdown() {
return tvdown;
}
public void setTvdown(int tvdown) {
this.tvdown = tvdown;
}
public int getTvvote() {
return tvvote;
}
public void setTvvote(int tvvote) {
this.tvvote = tvvote;
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
tvplaynum = in.readInt();
tvfavorite = in.readInt();
tvcomment = in.readInt();
tvdown = in.readInt();
tvvote = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(tvplaynum);
out.writeInt(tvfavorite);
out.writeInt(tvcomment);
out.writeInt(tvdown);
out.writeInt(tvvote);
}
@Override
public int compareTo(Object o) {
// TODO Auto-generated method stub
return 0;
}
}
dann das Inputformat neu schreiben.
package com.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
class PlayinputFormat extends FileInputFormat<Text, TVplaydata>{
@Override
public RecordReader<Text, TVplaydata> createRecordReader(InputSplit input, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
return new TvplayRecordReader();
}
class TvplayRecordReader extends RecordReader<Text, TVplaydata>{
public LineReader in;
public Text lineKey;
public TVplaydata lineValue;
public Text line;
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
if(in !=null){
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return lineKey;
}
@Override
public TVplaydata getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return lineValue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@Override
public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split=(FileSplit)input;
Configuration job=context.getConfiguration();
Path file=split.getPath();
FileSystem fs=file.getFileSystem(job);
FSDataInputStream filein=fs.open(file); //open
in=new LineReader(filein,job);
line=new Text();
lineKey=new Text();
lineValue = new TVplaydata();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
int linesize=in.readLine(line);
if(linesize==0) return false;
String[] pieces = line.toString().split("\t");
if(pieces.length != 7){
throw new IOException("Invalid record received");
}
lineKey.set(pieces[0]+"\t"+pieces[1]);
lineValue.set(Integer.parseInt(pieces[2]),Integer.parseInt(pieces[3]),Integer.parseInt(pieces[4])
,Integer.parseInt(pieces[5]),Integer.parseInt(pieces[6]));
return true;
}
}
}
endlich die Haupt-Methode schreiben Mapper & Reducer
package com.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class TVPlay extends Configured implements Tool{
public static void main(String[] args) throws Exception {
String[] paths = {"hdfs://wang:9000/mapreduce/tvplay.txt","hdfs://wang:9000/mapreduce/tvout"};
int ec = ToolRunner.run(new Configuration(), new TVPlay(), paths);
System.exit(ec);
}
//mapper
public class TVmapper extends Mapper<Text, TVplaydata, Text, TVplaydata> {
public void map(Text key,TVplaydata value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public class TVreducer extends Reducer<Text, TVplaydata, Text, Text>{
private Text m_key = new Text();
private Text m_value = new Text();
private MultipleOutputs<Text, Text> mos;
protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs<Text, Text>(context);
}
public void reduce(Text key,Iterable<TVplaydata> values, Context context) throws IOException, InterruptedException{
int tvplaynum = 0;
int tvfavorite = 0;
int tvcomment = 0;
int tvvote = 0;
int tvdown = 0;
for (TVplaydata tv:values) {
tvplaynum += tv.getTvpalynum();
tvfavorite += tv.getTvfavorite();
tvcomment += tv.getTvcomment();
tvvote += tv.getTvvote();
tvdown += tv.getTvdown();
}
String[] records = key.toString().split("\t");
String source = records[1];
m_key.set(records[0]);
m_value.set(tvplaynum+"\t"+tvfavorite+"\t"+tvcomment+"\t"+tvdown+"\t"+tvvote);
if(source.equals("1")){
mos.write("youku", m_key, m_value);
}else if (source.equals("2")) {
mos.write("souhu", m_key, m_value);
}else if (source.equals("3")) {
mos.write("tudou",m_key, m_value);
}else if (source.equals("4")) {
mos.write("aiqiyi", m_key, m_value);
}else if (source.equals("5")) {
mos.write("xunlei", m_key, m_value);
}else{
mos.write("other", m_key, m_value);
}
}
protected void cleanup(Context context) throws IOException ,InterruptedException{
mos.close();
}
}
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
Path path = new Path(arg0[1]);
FileSystem hdfs = path.getFileSystem(conf);
if(hdfs.isDirectory(path)){
hdfs.delete(path, true);
}
Job job = new Job(conf,"tvplay");
job.setJarByClass(TVPlay.class);
// set InputFormatClass
job.setInputFormatClass(PlayinputFormat.class);
// set mapper
job.setMapperClass(TVmapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TVplaydata.class);
// set reduce
job.setReducerClass(TVreducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job,new Path(arg0[1]));
MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class,
Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class,
Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class,
Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class,
Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class,
Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class,
Text.class, Text.class);
return job.waitForCompletion(true)?0:1;
}
}
Hier ist die Ausnahme
2017-06-20 23:03:26,848 INFO [org.apache.hadoop.conf.Configuration.deprecation] - session.id is deprecated. Instead, use dfs.metrics.session-id
2017-06-20 23:03:26,854 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId=
2017-06-20 23:03:27,874 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2017-06-20 23:03:28,186 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2017-06-20 23:03:28,236 INFO [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] - Total input paths to process : 1
2017-06-20 23:03:28,639 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1
2017-06-20 23:03:29,389 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - Submitting tokens for job: job_local622257889_0001
2017-06-20 23:03:30,552 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/
2017-06-20 23:03:30,556 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local622257889_0001
2017-06-20 23:03:30,607 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter set in config null
2017-06-20 23:03:30,630 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1
2017-06-20 23:03:30,670 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2017-06-20 23:03:31,562 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 running in uber mode : false
2017-06-20 23:03:31,567 INFO [org.apache.hadoop.mapreduce.Job] - map 0% reduce 0%
2017-06-20 23:03:31,569 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for map tasks
2017-06-20 23:03:31,571 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local622257889_0001_m_000000_0
2017-06-20 23:03:31,667 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1
2017-06-20 23:03:31,691 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux.
2017-06-20 23:03:34,256 INFO [org.apache.hadoop.mapred.Task] - Using ResourceCalculatorProcessTree : [email protected]
2017-06-20 23:03:34,259 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map task executor complete.
2017-06-20 23:03:34,485 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local622257889_0001
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>()
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:134)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:745)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>()
at java.lang.Class.getConstructor0(Unknown Source)
at java.lang.Class.getDeclaredConstructor(Unknown Source)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:128)
... 8 more
2017-06-20 23:03:34,574 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 failed with state FAILED due to: NA
2017-06-20 23:03:34,598 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 0
Ich habe keine Ahnung, was das bedeutet laufen. Denke an deine Hilfe!
Können Sie auch den Befehl anzeigen, mit dem Sie den MR-Job ausgeführt haben –