2016-04-06 6 views
0

Ich habe eine Anforderung, ein Feld in meiner RDD zu einem anderen Feld von einer anderen Karte zuordnen UserDAO.users Ich habe versucht, die Zuordnung hier herauszufinden, kann aber nicht die username zurückgeben noch. Ich bin dies in der aktualisierten Karte bekommen, wenn ich ein foreach drucken tun ist [email protected]Scala - map einen Wert von einer Karte zu einer anderen Karte

Hier mein Code-Schnipsel:

rdd.map { l => { 
     l.map { case (k, v) => { 
     k match { 
      case "a_userid" => { 
      l.updated("a_username", userDAO.users.map(c => c.filter(f => f.userid == v.toInt)).map(y => y.map(e => e.username))) 
      } 
      case _ => 
      } 
      } 
     } 
     } 
    } 

Also im Grunde

rdd - RDD[Map[String, String]]

UserDAO.users - Future[Seq[User]] - wo Benutzer ist eine Fallklasse

und die aktualisierte rdd-RDD[Map[String, String]]

-

Jede Idee, wie dieses Problem zu lösen?

Dank

+0

Können Sie bitte die Typen Ihrer Variablen angeben? Insbesondere "rdd" und "userDAO.users". Bitte geben Sie auch die Art des erwarteten Ergebnisses an. – Aivean

+0

@Aivean aktualisiert meine Frage, danke –

+0

ok, nächste Frage, wie groß ist die Reihenfolge der Benutzer, zurückgegeben von 'userDAO.users'? Gibt es einen Grund, es auf Arbeiterknoten (über 'rdd.map') aufzurufen, anstatt es vorher zu materialisieren? – Aivean

Antwort

1

Ich habe Ihren Code neu geschrieben es funktioniert. Bitte beachten Sie, dass es sich um eine Blockierung handelt. Sonst gibt es keinen anderen Weg RDD[Map[String, String]] zu erreichen.

Ich unterteile rdd.map Abschnitt für Klarheit.

Erste Variante. Ich habe Ihren Ansatz verwendet, Benutzer innerhalb von map zu lesen. Bitte beachten Sie, dass dies sehr ineffizient ist, da alle Benutzer jedes Mal pro Iteration gelesen werden, das heißt 11 Millionen mal:

// rdd.map ommitted 
l.get("a_userid").flatMap { 
    userId:String => 
    val newUserName:Option[String] = 
     Await.result(userDAO.users 
     .map(c => c.find(f => f.userid == userId.toInt)) 
     .map(y => y.map(e => e.username)), 
     30 seconds 
    ) 
    newUserName.map(l.updated("a_username", _)) 
}.getOrElse(l) 

Alternative Ansatz beinhaltet vorher Benutzer auf die Karte zu lesen. Diese Karte wird dann an alle Funkenarbeiter gesendet. Da deine Karte nicht so groß ist, ist es in Ordnung. Dieser Ansatz ist effizienter, da Sie nur eine einzige Map-Suche pro Iteration über durchführen, was sehr schnell ist.

val users:Map[Int, String] = Await.result(userDAO.users 
    .map(uss => uss.map(u => u.userid -> u.username).toMap), 
    30 seconds 
) 

// rdd.map ommitted 
l.get("a_userid").flatMap { 
    userId:String => 
    users.get(userId.toInt).map(l.updated("a_username", _)) 
}.getOrElse(l) 

UPD: Nur aus Gründen der Vollständigkeit ist hier eine andere asynchrone Variante:

userDAO.users 
    .map(uss => uss.map(u => u.userid -> u.username).toMap) 
    .map { users:Map[Int, String] => 
     rdd.map { l:Map[String, String] => 
     l.get("a_userid").flatMap { 
      userId:String => 
      users.get(userId.toInt).map(l.updated("a_username", _)) 
     }.getOrElse(l) 
     } 
    } 

Es folgt dem gleichen Ansatz wie Variante 2, gibt aber Future[RDD[Map[String, String]]] statt konkretes Ergebnis.

+0

Ich dachte so viel, aber froh, Ihre Meinung zu haben, danke. –

Verwandte Themen