2017-06-15 3 views
0

Ich habe eine Dataset-Struktur in Spark mit zwei Spalten, eine mit dem Namen user die andere category genannt. So dass die Tabelle etwas wie folgt aussieht:Versuch, eindeutige Benutzer zwischen zwei Kategorien in Spark zu zählen

+---------------+---------------+ 
|   user|  category| 
+---------------+---------------+ 
|  garrett|  syncopy| 
|  garrison| musictheory| 
|   marta|  sheetmusic| 
|  garrett| orchestration| 
|   harold|   chopin| 
|   marta| russianmusic| 
|   niko|   piano| 
|   james|  sheetmusic| 
|   manny|   violin| 
|  charles|  gershwin| 
|   dawson|   cello| 
|   bob|   cello| 
|   george|   cello| 
|   george| americanmusic| 
|   bob| personalcompos| 
|   george|  sheetmusic| 
|   fred|  sheetmusic| 
|   bob|  sheetmusic| 
|  garrison|  sheetmusic| 
|   george| musictheory| 
+---------------+---------------+ 
only showing top 20 rows 

Jede Zeile in der Tabelle ist einzigartig, aber ein Benutzer und eine Kategorie können mehrmals vorkommen. Ziel ist es, die Anzahl der Benutzer zu zählen, die zwei Kategorien teilen. Zum Beispiel cello und americanmusic teilen einen Benutzer mit dem Namen george und musictheory und sheetmusic teilen Benutzer george und garrison. Das Ziel besteht darin, die Anzahl der verschiedenen Benutzer zwischen n Kategorien zu ermitteln, was bedeutet, dass es höchstens n Quadratkanten zwischen den Kategorien gibt. Ich verstehe teilweise, wie man diese Operation durchführt, aber ich habe ein wenig Mühe, meine Gedanken zu Spark Java zu konvertieren.

Mein Denken ist, dass ich brauche eine Selbstverknüpfung auf user zu tun, um einen Tisch zu bekommen, die wie folgt strukturiert sein würde:

+---------------+---------------+---------------+ 
|   user|  category|  category| 
+---------------+---------------+---------------+ 
|  garrison| musictheory|  sheetmusic| 
|   george| musictheory|  sheetmusic| 
|  garrison| musictheory| musictheory| 
|   george| musictheory| musicthoery| 
|  garrison|  sheetmusic| musictheory| 
|   george|  sheetmusic| musictheory| 
+---------------+---------------+---------------+ 

Die Selbst Betrieb in Funken (Java-Code) beizutreten, ist nicht schwer:

Dataset<Row> newDataset = allUsersToCategories.join(allUsersToCategories, "users"); 

Dies wird immer irgendwo, aber ich Mappings auf die gleiche Kategorie wie in den Zeilen 3 und 4 in dem obigen Beispiel erhalten und ich rückwärts Mappings, wo die Kategorien so umgekehrt werden, dass im wesentlichen das doppelte jeder Interaktion mit dem Benutzer wie Zählen in den Reihen 5 und 6 von th Das obige Beispiel.

Was ich glaube, dass ich tun muss, ist eine Art von bedingten in meiner Verbindung, die etwas in Richtung X < Y sagt, so dass gleiche Kategorien und Duplikate weggeworfen werden. Schließlich muss ich dann die Anzahl der einzelnen Zeilen für n-Quadratt-Kombinationen zählen, wobei n die Anzahl der Kategorien ist.

Könnte jemand bitte erklären, wie man das in Spark und speziell in Spark Java macht, da ich mit der Scala-Syntax ein wenig nicht vertraut bin?

Danke für die Hilfe.

+0

Schreiben Sie die SQL-Abfrage und führen Sie sie aus. Und jeder Filter kann verwendet werden – AKSW

+0

was wäre die SQL-Abfrage explizit sein –

Antwort

0

fand ich die Antwort ein paar Stunden spark sql mit:

Dataset<Row> connection per shared user = spark.sql("SELECT a.user as user, " 
                  + "a.category as categoryOne, " 
                  + "b.category as categoryTwo " 
                  + "FROM allTable as a INNER JOIN allTable as b " 
                  + "ON a.user = b.user AND a.user < b.user"); 

Dies wird dann ein Datensatz mit drei Spalten erstellen user, categoryOne und categoryTwo. Jede Zeile ist eindeutig und zeigt an, wenn der Benutzer in beiden Kategorien existiert.

0

Ich bin nicht sicher, ob ich Ihre Anforderungen richtig verstehe, aber ich werde versuchen zu helfen.

Nach meinem Verständnis sollte das erwartete Ergebnis für obige Daten wie folgt aussehen. Wenn es nicht wahr ist, lassen Sie es mich wissen, ich werde versuchen, erforderliche Änderungen vorzunehmen.

+--------------+--------------+-+ 
|_1   |_2   | 
+--------------+--------------+-+ 
|personalcompos|sheetmusic |1| 
|cello   |musictheory |1| 
|americanmusic |cello   |1| 
|cello   |sheetmusic |2| 
|cello   |personalcompos|1| 
|russianmusic |sheetmusic |1| 
|americanmusic |sheetmusic |1| 
|americanmusic |musictheory |1| 
|musictheory |sheetmusic |2| 
|orchestration |syncopy  |1| 
+--------------+--------------+-+ 

In diesem Fall können Sie Ihr Problem mit unten Scala Code lösen:

allUsersToCategories 
    .groupByKey(_.user) 
    .flatMapGroups{case (user, userCategories) => 
     val categories = userCategories.map(uc => uc.category).toSeq 
     for { 
     c1 <- categories 
     c2 <- categories 
     if c1 < c2 
     } yield (c1, c2) 
    } 
    .groupByKey(x => x) 
    .count() 
    .show() 

Wenn Sie symetric Ergebnis benötigen, können Sie nur, wenn Anweisung if c1 != c2 in flatMapGroups Transformation ändern.

Bitte beachten Sie, dass in obigem Beispiel I Dataset API verwendet, die für Testzwecke mit folgendem Code erstellt wurden:

case class UserCategory(user: String, category: String) 

val allUsersToCategories = session.createDataset(Seq(
    UserCategory("garrett", "syncopy"), 
    UserCategory("garrison", "musictheory"), 
    UserCategory("marta", "sheetmusic"), 
    UserCategory("garrett", "orchestration"), 
    UserCategory("harold", "chopin"), 
    UserCategory("marta", "russianmusic"), 
    UserCategory("niko", "piano"), 
    UserCategory("james", "sheetmusic"), 
    UserCategory("manny", "violin"), 
    UserCategory("charles", "gershwin"), 
    UserCategory("dawson", "cello"), 
    UserCategory("bob", "cello"), 
    UserCategory("george", "cello"), 
    UserCategory("george", "americanmusic"), 
    UserCategory("bob", "personalcompos"), 
    UserCategory("george", "sheetmusic"), 
    UserCategory("fred", "sheetmusic"), 
    UserCategory("bob", "sheetmusic"), 
    UserCategory("garrison", "sheetmusic"), 
    UserCategory("george", "musictheory") 
)) 

Ich habe versucht Beispiel in Java zur Verfügung zu stellen, aber ich habe keine Erfahrung mit Java + Spark und es ist zu zeitaufwändig für mich, über Beispiel von Scala nach Java zu migrieren ...

Verwandte Themen