2017-08-24 4 views
2

Ich habe eine beobachtbare Kette, die anfängliche Observable stammt aus dem Netzwerk und wird jedes Mal ausgelöst, wenn die Nachricht zum Lesen bereit ist. Der nächste Handler liest dann die Nachricht und deserialisiert sie. Jetzt habe ich eine Gabelung des Observablen, der eine ist der Nachrichtenhandler und der andere protokolliert die Nachricht.f # beobachtbare Gabel und Nebeneffekt

Das Problem ist, dass, weil ich Observable benutze ich tatsächlich versuchen werde, die Nachricht zweimal zu lesen.

Ich verstehe, dass die Verwendung von Event anstelle von Observable das Problem lösen wird, jedoch habe ich dann ein Garbage Collection-Problem, das dazu führen kann, dass Sockets nicht erfasst werden.

Eine Lösung, an die ich gedacht habe, ist das Einfügen einer Art Trennzeichen, die eine Kette von Observablen beendet und eine neue erzeugt, existiert eine solche Funktion bereits als Teil von fsharp oder einer anderen Bibliotheksbibliothek.

Gibt es andere Lösungen für das Problem?

Edit:

Codebeispiel, das nicht richtig

let messagesStream = 
    socket.observable |> 
    Observable.map (fun() -> socket.read()) |> 
    Observable.map (fun m -> deserialize m) 

messagesStream |> Observable.add (fun m -> printf m) 
messagesStream |> Observable.add (fun m -> handle m) 
+3

Auf der Seite: Sie sollten die vorderen Rohre setzen * vor * die Funktion in ihr Rohr, nicht hinter * * der Wert, den Sie Rohr. Wenn man sie so aufstellt, wird deutlich, was vor sich geht. – TeaDrivenDev

+2

Jede Subskription für eine beobachtbare Pipeline verursacht eine Subskription der beobachtbaren Quelle. Sie erstellen zwei Subskriptionen für die Datei "socket.observable". Sie sollten versuchen, einen Veröffentlichungsoperator am Ende des messagesStream-Objekts zu platzieren. – Enigmativity

+0

Danke, ich habe über eine Veröffentlichung gefunden, weißt du irgendeine Bibliothek für f # mit Veröffentlichungsfunktion? Wenn Sie eine Antwort und ein Codebeispiel hinzufügen können, werde ich das als Antwort markieren. Danke – somdoron

Antwort

0

Es klingt funktioniert, wie Sie eine beobachtbare, die das Lesen der Nachricht übernimmt aus dem Netzwerk schaffen könnte und deserialisiert es. Unter der Annahme, dass dies mit Standard-Rx-Operatoren geschieht, sollte dies eine Observable zurückgeben, die neue, deserialisierte Netzwerknachrichten überträgt.

Sie können 2 Abonnenten für diese Observable haben, eine, die auf neue Nachrichten mit der gewünschten Geschäftslogik reagiert und der zweite Abonnent die Nachrichten protokolliert.

Das sollte den Nebeneffekt des mehrfachen Lesens aus dem Netzwerk beseitigen. Das Drücken von 2 Kopien der deserialisierten Nachricht sollte keinen Nebeneffekt haben.

+0

Ich habe den Code mit einem Beispiel bearbeitet, das nicht funktioniert. – somdoron

+0

Ich bin mir nicht sicher, ob ich deine Antwort verstehe. Was Sie vorschlagen, ist nicht Observable.map zu verwenden, sondern das Netzwerk zu abonnieren und dann neue Observable zu produzieren? Kannst du ein Codebeispiel geben? – somdoron

+0

Wenn Sie den Code sehen, den Sie gepostet haben, sieht es so aus, als hätten Sie versucht, was ich in meiner Antwort vorgeschlagen habe. Was genau funktioniert nicht? –

0

Der einfachste Weg, um einige Protokollierung hinzuzufügen ist Observable.iter wie folgt zu verwenden:

let messagesStream = 
    socket.observable 
    |> Observable.map (fun() -> socket.read()) 
    |> Observable.map (fun m -> deserialize m) 
    |> Observable.iter (printfn "%A") 

messagesStream |> Observable.add (fun m -> handle m)