2016-09-20 4 views
2

Ich verwende RX Java für den Großteil der Verarbeitung in meinem Tool. Grundsätzlich ist die Idee des Werkzeugs ich arbeite ist:Wie gruppieren oder cachen Sie Emissionen im komplexen Prozessablauf?

  • die Payload erhalten (zum Beispiel HTML-Datei) von einer Quelle
  • es in Fragmente geteilt einige spezifische Regel.
  • Jedes Fragment kann zusätzliche Daten erfordert über asynchronen http Anruf geholt werden (kann solche Anrufe ein oder mehr gemacht werden), um die Daten
  • jedes Fragment verwenden könnte genau gleiche HTTP-Aufrufe (Endpunkte)
  • dann verwenden, von dem http-Endpunkt zurück, um etwas mit dem Fragment zu tun, dann kombinieren Sie alle verarbeiteten Fragmente zurück auf die Nutzlast - nennen wir es "verarbeitete Payload" Grundsätzlich ist der Fluss, den ich beschrieben, bereits implementiert und funktioniert gut, einfache Marmor-Diagramm zeigt grundlegende Idee.

Processing flow

Nicht sicher, ob Diagramm lesbar sein wird, so dass der Pseudocode der Verarbeitung wie folgt aussieht:

Observable.just("SOME_PAYLOAD_AS_STRING") 
     .flatMap(payload -> splitToFragmentObservables(payload)) //Getting observables of fragments 
     .concatMapEager(//concat all processed fragments 
       fragment -> getServiceCallsObservable(fragment) //get service calls for all fragments 
          .flatMap(this::doServiceCall) // do service call 
          .reduce(new HashMap<>(), (all, result) -> { //reduce results into map 
           all.addAll(result); 
           return all; 
          }) 
         .map(all -> newFragmentWithData(fragment, all)) //apply somehow the all service results to my fragment 
     ) 
     .reduce(new StringBuilder(), StringBuilder::append) //reduce all fragments back to string 
     .map(StringBuilder::toString); 

Nun, ich denke, wie einige der zur Verbesserung der Verarbeitung, insbesondere HTTP-Aufrufe. Wie Sie im Diagramm sehen, kann jedes Fragment genau die gleichen Aufrufe (A, B oder C) ausführen, was unnötiger Overhead ist.

Ich bin auf der Suche nach einem Weg, wie Sie vermeiden, die gleichen Service-Anrufe in jedem Fragment zu tun. Was ist der beste Weg, es zu verbessern? Gruppiere dieselben Anrufe, führe den Anruf aus und benutze ihn dann irgendwie, wenn du das Endergebnis machst? Oder stattdessen etwas Caching?

Antwort

1

Es gibt eine Möglichkeit, die von Observable produzierten Ergebnisse zwischenzuspeichern und allen Teilnehmern das gleiche Ergebnis zu geben.

Sie haben zwei Möglichkeiten:

  1. Verwenden cache Operator. Aber dann müssten Sie die Cache-Invalidierung implementieren.
  2. Verwenden Sie komplexere Lösungen mit den Operatoren replay und publish. Aber @JakeWharton ist bereits mit einem solchen Problem fertig geworden, das für Sie gibt. Here ist die Erklärung dieser einfachen Bibliothek.

Extra: Here ist eine andere Lösung für das Problem von den gleichen Wert für den beobachtbaren emittiert.

Hoffe, ich verstand das Problem und gab eine richtige Antwort.

0

Verwenden Sie Charles Proxy (oder ähnlich), um zu sehen, welche Anrufe tatsächlich getätigt werden.

Wenn Sie eine gute Http-Bibliothek mit Caching verwenden, dann ist Ihr Code möglicherweise bereits perfekt. Nicht vorzeitig optimieren!

Abhängig von den Caching-Regeln, die Ihre Endpunkte haben (und ob Sie sie ändern können), können Sie möglicherweise zwischengespeicherte Ergebnisse für aufeinander folgende Aufrufe an denselben Endpunkt verwenden.Wenn Sie die Kontrolle über diese Endpunkte haben, legen Sie nachgiebige Cache-Header fest.

Das Hinzufügen von Caching zu getServiceCallsObservable wäre wahrscheinlich eine viel unordentlichere Lösung. Es müsste auch threadsicher sein, da getServiceCallsObservable von verschiedenen Worker-Threads aufgerufen wird. (Nun, es sollte zumindest, müssen Sie möglicherweise einige zusätzliche Planung abhängig von der Implementierung und Planung Details von einigen Ihrer Methoden hinzufügen)

0

Vielen Dank für die Tipps. Während ich nach Guava Cache suchte, fand ich eine elegante Lösung, die auch cache Operator verwendet.

einfach flatMap Operator, der Mapping beobachtbar Service Ergebnis ist

.flatMap(this::doServiceCall) 

Ich änderte Guave Cache get-Methode mit Callback zu verwenden, der ausgeführt wird, wenn kein Wert für den angegebenen Schlüssel

.flatMap(service -> cache.get(service.getUrl(),() -> doServiceCall(service.getUrl()).cache())) 

I So bin einfach zwischengespeichert beobachtbar aus dem Service-Aufruf. Der Cache wird bei jeder neuen Nutzlast, die gerade verarbeitet wird, neu initialisiert - Art von Cache, der nur für die Anforderung lebt.

Verwandte Themen