0

Ich bin neu in Spark-Scala und versuche folgendes Ding, aber ich bin stecken geblieben und nicht auf, wie zu erreichen diese Anforderung. Ich werde wirklich dankbar sein, wenn jemand in dieser Hinsicht wirklich helfen kann.Wie kann ich die gleiche Scala-Funktion in kaskadierender Weise dynamisch aufrufen mit Ausgabe des vorherigen Anrufs geht als Eingabe für den nächsten Anruf

1) Wir müssen verschiedene Regeln für verschiedene Spalten der angegebenen Tabelle aufrufen. Die Liste der Spaltennamen und Regeln wird als Argument an das Programm übergeben. 2) Die Resultierende der ersten Regel sollte als Eingabe für die nächste Regeleingabe verwendet werden.

Frage: Wie kann ich exec() -Funktion in kaskadierender Weise mit dem dynamischen Füllen der Argumente für so viele Regeln wie in Argumenten angegeben ausführen.

Ich habe einen Code wie folgt entwickelt.

Objekt Regeln {

def main(args: Array[String]) = { 

     if (args.length != 3) { 
      println("Need exactly 3 arguments in format : <sourceTableName> <destTableName> <[<colName>=<Rule> <colName>=<Rule>,...") 
      println("E.g : INPUT_TABLE OUTPUT_TABLE [NAME=RULE1,ID=RULE2,TRAIT=RULE3]"); 
      System.exit(-1) 
     } 
     val conf = new SparkConf().setAppName("My-Rules").setMaster("local"); 
     val sc = new SparkContext(conf); 
     val srcTableName = args(0).trim(); 
     val destTableName = args(1).trim(); 
     val ruleArguments = StringUtils.substringBetween(args(2).trim(), "[", "]"); 
     val businessRuleMappings = ruleArguments.split(",").map(_.split("=")).map(arr => arr(0) -> arr(1)).toMap; 

     val sqlContext : SQLContext = new org.apache.spark.sql.SQLContext(sc) ; 
     val hiveContext : HiveContext = new org.apache.spark.sql.hive.HiveContext(sc);   
     val dfSourceTbl = hiveContext.table("TEST.INPUT_TABLE"); 

     def exec(dfSource: DataFrame,columnName :String ,funName: String): DataFrame = { 
       funName match { 
       case "RULE1" => TransformDF(columnName,dfSource,RULE1); 
       case "RULE2" => TransformDF(columnName,dfSource,RULE2); 
       case "RULE3" => TransformDF(columnName,dfSource,RULE3); 
       case _ =>dfSource; 
       } 
     } 

     def TransformDF(x:String, df:DataFrame, f:(String,DataFrame)=>DataFrame) : DataFrame = { 
       f(x,df); 
     } 

     def RULE1(column : String, sourceDF: DataFrame): DataFrame = { 
       //put businees logic 
       return sourceDF; 
     } 

     def RULE2(column : String, sourceDF: DataFrame): DataFrame = { 
       //put businees logic 
       return sourceDF; 
     } 

     def RULE3(column : String,sourceDF: DataFrame): DataFrame = { 
       //put businees logic 
       return sourceDF; 
     } 

     // How can I call this exec() function with output casacing and arguments for variable number of rules. 
     val finalResultDF = exec(exec(exec(dfSourceTbl,"NAME","RULE1"),"ID","RULE2"),"TRAIT","RULE3); 

     finalResultDF.write.mode(org.apache.spark.sql.SaveMode.Append).insertInto("DB.destTableName")  
} 

}

+0

Tut m y Antwort hat dir geholfen? Wenn ja, bitte akzeptieren Sie es –

Antwort

0

ich alle Regeln schreiben würde als Funktionen ein Datenrahmen in einen anderen verwandeln:

val rules: Seq[(DataFrame) => DataFrame] = Seq(
    RULE1("NAME",_:DataFrame), 
    RULE2("ID",_:DataFrame), 
    RULE3("TRAIT",_:DataFrame) 
) 

Nicht können Sie sie anwenden Falten mit

val finalResultDF = rules.foldLeft(dfSourceTbl)(_ transform _) 
Verwandte Themen