2016-05-16 5 views
1

Vom Code unten habe ich nicht verstanden 2 Dinge:Nicht zu verstehen, den Weg in verteilten Pfad

  1. DistributedCache.addcachefile(new URI ('/abc.dat'), job.getconfiguration())

Ich verstand nicht, URI-Pfad im HDFS vorhanden sein muss. Korrigiere mich, wenn ich falsch liege.

  1. Und was ist p.getname().equals() aus dem unten stehenden Code:

    public class MyDC { 
    
    public static class MyMapper extends Mapper < LongWritable, Text, Text, Text > { 
    
        private Map < String, String > abMap = new HashMap < String, String >(); 
    
        private Text outputKey = new Text(); 
    
        private Text outputValue = new Text(); 
    
        protected void setup(Context context) throws 
        java.io.IOException, InterruptedException { 
    
         Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
    
         for (Path p: files) { 
    
          if (p.getName().equals("abc.dat")) { 
    
           BufferedReader reader = new BufferedReader(new FileReader(p.toString())); 
    
           String line = reader.readLine(); 
    
           while (line != null) { 
    
            String[] tokens = line.split("\t"); 
    
            String ab = tokens[0]; 
    
            String state = tokens[1]; 
    
            abMap.put(ab, state); 
    
            line = reader.readLine(); 
    
           } 
    
          } 
    
         } 
    
         if (abMap.isEmpty()) { 
    
          throw new IOException("Unable to load Abbrevation data."); 
    
         } 
    
        } 
    
        protected void map(LongWritable key, Text value, Context context) 
        throws java.io.IOException, InterruptedException { 
    
         String row = value.toString(); 
    
         String[] tokens = row.split("\t"); 
    
         String inab = tokens[0]; 
    
         String state = abMap.get(inab); 
    
         outputKey.set(state); 
    
         outputValue.set(row); 
    
         context.write(outputKey, outputValue); 
    
        } 
    
    } 
    
    public static void main(String[] args) 
    throws IOException, ClassNotFoundException, InterruptedException { 
    
        Job job = new Job(); 
    
        job.setJarByClass(MyDC.class); 
    
        job.setJobName("DCTest"); 
    
        job.setNumReduceTasks(0); 
    
        try { 
    
         DistributedCache.addCacheFile(new URI("/abc.dat"), job.getConfiguration()); 
    
        } catch (Exception e) { 
    
         System.out.println(e); 
    
        } 
    
        job.setMapperClass(MyMapper.class); 
    
        job.setMapOutputKeyClass(Text.class); 
    
        job.setMapOutputValueClass(Text.class); 
    
    
        FileInputFormat.addInputPath(job, new Path(args[0])); 
    
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    
        job.waitForCompletion(true); 
    
    } 
    
    } 
    

Antwort

0

DistributedCache ist eine API, die eine Datei oder eine Gruppe von Dateien im Speicher hinzuzufügen, verwendet wird, und wird für jeden verfügbar sein Datenknoten, ob das Map-Reduce funktioniert. Ein Beispiel für die Verwendung von DistributedCache sind kartenseitige Joins.

DistributedCache.addcachefile (neue URI ('/abc.dat'), job.getconfiguration()) wird die Datei abc.dat im Cache-Bereich hinzufügen. Es kann n Dateien im Cache geben und p.getName(). Equals ("abc.dat")) prüft die Datei, die Sie benötigen. Jeder Pfad in HDFS wird unter Pfad [] für die Map-Reduce-Verarbeitung verwendet. Zum Beispiel:

FileInputFormat.addInputPath(job, new Path(args[0])); 

FileOutputFormat.setOutputPath(job, new Path(args[1])); 

der erste Pfad (args [0]) ist das erste Argument (Eingabedatei location) Sie passieren, während Jar Ausführung und Weg (args [1]) ist das zweite Argument, das die Ausgabedatei Lage. Alles wird als Pfad-Array genommen.

Auf die gleiche Weise, wenn Sie eine Datei zum Zwischenspeichern hinzufügen, wird sie im Path-Array, das Sie abrufen möchten, mit dem unten stehenden Code angeordnet.

Pfad [] files = DistributedCache.getLocalCacheFiles (context.getConfiguration());

Es wird alle Dateien im Cache zurückgeben und Sie werden Ihren Dateinamen durch p.getName(). Equals() -Methode.

+0

Danke ishan dafür !! Es war eine große Hilfe – Sri

1

Die Idee der verteilten Cache ist auf einige statischen Daten zur Verfügung, um die Aufgabe Knoten zu machen, bevor es beginnt seine Ausführung.

Datei hat in HDFS vorhanden sein, so dass es dann zu dem Distributed Cache (zu jedem Task-Knoten)

DistributedCache.getLocalCacheFile im Grunde die Cache-Dateien werden alle in diesem Task-Knoten hinzufügen kann. Unter if (p.getName().equals("abc.dat")) { erhalten Sie die entsprechende Cache-Datei, die von Ihrer Anwendung verarbeitet werden soll.

Bitte beachten Sie die Dokumente unter:

https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#DistributedCache

https://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/filecache/DistributedCache.html#getLocalCacheFiles(org.apache.hadoop.conf.Configuration)

+0

Danke für die wunderbare Antwort !! Ich habe es klar verstanden !! – Sri

Verwandte Themen