2016-05-12 8 views
0

Ich muss Bloom-Filter in der Reduce Side Join-Algorithmus verwenden, um eine meiner Eingaben zu filtern, aber ich habe ein Problem mit der Funktion readFields, die den Input-Stream eines verteilten Cache (Bloom-Filter) deserialisieren in einen Blütenfilter.Bloom-Filter in MapReduce

public class BloomJoin { 

    //function map : input transaction.txt 
    public static class TransactionJoin extends 
      Mapper<LongWritable, Text, Text, Text> { 

     private Text CID=new Text(); 
     private Text outValue=new Text(); 

     public void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 

      String line = value.toString(); 
       String record[] = line.split(",", -1); 
       CID.set(record[1]); 

       outValue.set("A"+value); 
       context.write(CID, outValue); 
       } 
     } 
    //function map : input customer.txt 
      public static class CustomerJoinMapper extends 
        Mapper<LongWritable, Text, Text, Text> { 

       private Text outkey=new Text(); 
       private Text outvalue = new Text(); 
       private BloomFilter bfilter = new BloomFilter(); 
       public void setup(Context context) throws IOException { 

        URI[] files = DistributedCache 
          .getCacheFiles(context.getConfiguration()); 

        // if the files in the distributed cache are set 
        if (files != null) { 
        System.out.println("Reading Bloom filter from: " 
        + files[0].getPath()); 
        // Open local file for read. 

        DataInputStream strm = new DataInputStream(new FileInputStream(
        files[0].toString())); 
        bfilter.readFields(strm); 
        strm.close(); 

        // Read into our Bloom filter. 

        } else { 
        throw new IOException(
        "Bloom filter file not set in the DistributedCache."); 
        } 
       }; 

       public void map(LongWritable key, Text value, Context context) 
         throws IOException, InterruptedException { 
        String line = value.toString(); 
        String record[] = line.split(",", -1); 

         outkey.set(record[0]); 
         if (bfilter.membershipTest(new Key(outkey.getBytes()))) { 
         outvalue.set("B"+value); 
         context.write(outkey, outvalue); 
         } 
      } 
      } 

    //function reducer: join customer with transaction 
    public static class JoinReducer extends 
      Reducer<Text, Text, Text, Text> { 

     private ArrayList<Text> listA = new ArrayList<Text>(); 
     private ArrayList<Text> listB = new ArrayList<Text>(); 


     @Override 
     public void reduce(Text key, Iterable<Text> values, Context context) 
       throws IOException, InterruptedException { 

      listA.clear(); 
      listB.clear(); 

        for (Text t : values) { 
       if (t.charAt(0) == 'A') { 
        listA.add(new Text(t.toString().substring(1))); 
        System.out.println("liste A: "+listA); 
       } else /* if (t.charAt('0') == 'B') */{ 
        listB.add(new Text(t.toString().substring(1))); 
        System.out.println("listeB :"+listB); 
       } 
      } 

      executeJoinLogic(context); 
     } 

     private void executeJoinLogic(Context context) throws IOException, 
       InterruptedException { 
       if (!listA.isEmpty() && !listB.isEmpty()) { 
        for (Text A : listB) { 
         for (Text B : listA) { 
          context.write(A, B); 
          System.out.println("A="+A+",B="+B); 
         } 
        } 
       } 

     } 
    } 

    public static void main(String[] args) throws Exception { 

     Configuration conf = new Configuration(); 
     Path bloompath=new Path("/user/biadmin/ezzaki/bloomfilter/output/part-00000"); 
     DistributedCache.addCacheFile(bloompath.toUri(),conf); 
     Job job = new Job(conf, "Bloom Join"); 
     job.setJarByClass(BloomJoin.class); 
     String[] otherArgs = new GenericOptionsParser(conf, args) 
     .getRemainingArgs(); 
     if (otherArgs.length != 3) { 
    System.err 
      .println("ReduceSideJoin <Transaction data> <Customer data> <out> "); 
    System.exit(1); 
            } 
     MultipleInputs.addInputPath(job, new Path(otherArgs[0]), 
       TextInputFormat.class,TransactionJoin.class); 
     MultipleInputs.addInputPath(job, new Path(otherArgs[1]), 
       TextInputFormat.class, CustomerJoinMapper.class); 

     job.setReducerClass(JoinReducer.class); 

     FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); 
     //job.setMapOutputKeyClass(Text.class); 
     //job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 
     System.exit(job.waitForCompletion(true) ? 0 : 3); 
    } 
} 

Wie kann ich dieses Problem lösen?

+0

Bitte fügen Sie den Code und die Ausnahme des Fehlers zu Ihrer Frage ... –

+0

Ich habe den Code meiner Klasse und das Fehlerbild hinzugefügt, wenn es nicht erscheint es ist: java.io.EOFException: at java.io. DataInputStream.readInt/at org.apache.hadoop.util.bloom.Filter.readFields/bei org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

Antwort

1

Können Sie versuchen

URI[] files = DistributedCache.getCacheFiles(context.getConfiguration()); 

zu

Path[] cacheFilePaths = DistributedCache.getLocalCacheFiles(conf); 
for (Path cacheFilePath : cacheFilePaths) {  
    DataInputStream fileInputStream = fs.open(cacheFilePath); 
} 
bloomFilter.readFields(fileInputStream); 
fileInputStream.close(); 

auch ändern, ich glaube, Sie verwenden Karte Seite beitreten und nicht die Seite verkleinern, da Sie den verteilten Cache in Mapper verwenden.

+0

danke für deine Antwort aber das Problem immer noch mit der Funktion: readFields habe ich in exception: java.io.EOFException: at java.io.DataInputStream.readInt/at org.apache.hadoop.util.bloom.Filter.readFields/at org.apache.hadoop.util.bloom.BloomFilter.readFields – Fatiso

+0

dies ist ein reduzierter Side Join, da der Join auf der Verkleinerungsseite durchgeführt wird und ich versuche, eine meiner Eingaben auf der Kartenseite vor dem Senden des PKV an den Reducer zu filtern – Fatiso