2012-07-02 10 views
5

Splitting a tuple into multiple tuples in Pig

I would like to generate multiple tuples from a single tuple. What I mean is: I have a file with the following data in it.

>> cat data 
ID | ColumnName1:Value1 | ColumnName2:Value2 

I load it with the following command:

grunt >> A = load '$data' using PigStorage('|');  
grunt >> dump A;  
(ID,ColumnName1:Value1,ColumnName2:Value2) 

Now I want to split this tuple into two tuples:

(ID, ColumnName1, Value1) 
(ID, ColumnName2, Value2) 

Can I use a UDF together with foreach and generate? Something like the following?

grunt >> foreach A generate SOMEUDF(A) 

EDIT:

Input tuple: (id1, column1, column2). Output: two tuples, (id1, column1) and (ID2, column2). So is the result a List, or should I return a bag?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>> 
{ 
    public List<Tuple> exec(Tuple input) throws IOException { 
     if (input == null || input.size() == 0) 
      return null; 
     try{ 
      // not sure whether I can create tuples on my own. Looks like I should use TupleFactory. 
      // return list of tuples. 
     }catch(Exception e){ 
      throw WrappedIOException.wrap("Caught exception processing input row ", e); 
     } 
    } 
} 

Is this approach correct?

Answers

10

You could write a UDF or use a Pig script with built-in functions.

For example:

-- data should be chararray; PigStorage('|') returns bytearray, which will not work for this example 
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 

-- split by | and create a row so we can dereference it later 
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ; 

-- first column is id, rest is converted into a bag and flatten it to make rows 
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value; 
-- there will be records with (id, id), but id should not have ':' 
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; 
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val); 
dump final; 

Test input:

1|c1:11:33|c2:12 
234|c1:21|c2:22 
33|c1:31|c2:32 
345|c1:41|c2:42 

Output:

(1,c1,11:33) 
(1,c2,12) 
(234,c1,21) 
(234,c2,22) 
(33,c1,31) 
(33,c2,32) 
(345,c1,41) 
(345,c2,42) 
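
To make the script's steps easier to follow, here is a minimal plain-Java sketch of the same logic, runnable without Pig. The class name `SplitLineSketch` and the method `explode` are hypothetical and not part of Pig; the comments map each step to the script statement it is meant to mirror.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java mirror of the Pig script's steps; class and method names are hypothetical.
class SplitLineSketch {

    /** Turns "id|col:val|col:val" into rows of {id, col, val}. */
    static List<String[]> explode(String line) {
        // Like STRSPLIT($0, '\\|'): the first field is the id.
        String[] fields = line.split("\\|");
        String id = fields[0];
        List<String[]> rows = new ArrayList<>();
        // Like FLATTEN(TOBAG(*)): the id is paired with every field, including itself.
        for (String field : fields) {
            // Like the filter on INDEXOF(value, ':') != -1: drops the (id, id) pair.
            if (field.indexOf(':') == -1) {
                continue;
            }
            // Like STRSPLIT(value, ':', 2): split on the first ':' only.
            String[] colVal = field.split(":", 2);
            rows.add(new String[] {id, colVal[0], colVal[1]});
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String[] row : explode("1|c1:11:33|c2:12")) {
            System.out.println("(" + String.join(",", row) + ")");
        }
    }
}
```

For the first test line this prints `(1,c1,11:33)` and `(1,c2,12)`, matching the first two rows of the output above. As in the script, an id that itself contains ':' would not be filtered out.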

I hope this helps.

Cheers.

Thank you. Can I do the same thing by writing a UDF? I am updating the question. – FourOfAKind

Yes, you can. See the next answer. – alexeipab

That's a great help. Thank you very much for your time. – FourOfAKind

6

Here is the UDF version. I prefer to return a bag:

import java.io.IOException; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 

/** 
* Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
* {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...} 
* 
* Default rows separator is '|' and key value separator is ':'. 
* In this implementation white spaces around separator characters are not removed. 
* ID can be made of any character (including sequence of white spaces). 
* @author 
* 
*/ 
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> { 

    private static final TupleFactory tupleFactory = TupleFactory.getInstance(); 
    private static final BagFactory bagFactory = BagFactory.getInstance(); 

    //Row separator character. Default is '|'. 
    private String rowsSeparator; 
    //Column value separator character. Default is ':'. 
    private String columnValueSeparator; 

    public TupleToBagColumnValuePairs() { 
     this.rowsSeparator = "\\|"; 
     this.columnValueSeparator = ":"; 
    } 

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) { 
     this.rowsSeparator = rowsSeparator; 
     this.columnValueSeparator = keyValueSeparator; 
    } 

    /** 
    * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray) 
    * @param outputBag Output tuples (id, column, value) are added to this bag 
    * @param id 
    * @param column 
    * @param value 
    * @throws ExecException 
    */ 
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException { 
     Tuple outputTuple = tupleFactory.newTuple(); 
     outputTuple.append(id); 
     outputTuple.append(column); 
     outputTuple.append(value); 
     outputBag.add(outputTuple); 
    } 

    /** 
    * Takes column{separator}value from splitInputLine, splits it into column and value and adds them to the outputBag as (id, column, value) 
    * @param outputBag Output tuples (id, column, value) should be added to this bag 
    * @param id 
    * @param splitInputLine format column{separator}value, which start from index 1 
    * @throws ExecException 
    */ 
    protected void parseColumnValues(DataBag outputBag, String id, 
      String[] splitInputLine) throws ExecException { 
     for (int i = 1; i < splitInputLine.length; i++) { 
      if (splitInputLine[i] != null) { 
       int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator); 
       if (columnValueSplitIndex != -1) { 
        String column = splitInputLine[i].substring(0, columnValueSplitIndex); 
        String value = null; 
        if (columnValueSplitIndex + 1 < splitInputLine[i].length()) { 
         value = splitInputLine[i].substring(columnValueSplitIndex + 1); 
        } 
        this.addTuple(outputBag, id, column, value); 
       } else { 
        String column = splitInputLine[i]; 
        this.addTuple(outputBag, id, column, null); 
       } 
      } 
     } 
    } 

    /** 
    * input - contains only one field of type chararray, which will be split by '|' 
    * All inputs that are: null or of length 0 are ignored. 
    */ 
    @Override 
    public DataBag exec(Tuple input) throws IOException { 
     if (input == null || input.size() != 1 || input.isNull(0)) { 
      return null; 
     } 

     String inputLine = (String)input.get(0); 
     String[] splitInputLine = inputLine.split(this.rowsSeparator, -1); 

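     // split(..., -1) always yields at least one element, so only the id needs checking; 
     // requiring length > 1 here would make the id-only branch below unreachable 
     if (splitInputLine[0].length() > 0) { 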
      String id = splitInputLine[0]; 
      DataBag outputBag = bagFactory.newDefaultBag();    
      if (splitInputLine.length == 1) { // there is just an id in the line 
       this.addTuple(outputBag, id, null, null); 
      } else { 
       this.parseColumnValues(outputBag, id, splitInputLine); 
      } 


      return outputBag; 
     } 
     return null; 
    } 

    @Override 
    public Schema outputSchema(Schema input) { 
     try { 
      if (input.size() != 1) { 
       throw new RuntimeException("Expected input to have only one field"); 
      } 

      Schema.FieldSchema inputFieldSchema = input.getField(0); 
      if (inputFieldSchema.type != DataType.CHARARRAY) { 
       throw new RuntimeException("Expected a CHARARRAY as input"); 
      } 

      Schema tupleSchema = new Schema(); 
      tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY)); 

      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG)); 
     } catch (FrontendException exx) { 
      throw new RuntimeException(exx); 
     } 
    } 

} 

Here is how it is used in Pig:

register 'path to the jar'; 
define IdColumnValue myPackage.TupleToBagColumnValuePairs(); 

inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2); 
dump result; 
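
The UDF's parsing rules have a few edge cases (empty value, missing separator, trailing '|') that are easy to check outside Pig. The following is a stand-alone sketch of those rules using plain strings instead of Pig's `Tuple` and `DataBag`. The class name `ColumnValueParsingSketch` and the method `parse` are hypothetical, and the sketch deliberately leaves out the null return and the id-only case of the real `exec`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the UDF's parsing rules without Pig types; names are hypothetical.
class ColumnValueParsingSketch {

    /** Returns {id, column, value} triples; a missing or empty value becomes null. */
    static List<String[]> parse(String line) {
        // Limit -1 keeps trailing empty strings, so a trailing '|' yields one more part.
        String[] parts = line.split("\\|", -1);
        List<String[]> triples = new ArrayList<>();
        if (parts[0].isEmpty()) {
            return triples; // assumption: an empty list stands in for the UDF's null
        }
        String id = parts[0];
        for (int i = 1; i < parts.length; i++) {
            int sep = parts[i].indexOf(':');
            if (sep == -1) {
                // No separator: the whole part is treated as the column name.
                triples.add(new String[] {id, parts[i], null});
            } else {
                // Nothing after the separator: the value stays null.
                String value = sep + 1 < parts[i].length() ? parts[i].substring(sep + 1) : null;
                triples.add(new String[] {id, parts[i].substring(0, sep), value});
            }
        }
        return triples;
    }

    public static void main(String[] args) {
        for (String[] triple : parse("7|c1:1|c2:|c3|")) {
            System.out.println(Arrays.toString(triple));
        }
    }
}
```

For the input `7|c1:1|c2:|c3|` this yields four triples: `(7, c1, 1)`, `(7, c2, null)`, `(7, c3, null)` and, because of the trailing separator, one with an empty column name. If that last row is unwanted, the input would need to be trimmed of trailing separators first.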

For good inspiration on writing UDFs that work with bags, see the DataFu source code by LinkedIn.

0

You could use TransposeTupleToBag (a UDF from the DataFu library) on the output of STRSPLIT to get the bag, and then FLATTEN the bag to create a separate row per original column.
