2016-04-10 20 views
0

Ich lese eine Menge Literatur über die Rx und auf der einen Seite ist alles klar, aber auf der anderen Seite ist nichts klar. Ich versuche, eine einfache Filterung und Speicherung von Daten vom Server mit dieser Bibliothek durchzuführen, und es funktioniert nicht wie erwartet. Ich brauche eine einfache Kanallistenverwaltung zu implementieren:RxAndroid. Einfaches Cachen und Filtern von Daten

1. Cache on disk and in memory 
2. When user requested - return filtered channels 
3. All Subscribers that was attached to this Observable must be notified if filtered cahnnels list was changed (call onNext() for all Subscribers) 

schrieb ich folgendes:

ArrayList<Channel> channels = null; 
ArrayList<Channel> filteredChannels = null; 

Observable<ArrayList<Channel>> filteredObservable = Observable.create() 
// here I need to check if cache is valid, if no - download from server 
// then filter channels and return filteredChannels in onNext call 
// all subscribers that called subscribe before and not called unsubscribe must be notified if filteredChannels changed 
// how to implement this? 
.subscribeOn(Schedulers.io()); 

public void setFilter(Filter filter) { 
// update filteredChannels with the new filter and notify all Subscribers (call onNext()). How to implement this? 
} 

public Observable<ArrayList<Channel>> getFilteredChannels() { 
    return filteredObservable; 
} 

Habe ich das richtig verstanden Logik des Rx-Muster oder nicht? Danke im Voraus.

Antwort

0

fand ich die Lösung für diese Aufgabe: Ich flatMap und map verwenden müssen, und ich versuchte, BehaviourSubject zu verwenden, um alle Abonnenten für variable Änderung zu informieren, aber diese Lösung ist überhaupt nicht stabil, weil der Fehler MissingBackpressureException, dass, warum ich ist schrieb weiter ObsevableValue Klasse:

public class ObservableValue<T> { 
    private static final String TAG = ObservableValue.class.getSimpleName(); 

    private ArrayList<Registration> subscriptions = new ArrayList<>(); 
    private T value; 
    private int index; 
    private boolean firstNotify; 
    private ReentrantLock lock = new ReentrantLock(); 

    public ObservableValue() { 
     firstNotify = false; 
    } 

    public ObservableValue(T defaultValue) { 
     value = defaultValue; 
     firstNotify = true; 
    } 

    @SuppressWarnings("unchecked") 
    public void setValue(T value) { 
     lock.lock(); 
     this.value = value; 
     for (Registration listener: subscriptions) { 
      if (!listener.isFinished()) { 
       listener.getListener().valueChanged(value); 
      } 
     } 
     lock.unlock(); 
    } 

    public T getValue() { 
     lock.lock(); 
     T result = value; 
     lock.unlock(); 
     return result; 
    } 

    public Registration subscribe(ValueChangeListener<T> listener) { 
     lock.lock(); 
     Registration s = new Registration(this, index, listener); 
     index++; 
     subscriptions.add(s); 
     if (firstNotify||value!=null) { 
      listener.valueChanged(value); 
     } 
     lock.unlock(); 
     return s; 
    } 

    protected void finish(int index) { 
     lock.lock(); 
     for (int i=0;i<subscriptions.size();i++) { 
      Registration s = subscriptions.get(i); 
      if (s.getIndex()==index) { 
       subscriptions.remove(i); 
       break; 
      } 
     } 
     lock.unlock(); 
    } 
} 

public abstract class ValueChangeListener<T> { 
    private Looper looper; 

    public ValueChangeListener() {} 

    public ValueChangeListener(Looper looper) { 
     this.looper = looper; 
    } 

    public void valueChanged(T data) { 
     if (looper!=null) { 
      Handler handler = new Handler(looper, msg -> { 
       onValueChanged(data); 
       return true; 
      }); 
      handler.sendEmptyMessage(0); 
     } else { 
      onValueChanged(data); 
     } 
    } 

    public abstract void onValueChanged(T data); 
} 

public class Registration { 
    private ObservableValue observableValue; 
    private int index; 
    private ValueChangeListener listener; 
    private volatile boolean finished = false; 

    protected Registration(ObservableValue observableValue, int index, ValueChangeListener listener) { 
     this.observableValue = observableValue; 
     this.index = index; 
     this.listener = listener; 
    } 

    protected ValueChangeListener getListener() { 
     return listener; 
    } 

    protected int getIndex() { 
     return index; 
    } 

    public boolean isFinished() { 
     return finished; 
    } 

    public void finish() { 
     finished = true; 
     observableValue.finish(index); 
    } 
} 

Jetzt funktioniert es wie erwartet. Wie in Rx gibt ObservableValue ein Registrierungsobjekt zurück, das finished() sein muss, wenn es nicht mehr benötigt wird.

0

Ich würde .map() 's Funktion verwenden, um Ihre Liste zu filtern und dann Ihre gewünschte Ausgabe zurückzugeben. Dies würde bedeuten, dass Ihr Observable die gefilterten Ergebnisse ausgibt.

Dann könnten Sie die gefilterte Liste zu Ihrem Cache in Ihre .subscribe() Funktionen übergeben.