Ich habe eine Methode, die eine Nachricht liest/schreibt.Erstellen Sie eine gemeinsame Observable, die mehrere Anrufe vermeiden
fun sendMessage(message: String): Observable<MESSAGE> {
return readMessage().doOnSubscribe {
socket.write(message)
}
}
readMessage()
gibt ein Thema veröffentlichen zurück, die einen Wert von einem heißen beobachtbaren aus einem Strom emittiert (socket.read()
).
protected fun readMessage(): Observable<MESSAGE> {
if (messageSubject == null) {
messageSubject = PublishSubject.create()
socket.read()
.flatMap {
[email protected] flowTransformer.runLoop(it)
}
.flatMap {
//Do some stuff
}
.subscribe(messageSubject)
}
return messageSubject
}
Ich nenne sendMessage()
an einer anderen Stelle und mehrfach in der gleichen Kette.
sendMessage("Message1").flatMap {
sendMessage("Message2")
}.flatMap {
sendMessage("Message 3")
}.subscribe({
//next
}, {
//error
})
Das Problem ist, wenn ich sendMessage()
nenne ich noch nicht an den Verlag abonniert haben (so die Nachrichtenantwort ist drop). Ich befürchte, dass wenn ich ReplaySubject verwende, es zu viele Nachrichten ausgibt, weil ich eine Menge von sendMessage()
verwende.
Irgendwann liest die readObservable von der ersten sendMessage alle nächste Nachricht. Und es ist ein Problem, weil die Analyseoperation CPU-intensiv ist.
Wie könnte ich diese Kette verbessern?
Sie können sicherstellen, dass das Abonnement bereit ist, sobald die erste Nachricht gesendet wird. – tynn
Wie könnte ich das tun? Ich denke, dass ich das schon gemacht habe, indem ich nur auf den Socket geschrieben habe, wenn ich sendMessage() abonniert habe? – Timo