2016-10-03 1 views
0

Ich habe SJS für mein Projekt und möchte wissen, wie NamedDataFrame von SJS funktioniert. Mein erstes Programm tut diesWie benannteDataFrame von Spark-Job-Server

val schemaString = "parm1:int,parm2:string,parm3:string,parm4:string,parm5:int,parm6:string,parm7:int,parm8:int" 
val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName.split(":")(0), getFieldTypeInSchema(fieldName.split(":")(1)),true))) 

val eDF1 = hive.applySchema(rowRDD1, schema) 
this.namedObjects.getOrElseCreate("edf1", new NamedDataFrame(eDF1, true, StorageLevel.MEMORY_ONLY)) 

Mein zweites Programm macht dies die Datenrahmen abzurufen.

val eDF1: Option[NamedDataFrame] = this.namedObjects.get("eDF1") 

Hier kann ich nur Option verwenden. Wie muss ich NamedDataFrame in einen Spark DataFrame umwandeln?

Ist etwas von diesem Äquivalent verfügbar?

this.namedObjects.get[(Int,String,String,String,Int,String,Int,Int)]("eDF1") 

Danke !!

Edit1: Um genau zu sein, ohne SJS Persistenz, dies auf der df getan werden könnte

eDF1.filter(eDF1.col("parm1")%2!==0) 

Wie kann ich die gleiche Operation aus einer gespeicherten NamedObject durchführen?

Antwort

0
+0

Das obige Beispiel sagt nicht, wie man einen Dataframe abruft. Hier sind einige Zeilen von Ihrem Beispiel. Kannst du sagen, wie man df1 erhält, während man den StructType behält? val = struct StructType ( StructField ("i", IntegerType, true) :: StructField ("b", BooleanType, false) :: Nil) val df = sqlContext.createDataFrame (Zeilen (sc), Struktur) namedObjects.update ("df1", NamedDataFrame (df, true, StorageLevel.MEMORY_AND_DISK)) – user1384205

0

folgende Arbeiten auf NamedDataFrame

Job1

this.namedObjects.getOrElseCreate("df:esDF1", new NamedDataFrame(eDF1, true, StorageLevel.MEMORY_ONLY)) 

Job2

val NamedDataFrame(eDF1, _, _) = namedObjects.get[NamedDataFrame]("df:esDF1").get 

Jetzt kann ich auf EDF1 auf dem zweiten betreiben Job als Spark-Datenframe.