2017-10-19

Hello everyone, I have a dataframe that is brought up to date every day: each day I have to add the new qte and the new ca to the old values and update the date. So I need to update the rows that already exist and insert the new ones. Here is an example of what I do: join the dataframes and perform the operation.

import org.apache.spark.sql.types._
import spark.implicits._

val histocaisse = spark.read
  .format("csv")
  .option("header", "true") // reading the headers
  .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")

// cast every column to its proper type
val hist = histocaisse
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("article_id", 'article_id.cast(LongType))
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(DoubleType))
  .withColumn("ca", 'ca.cast(DoubleType))



val histocaisse2 = spark.read
  .format("csv")
  .option("header", "true") // reading the headers
  .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")

val hist2 = histocaisse2
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("article_id", 'article_id.cast(LongType))
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(DoubleType))
  .withColumn("ca", 'ca.cast(DoubleType))

hist.show(false)
hist2.show(false)

hist (day 1):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-07|2.5 |3.5 |
|2     |2         |2000-01-07|14.7|12.0|
|3     |3         |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+

hist2 (day 2):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|2.5 |3.5 |
|2     |2         |2000-01-08|14.7|12.0|
|3     |3         |2000-01-08|3.5 |1.2 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

And here is what I would like to get at the end:

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|5.0 |7.0 |
|2     |2         |2000-01-08|29.4|24.0|
|3     |3         |2000-01-08|7.0 |2.4 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

Here is what I did:

val histoCombinaison2 = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .groupBy("article_id", "pos_id")
  .agg((hist2("qte") + hist("qte")).as("qte"),
    (hist2("ca") + hist("ca")).as("ca"),
    hist2("date"))

histoCombinaison2.show()

And I got the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`qte`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.; 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58) 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:218) 
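
As the message itself suggests, every selected column that is neither in the group by nor aggregated has to be wrapped in an aggregate function such as first(). A minimal sketch of that repair (my own illustration, not from the post; first() is safe here because the join leaves at most one row per (article_id, pos_id)):

import org.apache.spark.sql.functions.first

// Wrapping the non-grouping columns in first() satisfies the analyzer.
val histoCombinaison2 = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .groupBy("article_id", "pos_id")
  .agg(first(hist2("qte") + hist("qte")).as("qte"),
    first(hist2("ca") + hist("ca")).as("ca"),
    first(hist2("date")).as("date"))

Note that qte and ca are still null for the articles missing from hist; the answers below handle that with coalesce/when.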

Answers

// import functions
import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._

// We might not need groupBy here: after the join, all the information
// for one (article_id, pos_id) is in the same row, so instead of an
// aggregate function we simply combine the related fields into new columns.
// coalesce falls back to 0 when a value is null, which covers articles
// that exist on only one of the two days.
val df = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    coalesce(hist2("date"), hist("date")).alias("date"),
    (coalesce(hist2("qte"), lit(0)) + coalesce(hist("qte"), lit(0))).alias("qte"),
    (coalesce(hist2("ca"), lit(0)) + coalesce(hist("ca"), lit(0))).alias("ca"))
  .orderBy("pos_id", "article_id")

// df.show()
+------+----------+----------+----+----+
|pos_id|article_id|      date| qte|  ca|
+------+----------+----------+----+----+
|     1|         1|2000-01-08| 5.0| 7.0|
|     2|         2|2000-01-08|29.4|24.0|
|     3|         3|2000-01-08| 7.0| 2.4|
|     4|         4|2000-01-08| 3.5| 1.2|
|     5|         5|2000-01-08|14.5| 1.2|
|     6|         6|2000-01-08| 2.0|1.25|
+------+----------+----------+----+----+

Thanks.


@LLi, I tested it: when the hist table is empty I get an error. What would the code look like if the hist table is empty and I just want to add the items from hist2? – maher


As I mentioned in the comment, you should define your schema and use it when reading the csv into a dataframe:

import sqlContext.implicits._ 

import org.apache.spark.sql.types._ 
val schema = StructType(Seq(
    StructField("pos_id", LongType, true), 
    StructField("article_id", LongType, true), 
    StructField("date", DateType, true), 
    StructField("qte", LongType, true), 
    StructField("ca", DoubleType, true) 
)) 

val hist1 = sqlContext.read 
    .format("csv") 
    .option("header", "true") 
    .schema(schema) 
    .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv") 

hist1.show 

val hist2 = sqlContext.read 
    .format("csv") 
    .option("header", "true") //reading the headers 
    .schema(schema) 
    .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv") 

hist2.show 

Then you should use the when function to define the logic you need to implement, as follows:

import org.apache.spark.sql.functions.{when, lit}

val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    when(hist2("date").isNotNull, hist2("date"))
      .otherwise(when(hist1("date").isNotNull, hist1("date")).otherwise(lit(null)))
      .alias("date"),
    (when(hist2("qte").isNotNull, hist2("qte")).otherwise(lit(0))
      + when(hist1("qte").isNotNull, hist1("qte")).otherwise(lit(0))).alias("qte"),
    (when(hist2("ca").isNotNull, hist2("ca")).otherwise(lit(0))
      + when(hist1("ca").isNotNull, hist1("ca")).otherwise(lit(0))).alias("ca"))
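
With the explicit schema, the empty-table case from your comment works as well: an empty csv just yields an empty dataframe, the left join keeps every hist2 row, and the otherwise(lit(0)) branches fall back to hist2's values. A quick sketch to reproduce this without a csv file (emptyHist1 is my stand-in for an empty histocaisse_dte1.csv, not something from the original post):

import org.apache.spark.sql.Row

// An empty DataFrame with the same explicit schema; substituting it for
// hist1 in the join above returns hist2's rows unchanged, since every
// hist1 column is null after the left join.
val emptyHist1 = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], schema)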

I hope this answer is helpful.
