1

I 2 Datenrahmen habenSpaltenwerte in Funken Einstürzen Datenrahmen

case class UserTransactions(id: Long, transactionDate: java.sql.Date, currencyUsed: String, value: Long) 

ID, TransactionDate, CurrencyUsed, value 
1, 2016-01-05, USD, 100 
1, 2016-01-09, GBP, 150 
1, 2016-02-01, USD, 50 
1, 2016-02-10, JPN, 10 
2, 2016-01-10, EURO, 50 
2, 2016-01-10, GBP, 100 

case class ReportingTime(userId: Long, reportDate: java.sql.Date) 

userId, reportDate 
1, 2016-01-05 
1, 2016-01-31 
1, 2016-02-15 
2, 2016-01-10 
2, 2016-02-01 

Jetzt möchte ich Zusammenfassung erhalten, indem alle bisher verwendeten Währungen von userId, reportDate und sum kombiniert. Die Ergebnisse sollten wie folgt aussehen:

userId, reportDate, trasactionSummary 
1, 2016-01-05, None 
1, 2016-01-31, (USD -> 100)(GBP-> 150) // combined above 2 transactions less than 2016-01-31 
1, 2016-02-15, (USD -> 150)(GBP-> 150)(JPN->10) // combined transactions less than 2016-02-15 
2, 2016-01-10, None 
2, 2016-02-01, (EURO-> 50) (GBP-> 100) 

Was ist der beste Weg, dies zu tun? Wir haben über 300 Millionen Transaktionen, bei denen jeder Benutzer bis zu 10.000 Transaktionen haben kann.

+0

In Ihrem Beispiel Ausgabe, warum zeigen Sie 'None' für' reportDate's, die die entsprechen 1. Transaktion im 'UserTransactions' Dataframe? Möchten Sie die erste Transaktion immer überspringen? – Metropolis

+0

Wie in der ersten Transaktion Benutzer hat keine Geschichte so in der Zusammenfassung zeigt es keine und aus der zweiten Transaktion wird es eine Zusammenfassung haben wie (USD -> 100) (GBP-> 150) – Rahul

Antwort

2

Das folgende Snippet würde Ihre Anforderung erfüllen. Die erste Verbindung und Aggregation erfolgt über die Dataframe-API von pyspark. Dann wird die Gruppierung der Daten (unter Verwendung von reduceByKey) und die abschließende Erstellung des Datensatzes über RDD api durchgeführt, da dies für solche Operationen geeigneter ist.

from datetime import datetime 
from pyspark.sql.functions import udf 
from pyspark.sql.types import DateType 
from pyspark.sql import functions as F 

df1 = spark.createDataFrame([(1,'2016-01-05','USD',100), 
(1,'2016-01-09','GBP',150), 
(1,'2016-02-01','USD',50), 
(1,'2016-02-10','JPN',10), 
(2,'2016-01-10','EURO',50), 
(2,'2016-01-10','GBP',100)],['id', 'tdate', 'currency', 'value']) 

df2 = spark.createDataFrame([(1,'2016-01-05'), 
(1,'2016-01-31'), 
(1,'2016-02-15'), 
(2,'2016-01-10'), 
(2,'2016-02-01')],['user_id', 'report_date']) 


func = udf (lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType()) ### function to convert string data type to date data type 

df2 = df2.withColumn('tdate', func(df2.report_date)) 
df1 = df1.withColumn('tdate', func(df1.tdate)) 
result = df2.join(df1, (df1.id == df2.user_id) & (df1.tdate < df2.report_date), 'left_outer').select('user_id', 'report_date', 'currency', 'value').groupBy('user_id', 'report_date', 'currency').agg(F.sum('value').alias('value')) 

data = result.rdd.map(lambda x: (x.user_id,x.report_date,x.currency,x.value)).keyBy(lambda x: (x[0],x[1])).mapValues(lambda x: filter(lambda x: bool(x),[(x[2],x[3]) if x[2] else None])).reduceByKey(lambda x,y: x + y).map(lambda x: (x[0][0],x[0][1], x[1])) 

Das Endergebnis wird wie folgt angezeigt.

>>> spark.createDataFrame([ (x[0],x[1],str(x[2])) for x in data.collect()], ['id', 'date', 'values']).orderBy('id', 'date').show(20, False) 
+---+----------+--------------------------------------------+ 
|id |date  |values          | 
+---+----------+--------------------------------------------+ 
|1 |2016-01-05|[]           | 
|1 |2016-01-31|[(u'USD', 100), (u'GBP', 150)]    | 
|1 |2016-02-15|[(u'USD', 150), (u'GBP', 150), (u'JPN', 10)]| 
|2 |2016-01-10|[]           | 
|2 |2016-02-01|[(u'EURO', 50), (u'GBP', 100)]    | 
+---+----------+--------------------------------------------+ 
+0

Großartig es funktionierte | 1 | 2016-01- 05 | Karte() | | 1 | 2016-01-31 | Karte (USD -> 100, GBP -> 150) | | 1 | 2016-02-15 | Karte (USD -> 150, GBP -> 150, JPN -> 10) | | 2 | 2016-01-10 | Karte() | | 2 | 2016-02-01 | Karte (EURO -> 50, GBP -> 100) | + --- + ---------- + --------------------------------- ----- + – Rahul

1

Falls jemand in Scala braucht

case class Transaction(id: String, date: java.sql.Date, currency:Option[String], value: Option[Long]) 
case class Report(id:String, date:java.sql.Date) 

def toDate(date: String): java.sql.Date = { 
    val sf = new SimpleDateFormat("yyyy-MM-dd") 
    new java.sql.Date(sf.parse(date).getTime) 
} 

val allTransactions = Seq(
    Transaction("1", toDate("2016-01-05"),Some("USD"),Some(100L)), 
    Transaction("1", toDate("2016-01-09"),Some("GBP"),Some(150L)), 
    Transaction("1",toDate("2016-02-01"),Some("USD"),Some(50L)), 
    Transaction("1",toDate("2016-02-10"),Some("JPN"),Some(10L)), 
    Transaction("2",toDate("2016-01-10"),Some("EURO"),Some(50L)), 
    Transaction("2",toDate("2016-01-10"),Some("GBP"),Some(100L)) 
) 
val allReports = Seq(
    Report("1",toDate("2016-01-05")), 
    Report("1",toDate("2016-01-31")), 
    Report("1",toDate("2016-02-15")), 
    Report("2",toDate("2016-01-10")), 
    Report("2",toDate("2016-02-01")) 
) 

val transections:Dataset[Transaction] = spark.createDataFrame(allTransactions).as[Transaction] 
val reports: Dataset[Report] = spark.createDataFrame(allReports).as[Report] 

val result = reports.alias("rp").join(transections.alias("tx"), (col("tx.id") === col("rp.id")) && (col("tx.date") < col("rp.date")), "left_outer") 
    .select("rp.id", "rp.date", "currency", "value") 
    .groupBy("rp.id", "rp.date", "currency").agg(sum("value")) 
    .toDF("id", "date", "currency", "value") 
    .as[Transaction] 

val data = result.rdd.keyBy(x => (x.id , x.date)) 
    .mapValues(x => if (x.currency.isDefined) collection.Map[String, Long](x.currency.get -> x.value.get) else collection.Map[String, Long]()) 
    .reduceByKey((x,y) => x ++ y).map(x => (x._1._1, x._1._2, x._2)) 
    .toDF("id", "date", "map") 
    .orderBy("id", "date") 

Konsolenausgabe

+---+----------+--------------------------------------+ 
|id |date  |map         | 
+---+----------+--------------------------------------+ 
|1 |2016-01-05|Map()         | 
|1 |2016-01-31|Map(GBP -> 150, USD -> 100)   | 
|1 |2016-02-15|Map(USD -> 150, GBP -> 150, JPN -> 10)| 
|2 |2016-01-10|Map()         | 
|2 |2016-02-01|Map(GBP -> 100, EURO -> 50)   | 
+---+----------+--------------------------------------+ 
Verwandte Themen