2017-12-07 13 views
0

Ich versuche, ein Map-Reduce-Programm zu erstellen, um den K-Means-Algorithmus auszuführen. Ich weiß, dass Map Reduce nicht der beste Weg ist, iterative Algorithmen zu verwenden. Ich habe die Mapper- und Reducer-Klassen erstellt. Im Mapper-Code habe ich eine Eingabedatei gelesen. Wenn eine Kartenreduzierung abgeschlossen ist, möchte ich, dass die Ergebnisse in derselben Eingabedatei gespeichert werden. Wie kann ich die Ausgabedatei überschreiben die eingegebene Datei vom Mapper? auch so mache ich die Karte Iterierte reduzieren, bis die Werte aus der alten Eingabedatei und neue Eingabedatei also die Differenz zwischen den Werten konvergieren weniger als 0,1Hadoop Mapreduce, Wie überschreibe ich eine im Mapper eingegebene TXT-Datei mit Map Reduce Output?

Mein Code ist:

import java.io.IOException; 
import java.util.StringTokenizer; 
import java.util.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 
import java.io.FileReader; 
import java.io.BufferedReader; 
import java.util.ArrayList; 


public class kmeansMapper extends Mapper<Object, Text, DoubleWritable, 
DoubleWritable> { 
private final static String centroidFile = "centroid.txt"; 
private List<Double> centers = new ArrayList<Double>(); 

public void setup(Context context) throws IOException{ 
     BufferedReader br = new BufferedReader(new 
     FileReader(centroidFile)); 
     String contentLine; 
     while((contentLine = br.readLine())!=null){ 
      centers.add(Double.parseDouble(contentLine)); 
     } 
} 

public void map(Object key, Text input, Context context) throws IOException, 
InterruptedException { 

     String[] fields = input.toString().split(" "); 
     Double rating = Double.parseDouble(fields[2]); 
     Double distance = centers.get(0) - rating; 
     int position = 0; 
     for(int i=1; i<centers.size(); i++){ 
      Double cDistance = Math.abs(centers.get(i) - rating); 
      if(cDistance< distance){ 
       position = i; 
       distance = cDistance; 
      } 
     } 
     Double closestCenter = centers.get(position); 
     context.write(new DoubleWritable(closestCenter),new 
DoubleWritable(rating)); //outputs closestcenter and rating value 

     } 
} 
import java.io.IOException; 
import java.lang.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Reducer; 
import java.util.*; 

public class kmeansReducer extends Reducer<DoubleWritable, DoubleWritable, 
DoubleWritable, Text> { 

public void reduce(DoubleWritable key, Iterable<DoubleWritable> values, 
Context context)// get count // get total //get values in a string 
      throws IOException, InterruptedException { 
      Iterator<DoubleWritable> v = values.iterator(); 
      double total = 0; 
      double count = 0; 
      String value = ""; //value is the rating 
      while (v.hasNext()){ 
       double i = v.next().get(); 
       value = value + " " + Double.toString(i); 
       total = total + i; 
       ++count; 
      } 
      double nCenter = total/count; 
    context.write(new DoubleWritable(nCenter), new Text(value)); 
} 
} 
import java.util.Arrays; 
import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class run 
{ 

public static void runJob(String[] input, String output) throws Exception { 

    Configuration conf = new Configuration(); 

    Job job = new Job(conf); 
    Path toCache = new Path("input/centroid.txt"); 
    job.addCacheFile(toCache.toUri()); 
    job.setJarByClass(run.class); 
    job.setMapperClass(kmeansMapper.class); 
    job.setReducerClass(kmeansReducer.class); 
    job.setMapOutputKeyClass(DoubleWritable.class); 
    job.setMapOutputValueClass(DoubleWritable.class); 

    job.setNumReduceTasks(1); 
    Path outputPath = new Path(output); 
    FileInputFormat.setInputPaths(job, StringUtils.join(input, ",")); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    outputPath.getFileSystem(conf).delete(outputPath,true); 
    job.waitForCompletion(true); 

} 

public static void main(String[] args) throws Exception { 
    runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]); 

} 

} 

Dank

Antwort

0

Ich weiß, dass Sie den Disclaimer setzen .. aber bitte wechseln Sie zu Spark oder einem anderen Framework, das Probleme im Speicher lösen kann. Dein Leben wird so viel besser sein.

Wenn Sie dies wirklich tun möchten, führen Sie iterativ den Code in runJob aus und verwenden Sie einen temporären Dateinamen für die Eingabe. Sie können this question on moving files in hadoop sehen, um dies zu erreichen. Hier finden Sie eine Filesystem-Instanz und eine temporäre Datei für die Eingabe benötigen:

FileSystem fs = FileSystem.get(new Configuration()); 
Path tempInputPath = Paths.get('/user/th/kmeans/tmp_input'; 

Grob gesagt, nach jeder Iteration beendet ist, zum ersten Iteration

fs.delete(tempInputPath) 
fs.rename(outputPath, tempInputPath) 

Natürlich haben Sie die Eingabe festlegen müssen Pfad zu den Eingabepfaden, die beim Ausführen des Jobs bereitgestellt werden. Nachfolgende Iterationen können den TempInputPath verwenden, der die Ausgabe der vorherigen Iteration darstellt.

+0

Hallo vielen Dank für die Antwort, wie gehe ich über den Code in Runjob Iterieren? – th308

+0

Sie wickeln die notwendigen Teile des Codes in runJob in eine normale for-Schleife. –