2016-04-07 20 views

Implementing a Cartesian join in Cascading

I would like to know whether it is possible to perform a Cartesian join in Cascading. Could someone give a simple, illustrative example that shows how a Cartesian join works in Cascading?

Check this post: http://stackoverflow.com/questions/14681506/cartesian-product-in-cascading – chinglun

Answer

Use the following sub-assembly to perform a Cartesian join:

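import java.io.Serializable;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.assembly.Retain;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Created by dhruv.pancholi on 16/01/17.
 */
public class CartesianJoin extends SubAssembly {

    /** Copies each incoming tuple and appends the constant 1 as an extra field. */
    public static class CommonFieldAddOperation extends BaseOperation implements Function, Serializable {

        public CommonFieldAddOperation(Fields outputFields) {
            super(outputFields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();

            // Copy the incoming tuple so the original is not modified
            Tuple tuple = new Tuple(arguments.getTuple());

            // Append the constant 1; both sides are later joined on this field
            tuple.add(1);

            functionCall.getOutputCollector().add(tuple);
        }
    }

    // Note: because Fields.ALL is passed to the function, each pipe is expected
    // to carry exactly the given fields, in the given order.
    public CartesianJoin(Pipe leftPipe, Fields leftFields, Pipe rightPipe, Fields rightFields) {

        // Append the constant field "cartesian_common" to every left tuple
        leftPipe = new Each(leftPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(leftFields, new Fields("cartesian_common"))), Fields.RESULTS);

        // Append the constant field "cartesian_common_" to every right tuple
        // (the name differs so that the joined field names do not collide)
        rightPipe = new Each(rightPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(rightFields, new Fields("cartesian_common_"))), Fields.RESULTS);

        // Join on the constant field; every left tuple matches every right tuple
        Pipe joinPipe = new CoGroup(leftPipe, new Fields("cartesian_common"), rightPipe, new Fields("cartesian_common_"), new InnerJoin());

        // Keep only the original fields, dropping the two helper fields
        joinPipe = new Retain(joinPipe, Fields.merge(leftFields, rightFields));

        // Register the joined pipe as the tail of the sub-assembly
        setTails(joinPipe);
    }

}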

Use the following code snippet in the main function, or wherever the flow is defined:

Pipe joinPipe = new CartesianJoin(leftPipe, new Fields("id", "name"), rightPipe, new Fields("id_", "name_")); 

leftPipe

id name 
1 dhruv 
3 arun 

rightPipe

id_ name_ 
1 dhruv 
2 gaj 

joinPipe

id name id_ name_ 
3 arun 2 gaj 
3 arun 1 dhruv 
1 dhruv 2 gaj 
1 dhruv 1 dhruv
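
To see why joining on a constant field yields the Cartesian product, here is a minimal plain-Java sketch that does not use Cascading at all. The class `ConstantKeyJoinSketch` and its methods `row`, `groupByCommonKey` and `cartesianJoin` are hypothetical names made up for this illustration. The sketch only mimics in memory what the sub-assembly above does: give every row the same key, inner join on that key, and leave the key out of the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Cascading.
class ConstantKeyJoinSketch {

    // Mirrors the constant 1 appended by CommonFieldAddOperation.
    private static final int COMMON_KEY = 1;

    // Convenience helper for building a row from its field values.
    static List<Object> row(Object... values) {
        return new ArrayList<>(Arrays.asList(values));
    }

    // Groups rows by their join key. Every row gets the same key,
    // so all rows land in a single group.
    static Map<Integer, List<List<Object>>> groupByCommonKey(List<List<Object>> rows) {
        Map<Integer, List<List<Object>>> groups = new HashMap<>();
        for (List<Object> r : rows) {
            groups.computeIfAbsent(COMMON_KEY, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    // Inner join on the constant key. The key is never written to the
    // output rows, which plays the role of the Retain step.
    static List<List<Object>> cartesianJoin(List<List<Object>> left, List<List<Object>> right) {
        Map<Integer, List<List<Object>>> leftGroups = groupByCommonKey(left);
        Map<Integer, List<List<Object>>> rightGroups = groupByCommonKey(right);

        List<List<Object>> joined = new ArrayList<>();
        for (Map.Entry<Integer, List<List<Object>>> entry : leftGroups.entrySet()) {
            List<List<Object>> matches = rightGroups.get(entry.getKey());
            if (matches == null) {
                continue; // no rows on the right side
            }
            for (List<Object> l : entry.getValue()) {
                for (List<Object> r : matches) {
                    List<Object> combined = new ArrayList<>(l);
                    combined.addAll(r);
                    joined.add(combined);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<List<Object>> left = Arrays.asList(row(1, "dhruv"), row(3, "arun"));
        List<List<Object>> right = Arrays.asList(row(1, "dhruv"), row(2, "gaj"));

        // Prints one line per combination of a left row and a right row.
        for (List<Object> r : cartesianJoin(left, right)) {
            System.out.println(r);
        }
    }
}
```

With the two sample inputs above (two rows each), the join produces 2 × 2 = 4 rows, the same combinations as listed for joinPipe. The order of rows in a real Cascading flow is not something this sketch says anything about.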