2017-02-24 1 views
1

Ich habe eine Ereignisquelle, die Ereignisse generiert, die zu bestimmten Gruppen gehören. Ich möchte diese Gruppen puffern und die Gruppen (in Stapeln) an Speicher senden. Bisher habe ich das:Puffergruppe nach Gruppen mit reaktiven Erweiterungen, geschachtelt subscribe

eventSource 
    .GroupBy(event => event.GroupingKey) 
    .Select(group => new { group.Key, Events = group }) 
    .Subscribe(group => group.Events 
          .Buffer(TimeSpan.FromSeconds(60), 100) 
          .Subscribe(list => SendToStorage(list))); 

So gibt es eine verschachtelte abonnieren die Ereignisse in einer Gruppe. Irgendwie denke ich, dass es einen besseren Weg gibt, aber ich konnte es noch nicht herausfinden.

Antwort

3

Hier ist die Lösung:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Subscribe(list => SendToStorage(list)); 

Hier ein paar allgemeine Regeln, die Sie 'reduzieren' können helfen:

1) Eine verschachtelte Das Abonnement wird normalerweise mit Select alles vor dem geschachtelten Abonnement, gefolgt von einem Merge, gefolgt von dem geschachtelten Abonnement festgelegt. So dass die Anwendung, erhalten Sie diese:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .Select(group => new { group.Key, Events = group }) 
    .Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector 
    .Merge() 
    .Subscribe(list => SendToStorage(list)); 

2) Sie können offensichtlich zwei aufeinanderfolgenden wählt kombinieren (und da Sie nichts mit dem anonymen Objekt zu tun, kann nur das Entfernen):

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Merge() 
    .Subscribe(list => SendToStorage(list)); 
Schließlich

3), ein Select von einem Merge gefolgt kann auf eine SelectMany reduziert werden:

eventSource 
    .GroupBy(e => e.GroupingKey) 
    .SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100)) 
    .Subscribe(list => SendToStorage(list)); 
+1

eine ausgezeichnete, gut präsentiert Antwort. – Enigmativity

+0

Schrieb einige Komponententests, um zu überprüfen, dass es funktionierte und funktioniert wie ein Charme. Vielen Dank! –

1

Dies ist eine Möglichkeit, es zu tun

(from g in eventSource.GroupByUntil(e => e.GroupingKey, 
            g => g.Buffer(TimeSpan.FromSeconds(60), 100)) 
from b in g.ToList() 
select b).Subscribe(SendToStorage);