Join dataframes and perform an operation

Hi guys, I have a dataframe that is brought up to date every day: each day I need to add the new qte and the new ca to the old values and update the date. So I need to update the rows that already exist and add the new ones. Here is an example of what I would like to have at the end:
import org.apache.spark.sql.types.{DateType, DoubleType, LongType}
import spark.implicits._ // enables the 'pos_id symbol-to-column syntax

// Existing (old) data
val histocaisse = spark.read
  .format("csv")
  .option("header", "true") // first line holds the column names
  .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
val hist = histocaisse
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("article_id", 'article_id.cast(LongType)) // was cast from 'pos_id by mistake
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(DoubleType))
  .withColumn("ca", 'ca.cast(DoubleType))

// New day's data
val histocaisse2 = spark.read
  .format("csv")
  .option("header", "true") // first line holds the column names
  .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")
val hist2 = histocaisse2
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("article_id", 'article_id.cast(LongType)) // was cast from 'pos_id by mistake
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(DoubleType))
  .withColumn("ca", 'ca.cast(DoubleType))

hist.show(false)
hist2.show(false)
+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-07|2.5 |3.5 |
|2     |2         |2000-01-07|14.7|12.0|
|3     |3         |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|2.5 |3.5 |
|2     |2         |2000-01-08|14.7|12.0|
|3     |3         |2000-01-08|3.5 |1.2 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
Desired result:
+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|5.0 |7.0 |
|2     |2         |2000-01-08|29.4|24.0|
|3     |3         |2000-01-08|7.0 |2.4 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
Here is what I tried:
val histoCombinaison2 = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .groupBy("article_id", "pos_id")
  .agg((hist2("qte") + hist("qte")).as("qte"), (hist2("ca") + hist("ca")).as("ca"), hist2("date"))
histoCombinaison2.show()
This throws the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`qte`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:218)
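The message itself points at the cause: after `groupBy`, every expression passed to `agg` has to be an aggregate function (or a grouping column), and `hist2("qte") + hist("qte")` is a plain row-level expression. If each (pos_id, article_id) pair occurs at most once per dataframe, which is an assumption based on the sample data, no `groupBy` is needed at all. A minimal sketch of that idea follows: a full outer join so that rows present on only one side survive, with `coalesce` turning a missing side into 0.0. The object name `JoinAndAdd`, the helper `combine`, and the inline sample rows are made up for illustration and are not from the original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, lit}

object JoinAndAdd {
  // Hypothetical helper: merges the old data with the new day's data.
  // Assumes (pos_id, article_id) is unique within each input dataframe.
  def combine(hist: DataFrame, hist2: DataFrame): DataFrame = {
    val o = hist.alias("o")  // old data
    val n = hist2.alias("n") // new data
    // An outer join keeps keys that exist on only one side.
    o.join(n, Seq("pos_id", "article_id"), "outer")
      .select(
        col("pos_id"),
        col("article_id"),
        // Prefer the new date; fall back to the old one for untouched rows.
        coalesce(col("n.date"), col("o.date")).as("date"),
        // A missing side counts as 0.0 instead of turning the sum into null.
        (coalesce(col("o.qte"), lit(0.0)) + coalesce(col("n.qte"), lit(0.0))).as("qte"),
        (coalesce(col("o.ca"), lit(0.0)) + coalesce(col("n.ca"), lit(0.0))).as("ca"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("JoinAndAdd").getOrCreate()
    import spark.implicits._

    val cols = Seq("pos_id", "article_id", "date", "qte", "ca")
    // Small inline sample instead of the CSV files from the question.
    val hist = Seq((1L, 1L, "2000-01-07", 2.5, 3.5), (2L, 2L, "2000-01-07", 14.7, 12.0))
      .toDF(cols: _*).withColumn("date", col("date").cast("date"))
    val hist2 = Seq((1L, 1L, "2000-01-08", 2.5, 3.5), (4L, 4L, "2000-01-08", 3.5, 1.2))
      .toDF(cols: _*).withColumn("date", col("date").cast("date"))

    combine(hist, hist2).orderBy("pos_id").show(false)
    spark.stop()
  }
}
```

Note that the `left` join in the attempt above would also leave `qte` and `ca` as null for rows that exist only in `hist2`, because adding null to a number yields null; the `coalesce` calls are there to avoid that.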
@LLi, I tested it: when the table hist is empty I get an error. What would the code look like if hist is empty and I only want to add the rows from hist2? – maher
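The comment does not say which error appeared, so the following is only a guess at a workaround. One way to make the empty case a non-issue is to avoid the join entirely: stack both dataframes with `union` and aggregate per key with `sum` and `max`. An empty `hist` then contributes no rows and the result is simply `hist2`. The sketch assumes `hist` always carries the five columns from the question even when it has no rows; the object name `UnionAndSum`, the helper `accumulate`, and the explicit `schema` value are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, max, sum}
import org.apache.spark.sql.types._

object UnionAndSum {
  // Assumed schema, derived from the casts in the question.
  val schema: StructType = StructType(Seq(
    StructField("pos_id", LongType),
    StructField("article_id", LongType),
    StructField("date", DateType),
    StructField("qte", DoubleType),
    StructField("ca", DoubleType)))

  // Hypothetical helper: stacks old and new rows, then sums per key.
  def accumulate(hist: DataFrame, hist2: DataFrame): DataFrame = {
    // union matches columns by position, so select them in a fixed order first.
    val cols = schema.fieldNames.map(col)
    hist.select(cols: _*).union(hist2.select(cols: _*))
      .groupBy("pos_id", "article_id")
      .agg(max("date").as("date"), sum("qte").as("qte"), sum("ca").as("ca"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("UnionAndSum").getOrCreate()
    import spark.implicits._

    // An empty dataframe that still has the expected columns and types.
    val emptyHist = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    val hist2 = Seq((4L, 4L, "2000-01-08", 3.5, 1.2), (5L, 5L, "2000-01-08", 14.5, 1.2))
      .toDF(schema.fieldNames: _*).withColumn("date", col("date").cast(DateType))

    accumulate(emptyHist, hist2).orderBy("pos_id").show(false)
    spark.stop()
  }
}
```

If the error comes from reading a CSV file that has no header line at all, the dataframe would have no columns for the casts to work on. Passing the schema to the reader with `.schema(schema)` instead of relying on the header might help there, though that is untested against the original files.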