2016-09-21 3 views
3

Ich bin ein Iterator zu einer HTTP-Ressource implementieren, die ich eine Liste von Elementen paged wiederherstellen kann, habe ich versucht, dies mit einem einfachen Iterator zu tun, aber es ist eine blockierende Implementierung, und da bin ich mit akka macht es meine Dispatcher ein bisschen verrückt.Akka Stream Wiederholung wiederholtes Ergebnis

Mein Wille ist es, den gleichen Iterator mit akka-stream zu implementieren. Das Problem ist, ich brauche etwas andere Wiederholungsstrategie.

Der Dienst gibt eine Liste von Elementen zurück, die durch eine id identifiziert werden, und manchmal, wenn ich nach der nächsten Seite abfrage, gibt der Dienst die gleichen Produkte auf der aktuellen Seite zurück.

Mein aktueller Algorithmus ist dies.

var seenIds = Set.empty 
var position = 0 

def isProblematicPage(elements: Seq[Element]) Boolean = { 
    val currentIds = elements.map(_.id) 
    val intersection = seenIds & currentIds 
    val hasOnlyNewIds = intersection.isEmpty 
    if (hasOnlyNewIds) { 
    seenIds = seenIds | currentIds 
    } 
    !hasOnlyNewIds 
} 

def incrementPage(): Unit = { 
    position += 10 
} 

def doBackOff(attempt: Int): Unit = { 
    // Backoff logic 
} 

@tailrec 
def fetchPage(attempt: Int = 0): Iterator[Element] = { 
    if (attempt > MaxRetries) { 
    incrementPage() 
    return Iterator.empty 
    } 

    val eventualPage = service.retrievePage(position, position + 10) 

    val page = Await.result(eventualPage, 5 minutes) 

    if (isProblematicPage(page)) { 
    doBackOff(attempt) 
    fetchPage(attempt + 1) 
    } else { 
    incrementPage() 
    page.iterator 
    } 
} 

Ich mache die Umsetzung mit akka-streams aber ich kann nicht herausfinden, wie die Seiten und Test für die Wiederholung akkumulieren die Ströme Struktur.

Irgendwelche Vorschläge?

+1

Sollte die 'return' Zeile in' 'sein fetchPage' Iterator.empty' zurückkehren? –

Antwort

1

Die Flow.scan Bühne war ein guter Rat, aber es fehlte das Feature mit Futures zu beschäftigen, so dass ich es implementiert asynchrone Version Flow.scanAsync es auf Akka 2.4.12 jetzt verfügbar ist.

Die aktuelle Implementierung ist:

val service: WebService 
val maxTries: Int 
val backOff: FiniteDuration 

def retry[T](zero: T, attempt: Int = 0)(f: => Future[T]): Future[T] = { 
    f.recoverWith { 
    case ex if attempt >= maxAttempts => 
     Future(zero) 
    case ex => 
     akka.pattern.after(backOff, system.scheduler)(retry(zero, attempt + 1)(f)) 
    } 
} 

def isProblematicPage(lastPage: Seq[Element], currPage: Seq[Element]): Boolean = { 
    val lastPageIds = lastPage.map(_.id).toSet 
    val currPageIds = currPage.map(_.id).toSet 
    val intersection = lastPageIds & currPageIds 
    intersection.nonEmpty 
} 

def retrievePage(lastPage: Seq[Element], startIndex: Int): Future[Seq[Element]] = { 
    retry(Seq.empty) { 
    service.fetchPage(startIndex).map { currPage: Seq[Element] => 
     if (isProblematicPage(lastPage, currPage)) throw new ProblematicPageException(startIndex) 
     else currPage 
    } 
    } 
} 


val pagesRange: Range = Range(0, maxItems, pageSize) 

val scanAsyncFlow = Flow[Int].via(ScanAsync(Seq.empty)(retrievePage)) 

Source(pagesRange) 
    .via(scanAsyncFlow) 
    .mapConcat(identity) 
    .runWith(Sink.seq) 

Dank Ramon für die Beratung :)

2

Die Methode Flow.scan ist in solchen Situationen nützlich.

würde ich Ihren Strom mit einer Quelle von Positionen beginnen:

type Position = Int 

//0,10,20,... 
def positionIterator() : Iterator[Position] = Iterator from (0,10) 

val positionSource : Source[Position,_] = Source fromIterator positionIterator 

Diese Position Quelle kann dann zu einem Flow.scan gerichtet werden, die eine ähnliche Funktion wie Ihre fetchPage (Randnotiz nutzt: Sie wartet so viel vermeiden sollten Wie es möglich ist, gibt es einen Weg, in Ihrem Code nicht zu warten, aber das ist außerhalb des Bereichs Ihrer ursprünglichen Frage). Die neue Funktion muss in den „Zustand“ bereits gesehen Elements nehmen:

def fetchPageWithState(service : Service) 
         (seenEls : Set[Element], position : Position) : Set[Elements] = { 

    val maxRetries = 10 

    val seenIds = seenEls map (_.id) 

    @tailrec 
    def readPosition(attempt : Int) : Seq[Elements] = { 
    if(attempt > maxRetries) 
     Iterator.empty 
    else { 
     val eventualPage : Seq[Element] = 
     Await.result(service.retrievePage(position, position + 10), 5 minutes) 

     if(eventualPage.map(_.id).exists(seenIds.contains)) { 
     doBackOff(attempt) 
     readPosition(attempt + 1) 
     } 
     else 
     eventualPage    
    } 
    }//end def readPosition 

    seenEls ++ readPosition(0).toSet 
}//end def fetchPageWithState 

Diese jetzt in einem Flow verwendet werden können:

def fetchFlow(service : Service) : Flow[Position, Set[Element],_] = 
    Flow[Position].scan(Set.empty[Element])(fetchPageWithState(service)) 

Der neue Flow kann leicht an Ihre Position Quelle angeschlossen werden

def elementsSource(service : Service) : Source[Set[Element], _] = 
    positionSource via fetchFlow(service) 

Jeder neuer Wert von elementsSource wird eine stetig wachsende Reihe von einzigartigen Elementen von abgerufenen Seiten sein: eine Quelle von Set[Element] zu erstellen s.

+0

Danke für die Antwort, es war sehr nützlich! Ich versuche, das 'Await' aus dem Code zu entfernen, und benutze deine Implementierung und es wird nicht unterstützt, da' seenElements' ein 'Future [Set [Element]]' sein muss, damit die Scans stattdessen verkettet werden der Auflösung. Ich werde 'foldAsync' versuchen, um zu sehen, ob es hilft :) –

+0

@mateusduboli Gern geschehen, Happy Hacking. –

Verwandte Themen