2017-02-18 7 views
0

Ich habe zwei .txt Datendateien. Der erste enthält zwei Spalten (Film, Kino) und der zweite enthält zwei Spalten (Film, Betrachter), wie im folgenden Beispiel gezeigt. Was ich tun möchte, ist finden Sie den Film, der in cinema_1 mit der maximalen Anzahl der Zuschauer gezeigt wurde.Verbinden Sie zwei Datenrahmen, summieren Sie die Werte und erhalten Sie das Maximum

+----------+---------+ 
| movie | cinema | 
+----------+---------+ 
| movie_1 | cinema_2 | 
| movie_2 | cinema_3 | 
| movie_4 | cinema_1 | 
| movie_3 | cinema_1 | 
+------+-------------+ 

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_1 | 10 | 
| movie_2 | 98 | 
| movie_4 | 100 | 
| movie_3 | 19 | 
| movie_1 | 340 | 
| movie_3 | 31 | 
+------+-------------+ 

das heißt in dem obigen Beispiel sind die beiden Kandidaten movie_3 und movie_4 (in cinema_1 gezeigt) und die richtige Antwort ist movie_4 mit 100 Ansichten (movie_3 während 50 (19 + 31) Ansichten).

Was ich bisher getan habe:

Schritt 1: Holen Sie sich die Daten

val moviesCinemas = sparkSession.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("mode", "DROPMALFORMED") 
     .load("moviesCinemas.txt"); 

    val moviesViewers = sparkSession.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("mode", "DROPMALFORMED") 
     .load("moviesViewers.txt"); 

Schritt 2: Lassen Sie sich die Filme in cinema_1 gezeigt

val cinema1Movies = moviesCinemas.filter(col("cinema").like("cinema_1")) 

führende zu:

+----------+---------+ 
| movie | cinema | 
+----------+---------+ 
| movie_4 | cinema_1 | 
| movie_3 | cinema_1 | 
+------+-------------+ 

Schritt 3: Jetzt für diese zwei Filme muss ich ihre Zuschauer zusammenfassen (vom Datenrahmen moviesViewers) und den mit der maximalen Zahl melden. Hier stecke ich eigentlich fest.

Ich habe versucht, die cinema1Movies und moviesViewers Datenrahmen

val joinMoviesViewers = moviesViewers.join(cinema1Movies, Seq("movie")) 

, die folgendes Ergebnis gibt zu verbinden: Ich bin nicht ganz sicher,

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_4 | 100 | 
| movie_3 | 19 | 
| movie_3 | 31 | 
+------+-------------+ 

Nun, wie die viewers für jede movie zusammenzufassen um so etwas zu bekommen (und endlich den Film mit den Max-Viewern zu bekommen):

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_4 | 100 | 
| movie_3 | 50 | 
+------+-------------+ 

Antwort

1

Start aus dem Datenrahmen verbunden:

val aggJoin = joinMoviesViewers.groupBy("movie").agg(sum($"viewers").as("viewers")) 
// aggJoin: org.apache.spark.sql.DataFrame = [movie: string, viewers: bigint] 

val maxViewers = aggJoin.agg(max($"viewers")).first().getLong(0) 
// maxViewers: Long = 100 

// depending on what data type you have for viewers, you might use getDouble here 
// val maxViewers = aggJoin.agg(max($"viewers")).first().getDouble(0) 

aggJoin.filter($"viewers" === maxViewers).show 
+-------+-------+ 
| movie|viewers| 
+-------+-------+ 
|movie_4| 100| 
+-------+-------+ 
+0

Es funktioniert, aber anstelle von '.getLong (0)' add '.getDouble (0)'. Andernfalls erhalten Sie eine Ausnahme. Bitte bearbeiten Sie Ihre Antwort, um sie zu akzeptieren. Danke vielmals. –

1

Unten ist der API-Ansatz, um das Ergebnis abzuleiten.

import org.apache.spark.sql.functions._ 

val result = moviesCinemas 
    .filter($"cinema" === "cinema_1") 
    .join(moviesViewers, "movie") 
    .select(moviesCinemas("movie"),moviesViewers("viewers")) 
    .groupBy($"movie") 
    .agg(sum($"viewers").as("sum_cnt")) 
    .orderBy($"sum_cnt".desc) 

    result.first 
    res34: org.apache.spark.sql.Row = [movie_4,100] 

Das folgende verwendet spark sql, um das gleiche Ergebnis zu erhalten.

moviesCinemas.registerTempTable("movies_cinemas") 
moviesViewers.registerTempTable("movies_viewers") 

val spark = SparkSession.builder. 
    master("local") // set your master here 
    .appName("spark session example") 
    .getOrCreate() 

val result = spark.sql( 
""" 
SELECT 
t0.movie, 
sum(viewers) as total_viewers 
FROM 
movies_cinemas t0 JOIN movies_viewers t1 
on t0.movie = t1.movie 
WHERE t0.cinema = "cinema_1" 
GROUP BY t0.movie 
ORDER BY total_viewers desc 
""" 
) 

result.first 

res6: org.apache.spark.sql.Row = [movie_4,100] 
+0

Dies funktioniert auch! Danke vielmals. –

Verwandte Themen