2017-02-17 3 views
0

Mit ReactiveX in Python, wie kann ich einen Stream von Observables zusammenfassen?zusammenfassend Observables in reactivex python

Ich habe einen Strom von Wörterbüchern, die {"Benutzer": "...", "Datum": ...} sind. Ich möchte eine Funktion machen, die ich anwenden kann, die das Wörterbuch mit dem spätesten Datum für jeden Benutzer akkumulieren, und dann die angesammelten Observablen aussenden, wenn das Ende des Stroms getroffen wird (es ist wie max, aber muss das Benutzerfeld sehen und wird mehrere ausgeben Werte).

Beispiel - Eingangsstrom:

{ "user": "a", "date": "2017-02-14" } 
{ "user": "b", "date": "2016-01-01" } 
{ "user": "c", "date": "2015-01-01" } 
{ "user": "a", "date": "2017-01-01" } 
{ "user": "b", "date": "2017-01-01" } 

Erwartete Ausgabe (Bestellung würde keine Rolle spielen)

{ "user": "a", "date": "2017-02-14" } 
{ "user": "c", "date": "2015-01-01" } 
{ "user": "b", "date": "2017-01-01" } 

I "Filtern von Observablen", "Transforming Observable", "Die Kombination von Observablen" zu lesen, und " Entscheidungsbaum von beobachtbaren Operatoren "unter https://ninmesara.github.io/RxPY/api/operators/index.html, und betrachtete reduzieren/aggregieren (nur emittiert einzelnen Wert am Ende), und flat_map (weiß nicht, wie Ende des Streams zu erkennen). many_select und window (besonders) sehen vielversprechend aus, aber es fällt mir schwer sie zu verstehen.

Wie kann ich das mit rx (entweder von einem der bestehenden Betreiber verwenden oder einen benutzerdefinierten Operator machen [die ich weiß nicht, wie noch tun]?)

Antwort

0

ich denke, die folgende Macht Tun Sie, was Sie wollen.

import rx 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .group_by(lambda x: x['user']) \ 
     .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \ 
     .subscribe(print) 
+0

wow ergibt. Das ist Kochbuchqualität. –

0

Die Antwort von Hans ist nahe, braucht nur eine Feinabstimmung.

Meine Beobachter erwarten, dass die { 'user': ..., 'date': } Wörterbücher zu bekommen:

import rx 

def pp1(x): 
    print type(x), x 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .map(lambda x: x[0]) \ 
     .subscribe(pp1) 

<type 'dict'> {'date': '2017-02-14', 'user': 'a'} 
<type 'dict'> {'date': '2016-01-01', 'user': 'b'} 
<type 'dict'> {'date': '2015-01-01', 'user': 'c'} 
<type 'dict'> {'date': '2017-01-01', 'user': 'a'} 
<type 'dict'> {'date': '2017-01-01', 'user': 'b'} 

Dadurch könnte die .group_by und .flat_map Ergebnisse in den Beobachtern am Ende ergibt Listen der Länge 1 enthält die Zusammenfassungen bekommen , anstatt nur die Zusammenfassungen.

import rx 

def pp1(x): 
    print type(x), x 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .group_by(lambda x: x['user']) \ 
     .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \ 
     .subscribe(pp1) 

ergibt

<type 'list'> [{'date': '2017-02-14', 'user': 'a'}] 
<type 'list'> [{'date': '2017-01-01', 'user': 'b'}] 
<type 'list'> [{'date': '2015-01-01', 'user': 'c'}] 

benötigt, um eine Karte hinzuzufügen:

import rx 

def pp1(x): 
    print type(x), x 

rx.Observable.from_([ 
    { "user": "a", "date": "2017-02-14" }, 
    { "user": "b", "date": "2016-01-01" }, 
    { "user": "c", "date": "2015-01-01" }, 
    { "user": "a", "date": "2017-01-01" }, 
    { "user": "b", "date": "2017-01-01" }]) \ 
     .group_by(lambda x: x['user']) \ 
     .flat_map(lambda x: x.max_by(lambda y: y['date'], lambda a,b: -1 if a < b else 1 if a>b else 0)) \ 
     .map(lambda x: x[0]) \ 
     .subscribe(pp1) 

, die die erwartete

<type 'dict'> {'date': '2017-02-14', 'user': 'a'} 
<type 'dict'> {'date': '2017-01-01', 'user': 'b'} 
<type 'dict'> {'date': '2015-01-01', 'user': 'c'}