I have a huge number of small files and want to use CombineFileInputFormat to merge them so that each file comes in as a single record in my MR job. I followed http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and tried to convert it to the new API.

I am facing two problems:

a) I am testing it with just two small files, yet 2 mappers are fired. I expected 1.

b) Each line comes in as a single record; I want the whole file as a single record.

It may be painful, but please look at the code below. I am still a novice in Hadoop.

The driver class:
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MRDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        FileSystem fs = new Path(".").getFileSystem(getConf());
        fs.printStatistics();

        Job job = new Job(getConf());
        job.setJobName("Enron MR");
        job.setMapperClass(EnronMailReadMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(0);
        job.setJarByClass(EnronMailReadMapper.class);
        RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRDriver(), args);
        System.exit(exitCode);
    }
}
The class below is mostly a copy-paste from LineRecordReader, with modifications to the initialize() and nextKeyValue() functions:
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        // Open the file and seek to the start of the split.
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        fileIn.seek(start);
        in = new LineReader(fileIn, job);
        // If this is not the first split, we always throw away the first record
        // because we always (except for the last split) read one extra line in
        // the next() method.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    private int maxBytesToConsume(long pos) {
        return (int) Math.min(Integer.MAX_VALUE, end - pos);
    }

    private long getFilePosition() throws IOException {
        return pos;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        StringBuffer totalValue = new StringBuffer();
        // We always read one extra line, which lies outside the upper
        // split limit, i.e. (end - 1).
        while (getFilePosition() <= end) {
            newSize = in.readLine(value, maxLineLength,
                    Math.max(maxBytesToConsume(pos), maxLineLength));
            if (newSize == 0) {
                break;
            }
            totalValue.append(value.toString() + "\n");
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
            // Line too long; try again.
            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        } else {
            value = new Text(totalValue.toString());
            return true;
        }
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split.
     */
    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
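As an aside on problem (b): since maxLineLength defaults to Integer.MAX_VALUE, the `newSize < maxLineLength` check breaks out of the loop after the first readLine(), so each call to nextKeyValue() still emits one line. If the goal is the entire file as one record, a simpler route is to drop LineReader and read the whole split in a single call. Here is a minimal sketch along the lines of the well-known whole-file reader pattern; the class name WholeFileRecordReader is my own, and it assumes each file fits in memory and that each FileSplit it receives covers exactly one whole file:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<LongWritable, Text> {
    private FileSplit split;
    private Configuration conf;
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context) {
        this.split = (FileSplit) genericSplit;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false; // the single record has already been emitted
        }
        // Assumption: the split covers one whole file small enough for memory.
        byte[] contents = new byte[(int) split.getLength()];
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        key.set(split.getStart());
        value.set(contents, 0, contents.length);
        processed = true;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() {
        // Nothing to close; the stream is closed inside nextKeyValue().
    }
}

Wiring it in would just mean having the MultiFileRecordReader below instantiate this class instead of SingleFileRecordReader.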
The other files:
public class RawCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        // Hand each file in the combined split to a MultiFileRecordReader.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, MultiFileRecordReader.class);
    }
}
And:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MultiFileRecordReader extends RecordReader<LongWritable, Text> {
    private CombineFileSplit split;
    private TaskAttemptContext context;
    private int index;
    private RecordReader<LongWritable, Text> rr;

    // CombineFileRecordReader instantiates this reader once per file in the
    // combined split, passing the file's index within the split.
    public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
        this.split = split;
        this.context = context;
        this.index = index;
        this.rr = new SingleFileRecordReader();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.split = (CombineFileSplit) split;
        this.context = context;
        if (null == rr) {
            rr = new SingleFileRecordReader();
        }
        // Build a FileSplit for the single file at this index and delegate to it.
        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                this.split.getOffset(index),
                this.split.getLength(index),
                this.split.getLocations());
        this.rr.initialize(fileSplit, this.context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return this.rr.nextKeyValue();
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.rr.getCurrentKey();
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return this.rr.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return this.rr.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (rr != null) {
            rr.close();
            rr = null;
        }
    }
}
You need to set the maximum split size, up to which files are combined. You can read about this in the documentation of CombineFileInputFormat.
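Concretely, that could look like the following in MRDriver.run() before the job is submitted (a sketch; 64 MB is an arbitrary example value, and the raw property name depends on the Hadoop version):

// Cap the size of a combined split. With the cap larger than the total
// size of the small files, CombineFileInputFormat can pack both files
// into one split, and one split means one mapper.
RawCombineFileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // 64 MB, example value

// The same setting via the raw configuration property:
// job.getConfiguration().setLong("mapred.max.split.size", 64L * 1024 * 1024);                         // Hadoop 1.x
// job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024); // Hadoop 2.x

Without any maximum, CombineFileInputFormat falls back to grouping blocks per node and per rack, which is why two files can still end up in two splits.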