2017-11-03 2 views
0

Ich habe eine Methode, die eine Nachricht liest/schreibt.Erstellen Sie eine gemeinsame Observable, die mehrere Anrufe vermeiden

fun sendMessage(message: String): Observable<MESSAGE> { 
    return readMessage().doOnSubscribe { 
      socket.write(message)  
    } 
} 

readMessage() gibt ein Thema veröffentlichen zurück, die einen Wert von einem heißen beobachtbaren aus einem Strom emittiert (socket.read()).

protected fun readMessage(): Observable<MESSAGE> { 
    if (messageSubject == null) { 
     messageSubject = PublishSubject.create() 
     socket.read() 
       .flatMap { 
        [email protected] flowTransformer.runLoop(it) 
       } 
       .flatMap { 
        //Do some stuff 
       } 
       .subscribe(messageSubject) 
    } 
    return messageSubject 
} 

Ich nenne sendMessage() an einer anderen Stelle und mehrfach in der gleichen Kette.

 sendMessage("Message1").flatMap { 
     sendMessage("Message2") 
    }.flatMap { 
     sendMessage("Message 3") 
    }.subscribe({ 
     //next 
    }, { 
     //error 
    }) 

Das Problem ist, wenn ich sendMessage() nenne ich noch nicht an den Verlag abonniert haben (so die Nachrichtenantwort ist drop). Ich befürchte, dass wenn ich ReplaySubject verwende, es zu viele Nachrichten ausgibt, weil ich eine Menge von sendMessage() verwende.

Irgendwann liest die readObservable von der ersten sendMessage alle nächste Nachricht. Und es ist ein Problem, weil die Analyseoperation CPU-intensiv ist.

Wie könnte ich diese Kette verbessern?

+0

Sie können sicherstellen, dass das Abonnement bereit ist, sobald die erste Nachricht gesendet wird. – tynn

+0

Wie könnte ich das tun? Ich denke, dass ich das schon gemacht habe, indem ich nur auf den Socket geschrieben habe, wenn ich sendMessage() abonniert habe? – Timo

Antwort

0

Sie können eine PublishSubject mit einer Puffergröße

public void ReplaySubjectBufferExample() 
{ 
    var bufferSize = 2; 
    var subject = new ReplaySubject<string>(bufferSize); 
    subject.OnNext("a"); 
    subject.OnNext("b"); 
    subject.OnNext("c"); 
    subject.Subscribe(Console.WriteLine); 
    subject.OnNext("d"); 
} 

Also, auf diese Weise erstellen, können Sie die Anzahl der Elemente steuern Wiederholung zu sein. Quelle ReplaySubject

Verwandte Themen