Ich bin ein Iterator zu einer HTTP-Ressource implementieren, die ich eine Liste von Elementen paged wiederherstellen kann, habe ich versucht, dies mit einem einfachen Iterator
zu tun, aber es ist eine blockierende Implementierung, und da bin ich mit akka
macht es meine Dispatcher ein bisschen verrückt.Akka Stream Wiederholung wiederholtes Ergebnis
Mein Wille ist es, den gleichen Iterator mit akka-stream
zu implementieren. Das Problem ist, ich brauche etwas andere Wiederholungsstrategie.
Der Dienst gibt eine Liste von Elementen zurück, die durch eine id
identifiziert werden, und manchmal, wenn ich nach der nächsten Seite abfrage, gibt der Dienst die gleichen Produkte auf der aktuellen Seite zurück.
Mein aktueller Algorithmus ist dies.
var seenIds = Set.empty
var position = 0
def isProblematicPage(elements: Seq[Element]) Boolean = {
val currentIds = elements.map(_.id)
val intersection = seenIds & currentIds
val hasOnlyNewIds = intersection.isEmpty
if (hasOnlyNewIds) {
seenIds = seenIds | currentIds
}
!hasOnlyNewIds
}
def incrementPage(): Unit = {
position += 10
}
def doBackOff(attempt: Int): Unit = {
// Backoff logic
}
@tailrec
def fetchPage(attempt: Int = 0): Iterator[Element] = {
if (attempt > MaxRetries) {
incrementPage()
return Iterator.empty
}
val eventualPage = service.retrievePage(position, position + 10)
val page = Await.result(eventualPage, 5 minutes)
if (isProblematicPage(page)) {
doBackOff(attempt)
fetchPage(attempt + 1)
} else {
incrementPage()
page.iterator
}
}
Ich mache die Umsetzung mit akka-streams
aber ich kann nicht herausfinden, wie die Seiten und Test für die Wiederholung akkumulieren die Ströme Struktur.
Irgendwelche Vorschläge?
Sollte die 'return' Zeile in' 'sein fetchPage' Iterator.empty' zurückkehren? –