2017-07-22 10 views
0

Ich habe immer noch mit einigen der Grundlagen von RxJava zu kämpfen und würde einige Hilfe sehr zu schätzen wissen.Unendlich Observable zu HashMap als Observable

Ich habe eine unendliche, heiße Observable, die regelmäßig markierte Ereignisse ausgibt (erfasst in einer einfachen Klasse mit einem Namen (tag) und einigen Eigenschaften). Die Tags sind endlich (in diesem Fall etwa 10 unterschiedliche Tags), aber die Ereignisspezifika sind jedes Mal anders (z. B. ein Zeitstempel).

Ich versuche jetzt, eine HashMap mit den Tags als Schlüssel und den Ereignissen als Eintrag zu erstellen, so dass die HashMap selbst zu einer unendlichen Observablen wird, die die HashMap bei jeder Änderung ausgibt.

Bisher habe ich ein Subject verwendet, um das ursprüngliche Observable zu abonnieren und die HashMap zu senden, aber ich habe auch die ".toMap" -Methode gesehen. Allerdings kann ich nicht herausfinden, wie man diese Methode mit einer unendlichen beobachtbaren Quelle benutzt und mit jeder Änderung ausstrahlt. Aus der Dokumentation ist mir nicht einmal klar, ob das überhaupt möglich wäre.

Wenn es nicht möglich ist, gibt es neben der Verwendung von Subjekten eine andere Möglichkeit, dasselbe zu erreichen? Ich möchte das schlank halten, und Subject scheint ziemlich schwer.

+0

Sie wollen eine 'Map' oder eine' Observable'? Wenn Sie 'Observable' möchten, können Sie' groupBy' verwenden. Wenn Sie 'Map' möchten, können Sie das' GroupedObservable' in einer 'Map' sammeln. –

Antwort

1

Hier ist mein Code in eine Map oder Observable<Map> konvertieren. Aber ich weiß nicht, warum du das tun musst.

fun <T, K> Observable<T>.toInfiniteMap(keySelector: (T) -> K): Map<K, Observable<T>> { 
    val map = ConcurrentHashMap<K, Observable<T>>() 
    this.subscribeOn(Schedulers.newThread()) 
      .doOnNext { println(it) } 
      .groupBy(keySelector) 
      .doOnNext { map.put(it.getKey(), it) } 
      .subscribe() 
    return map 
} 

fun <T, K> Observable<T>.toInfiniteMapObservable(keySelector: (T) -> K): 
     Observable<Map<K, Observable<T>>> { 
    val map = ConcurrentHashMap<K, Observable<T>>() 
    return this.subscribeOn(Schedulers.newThread()) 
      .doOnNext { println(it) } 
      .groupBy(keySelector) 
      .doOnNext { map.put(it.getKey(), it) } 
      .map { map } 
}