2013-03-21 6 views
10

Gibt es eine Möglichkeit, einen Seq [Future [X]] in einen Enumerator[X] zu verwandeln? Der Anwendungsfall ist, dass ich Ressourcen durch Crawlen des Webs erhalten möchte. Dies wird eine Sequenz von Futures zurückgeben, und ich möchte einen Enumerator zurückgeben, der die Futures in der Reihenfolge, in der sie zuerst fertig sind, an den Iteratee weiterleitet.Umwandlung eines Seq [Future [X]] in einen Enumerator [X]

Es sieht aus wie Victor Klang Future select gist könnte dazu verwendet werden - obwohl es ziemlich ineffizient aussieht.

Hinweis: Die Iteratees und Enumerator die in Frage sind solche gegeben durch das Spiel Framework Version 2.x, das heißt mit den folgenden Importe: import play.api.libs.iteratee._

Antwort

2

Eine bessere, kürzere und ich denke, effizientere Antwort lautet:

 
    def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
     def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = { 
     Future.sequence(seqFutureX).flatMap { seqX: Seq[X] => 
      seqX.foldLeft(Future.successful(i)) { 
       case (i, x) => i.flatMap(_.feed(Input.El(x))) 
      } 
     } 
     } 
    } 

+0

Ich bearbeitet, um foldLeft zu verwenden, das schneller ist als foldRight. –

+2

Vorsicht, 'Future.sequence' gibt eine fehlgeschlagene Zukunft zurück, wenn eine der' seqFutureX'-Futures fehlschlägt. –

+0

und diese Lösungen erwartet alle Futures vor der Fütterung Iteratee, während eine andere von @bblfish es so früh wie möglich (ohne konservierte Reihenfolge!). – viktortnk

3

Victor Klang's select method Verwendung:

 
    /** 
    * "Select" off the first future to be satisfied. Return this as a 
    * result, with the remainder of the Futures as a sequence. 
    * 
    * @param fs a scala.collection.Seq 
    */ 
    def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): 
     Future[(Try[A], Seq[Future[A]])] = { 
    @scala.annotation.tailrec 
    def stripe(p: Promise[(Try[A], Seq[Future[A]])], 
       heads: Seq[Future[A]], 
       elem: Future[A], 
       tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = { 
     elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) } 
     if (tail.isEmpty) p.future 
     else stripe(p, heads :+ elem, tail.head, tail.tail) 
    } 
    if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!")) 
    else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail) 
    } 
} 

dann kann ich bekommen, was ich benötigt mit

 
    Enumerator.unfoldM(initialSeqOfFutureAs){ seqOfFutureAs => 
     if (seqOfFutureAs.isEmpty) { 
      Future(None) 
     } else { 
      FutureUtil.select(seqOfFutureAs).map { 
      case (t, seqFuture) => t.toOption.map { 
       a => (seqFuture, a) 
      } 
      } 
     } 
    } 

+0

Ich bin ein bisschen besorgt dass die ausgewählte Implementierung von Victor Klang nicht effizient genug ist. In diesem Algorithmus müssen wir die gesamte Sequenz durchqueren, die erfordert, dass jede Zukunft bei jedem Durchlauf mit einem neuen Versprechen registriert wird. Es sollte möglich sein, einen Algorithmus zu erstellen, wo man das nur einmal tun muss ... Vielleicht ist es nur eine Frage des Unterklassen-Enumerators und des Registrierens jeder Zukunft in der Sequenz mit dem Enumerator. –

0

Sie könnten eine mit dem Java Executor Completion Service (JavaDoc) erstellen. Die Idee ist es, eine Sequenz von neuen Futures zu erstellen, die jeweils ExecutorCompletionService.take() verwenden, um auf das nächste Ergebnis zu warten. Jede Zukunft wird beginnen, wenn die vorherige Zukunft ihr Ergebnis hat.

Aber bitte beachten Sie, dass dies nicht so effizient ist, da viel Synchronisation hinter den Kulissen stattfindet. Es könnte effizienter sein, eine parallele Kartenreduzierung für die Berechnung zu verwenden (z. B. unter Verwendung von ParSeq von Scala) und den Enumerator auf das vollständige Ergebnis warten zu lassen.

+0

"Jede Zukunft beginnt, wenn die vorherige Zukunft ein Ergebnis hat": das scheint zu blockieren. In dem Code, der in meiner Antwort oben angegeben ist, werden alle Futures in der 'seqOfFuturesA' parallel ausgeführt. –

0

ACHTUNG: kompiliert nicht vor

Was ist so etwas wie diese zu beantworten:

def toEnumerator(seqFutureX: Seq[Future[X]]) = new Enumerator[X] { 
    def apply[A](i: Iteratee[X, A]): Future[Iteratee[X, A]] = 
    Future.fold(seqFutureX)(i){ case (i, x) => i.flatMap(_.feed(Input.El(x)))) } 
} 
+0

Die Signatur der Falte ist '' 'Def-Falte [T, R] (Futures: scala.TraversableOnce [Zukunft [T]]) (Null: R) (foldFun: (R, T) => R) (implizit Executor: ExecutionContext): Future [R] '' ' aber Ihr Code hat die Signatur ' 'falten [T, R] (Futures: Seq [Zukunft [T]]) (null: Iteratee [T, R ]) (foldFun: (R, T) => Future [R]) (impliziter Executor: ExecutionContext): Future [R] '' ' Es gibt ein Problem mit foldfun, weil' i.flatMap (_. feed (Input .El (x)) 'gibt ein' Future [R] 'nicht ein' R' –

+0

zurück, aber ich bin vom Typ "Iteratee [X, A]", und flatMap sollte ein "Iteratee [X, A]", nein, zurückgeben (vorausgesetzt, dass der Feed Iteratee [X, A] zurückgibt) –

+0

Play 'Iteratee [E, A] 'definiert' flatMap' als: 'def flatMap [B] (f: A => Iteratee [E, B]): Iteratee [E, B] 'damit dein Fall wirklich als 'case (i, x) => i.flatMap (a => ...)' geschrieben wird. Und dann ist "a" nicht länger ein "Iteratee", und daher hat es keine "feed" -Methode. Wenn man andererseits versucht, "case (i, x) => i feed (Input.El (x))" zu machen, dann endet man mit einem 'Future [Iteratee [...]]' was nicht ist welche Falte will. Das Tolle ist, dass ich ohne Scala's Typsystem nicht die Antwort gefunden hätte ... :-) –

0

Hier ist etwas, was ich gefunden handlich,

def unfold[A,B](xs:Seq[A])(proc:A => Future[B])(implicit errorHandler:Throwable => B):Enumerator[B] = { 
    Enumerator.unfoldM (xs) { xs => 
     if (xs.isEmpty) Future(None) 
     else proc(xs.head) map (b => Some(xs.tail,b)) recover { 
      case e => Some((xs.tail,errorHandler(e))) 
     } 
    } 
} 

def unfold[A,B](fxs:Future[Seq[A]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = { 

    (unfold(Seq(fxs))(fxs => fxs)(errorHandler1)).flatMap(unfold(_)(proc)(errorHandler)) 
} 

def unfoldFutures[A,B](xsfxs:Seq[Future[Seq[A]]])(proc:A => Future[B]) (implicit errorHandler1:Throwable => Seq[A], errorHandler:Throwable => B) :Enumerator[B] = { 

    xsfxs.map(unfold(_)(proc)).reduceLeft((a,b) => a.andThen(b)) 
} 
1

ich erkennen, dass die Frage schon ein bisschen alt, aber basierend auf Santhosh Antwort und die eingebauten in Enumterator.enumerate() Umsetzung kam ich mit der Follow-up:

def enumerateM[E](traversable: TraversableOnce[Future[E]])(implicit ec: ExecutionContext): Enumerator[E] = { 
    val it = traversable.toIterator 
    Enumerator.generateM { 
    if (it.hasNext) { 
     val next: Future[E] = it.next() 
     next map { 
     e => Some(e) 
     } 
    } else { 
     Future.successful[Option[E]] { 
     None 
     } 
    } 
    } 
} 

Beachten Sie, dass im Gegensatz zum ersten Viktor- select-based-solution Diese Option behält die Reihenfolge bei, aber Sie können alle Berechnungen immer noch asynchron starten. So zum Beispiel, können Sie wie folgt vorgehen:

// For lack of a better name 
def mapEachM[E, NE](eventuallyList: Future[List[E]])(f: E => Future[NE])(implicit ec: ExecutionContext): Enumerator[NE] = 
    Enumerator.flatten(
    eventuallyList map { list => 
     enumerateM(list map f) 
    } 
) 

Diese letztere Methode war in der Tat, was ich suchte, als ich auf diesen Thread gestolpert. Hoffe es hilft jemandem! :)

0

Ich möchte die Verwendung eines Broadcast-

def seqToEnumerator[A](futuresA: Seq[Future[A]])(defaultValue: A, errorHandler: Throwable => A): Enumerator[A] ={ 
    val (enumerator, channel) = Concurrent.broadcast[A] 
    futuresA.foreach(f => f.onComplete({ 
     case Success(Some(a: A)) => channel.push(a) 
     case Success(None) => channel.push(defaultValue) 
     case Failure(exception) => channel.push(errorHandler(exception)) 
    })) 
    enumerator 
    } 

I hinzugefügt und Errorhandling Defaultvalues ​​vorzuschlagen, aber Sie können diejenigen überspringen von onSuccess oder onFailure verwenden, statt onComplete

Verwandte Themen