2017-09-04 1 views
0

Ich möchte einen EventBus mit RxJava implementieren und ich brauche klebrige Ereignisse. Ich weiß, dass ich ein BehaviorSubject verwenden kann, aber es speichert nur das letzte ausgegebene Element, während ich alle Ereignisse zwischenspeichern möchte, die sich durch ihren Typ unterscheiden (Klassenname). Es gibt eine andere Option - ReplaySubject, aber es hat einen Overhead - es enthält alle emittierten Elemente. Gibt es eine Möglichkeit, eine Art ReplaySubject zu erstellen, das nur nach Typelementen enthalten ist?ReplaySubject mit verschiedenen Elementen

+0

Sie können die Elemente filtern, bevor Sie sie im Betreff ausgeben. Sie werden also nach ihrem Typ bereits eindeutig sein, wenn sie im 'Betreff' empfangen werden. – masp

+0

@masp es ist ein Event-Bus. Es ist wie ein unendlicher Strom. Ich weiß nicht, wie viele Elemente ich aussenden werde, daher kann ich sie nicht sammeln und filtern, bevor ich sie an ein Thema sende. – Buckstabue

+0

Sie möchten das neueste Element nach Typ zwischenspeichern? Dann haben Sie mehrere BehaviorSubjects oder ReplaySubjects, einen für jeden Typ, der Ihnen auch Typsicherheit gibt. – akarnokd

Antwort

1

Ich glaube nicht, dass es saubere Lösungen gibt. Sie können dies jedoch möglicherweise durchführen.

  1. Erstellen Sie für jeden Ereignistyp eine BehaviorSubject<>. Verwenden Sie eine ConcurrentMap, um jedes eingehende Ereignis an die richtige Subject zu senden.
  2. Pflegen Sie eine Liste dieser Themen in der Reihenfolge der Ankunft von Ereignissen.
  3. Wenn eine neue Subskription empfangen wird, wird eine Observable erstellt, die die Zusammenführung aller Subjekte mit der ersten Liste von Subjekten, die bereits Events erhalten haben, gefolgt von denen, die nicht in einer Reihenfolge sind.

Dies ist ein Code, der das obige klären könnte. Ungetestet.

// The subscription operation will perform a merge of the two lists 
Map<EventType, BehaviorSubject<Event>> map = new ConcurrentHashMap<>(); 
List<BehaviorSubject<Event>> listOfUnseenEvents = new ArrayList<>(); 
List<BehaviorSubject<Event>> listOfSeenEvents = new ArrayList<>(); 
// ... 
listOfUnseenEvents = map.values().asList(); 

public Observable<Event> busSub() { 
    List<BehaviorSubject<Event>> allEvents = new ArrayList<>(); 
    synchronized (map) { 
    allEvents.addAll(listOfSeenEvents); 
    allEvents.addAll(listOfUnseenEvents); 
    } 
    return Observable.merge(allEvents); 
} 

// receive an event and dispatch it 
eventSource 
    .subscribe(event -> processEvent(event)); 

public void processEvent(Event event) { 
    BehaviorSubject<Event> eSubject = map.get(event.getEventType()); 
    synchronized (map) { 
     if (containsEventType(listOfSeenEvents, event.getEventType())) { 
     removeEventType(listOfSeenEvents, event.getEventType()); 
     } else { 
     removeEventType(listOfUnseenEvents, event.getEventType()); 
     } 
     listOfSeenEvents.add(eSubject); 
    } 
    eSubject.onNext(event); 
} 

Bitte beachten Sie, dass dieser Code sich die Tatsache zunutze, dass nimmt merge() zu jedem der gegebenen Observablen um abonnieren werden. Dafür gibt es keine Garantie in der RxJava-Dokumentation.

Wenn nicht alle Ereignistypen im Voraus bekannt sind, funktioniert dies nicht.

+0

Danke für die Idee, ich muss nicht zwischen gesehenen und ungesehenen Ereignissen unterscheiden, also könnte es vereinfacht werden. Auch warum funktioniert das nicht, wenn alle Ereignistypen unbestimmt sind? Ich kann Ereignisse nach ihrem Typ gruppieren und es scheint genug zu sein, um nur das letzte Ereignis des bestimmten Typs zu posten. – Buckstabue

+0

Sie benötigen "ungesehene" Ereignis-Themen, damit Teilnehmer schließlich Ereignisse für diese Typen sehen. Andernfalls müssten Sie für jeden Abonnenten ein "nicht klassifiziertes" Subjekt und eine Liste von Ereignistypen haben, die in nicht klassifizierte fallen. –

Verwandte Themen