Ich versuche zu verstehen, wie man rxcpp verwendet, mein Eindruck war, dass, wenn ein Observable einen Wert emittiert, alle Beobachter, die abonniert sind benachrichtigt werden, indem ihre on_next() -Methoden aufgerufen werden, wobei ihnen der emittierte Wert übergeben wird. Dieserxcpp - warum nicht alle Beobachter 'on_next Funktion aufgerufen werden, wenn ein Observable einen Wert ausgibt
ist nicht der Fall mit dem folgenden Beispiel:
auto eventloop = rxcpp::observe_on_event_loop();
printf("Start task\n");
auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
[](int i){
printf("Observable sending: %d\n", i);
return i;
}
);
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#1 onNext: %d\n", v);},
[](){printf("#1 onCompleted\n");});
values.
subscribe_on(eventloop).
take(2).
as_blocking().
subscribe(
[](int v){printf("#2 onNext: %d\n", v);},
[](){printf("#2 onCompleted\n");});
printf("Finish task\n");
ich die Ausgabe zu erwarten sein, so etwas wie:
Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task
dh on_next auf alle abonnierten Beobachter aufgerufen wird, wenn der neue Wert kommt durch.
Stattdessen ist der Ausgang tatsächlich:
Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task
Die Verwendung von as_blocking() verhindert, dass der zweite Abonnent gestartet wird, bis der erste abgeschlossen ist –