2017-04-24 9 views
0

Ich muss neue Spalte mit dem Wert von UUID hinzufügen. Ich habe dies mit Spark 1.4 Java mit folgendem Code getan.SpakrSQL Generiere neue Spalte mit UUID

StructType objStructType = inputDataFrame.schema(); 
     StructField []arrStructField=objStructType.fields(); 
     List<StructField> fields = new ArrayList<StructField>(); 
     List<StructField> newfields = new ArrayList<StructField>(); 
     List <StructField> listFields = Arrays.asList(arrStructField); 
     StructField a = DataTypes.createStructField(leftCol,DataTypes.StringType, true); 
     fields.add(a); 
     newfields.addAll(listFields); 
     newfields.addAll(fields); 
     final int size = objStructType.size(); 

    JavaRDD<Row> rowRDD = inputDataFrame.javaRDD().map(new Function<Row, Row>() { 
     private static final long serialVersionUID = 3280804931696581264L; 
     public Row call(Row tblRow) throws Exception { 

       Object[] newRow = new Object[size+1]; 
       int rowSize= tblRow.length(); 
       for (int itr = 0; itr < rowSize; itr++) 
       { 
        if(tblRow.apply(itr)!=null) 
        { 
         newRow[itr] = tblRow.apply(itr); 
        } 

       } 
       newRow[size] = UUID.randomUUID().toString(); 
       return RowFactory.create(newRow); 

     } 
    }); 



    inputDataFrame = objsqlContext.createDataFrame(rowRDD, DataTypes.createStructType(newfields)); 

Ich frage mich, ob es in Spark Spark 2 einen guten Weg gibt. Bitte um Rat.

Antwort

0

Sie können udf registrieren, um UUID abzurufen, und callUDF verwenden, um eine neue Spalte zu Ihrer inputDataFrame hinzuzufügen. Bitte beachten Sie den Beispielcode mit Spark 2.0.

public class SparkUUIDSample { 
public static void main(String[] args) { 
    SparkSession spark = SparkSession.builder().appName("SparkUUIDSample").master("local[*]").getOrCreate(); 
    //sample input data 
    List<Tuple2<String, String>> inputList = new ArrayList<Tuple2<String, String>>(); 
    inputList.add(new Tuple2<String, String>("A", "v1")); 
    inputList.add(new Tuple2<String, String>("B", "v2")); 
    //dataset 
    Dataset<Row> df = spark.createDataset(inputList, Encoders.tuple(Encoders.STRING(), Encoders.STRING())).toDF("key", "value"); 
    df.show(); 
    //register udf 
    UDF1<String, String> uuid = str -> UUID.randomUUID().toString(); 
    spark.udf().register("uuid", uuid, DataTypes.StringType); 
    //call udf 
    df.select(col("*"), callUDF("uuid", col("value"))).show(); 
    //stop 
    spark.stop(); 
    }  
}