2016-12-14 4 views
0

Ich habe eine ziemlich komplexe Kette von Rx-Code, der verwendet wird, um eine Reihe von Operationen durchzuführen. Im Wesentlichen startet ein Observable beim Start eine Liste von Elementen. Für jeden Artikel wird eine Verbindung hergestellt (diese Artikel sind BLE-Geräte) und ein Merkmal wird geschrieben.Benachrichtigen von Fehlerereignissen in der Kette ohne Verwendung von traditionellen Listener

Diese Eigenschaft wird alle 10 Sekunden neu geschrieben, bis ein Fehler auftritt (z. B. Herausziehen der Batterie). Danach ist das nächste Element in der ursprünglichen Liste verbunden und so weiter.

Hier ist, wo die meisten der Aktion tritt:

public Observable<String> connectForPolice(String name, RemovalListener listener) { 

     StringBuilder builder = new StringBuilder(mAddress); 
     builder.setCharAt(mAddress.length() - 1, '4'); 
     RxBleDevice device = mRxBleClient.getBleDevice(builder.toString()); 

     return device.establishConnection(mContext, false) 
       .timeout(12, TimeUnit.SECONDS) 
       .doOnError(throwable -> { 
        Log.d(TAG, "Error thrown after timeout."); 
        throwable.printStackTrace(); 
       }) 
       .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() { 
        @Override 
        public Observable<byte[]> call(RxBleConnection rxBleConnection) { 
         byte[] value = new byte[1]; 
         value[0] = (byte) (3 & 0xFF); 

         return Observable // ... we return an observable ... 
           .defer(() -> { 
            return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value); 
           }) 
           .repeatWhen(observable -> { 
            return observable.delay(10, TimeUnit.SECONDS); 
           }); 
        } 
       }) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .onErrorResumeNext(throwable -> { 
         throwable.printStackTrace(); 
         listener.onFinished(name); 
        return Observable.empty(); 
       }); 
    } 

Es gibt hier ein paar Probleme. Wie Sie sehen, wird ein traditioneller Listener an diese Methode übergeben und dieser Listener wird in onErrorResumeNext aufgerufen. Dieser Listener wird verwendet, um das Element aus einem RecyclerView in der Aktivität zu entfernen, aber das ist nicht wichtig. Das Problem ist, ein Listener auf diese Weise zu verwenden ist ein bisschen ein Anti-Pattern von Rx. Unten ist der Rest des entsprechenden Code, den Code enthält, die das obige Beispiel nennt:

@Override 
    public void onFinished(String deviceName) { 
     mAdapter.removeItem(deviceName); 
    } 

    private void startConnecting() { 
     mIsBeepingAll = true; 
     mConnectingSubscription = Observable.from(mAdapter.getItems()) 
       .flatMap((Func1<Device, Observable<?>>) device -> { 
        Log.d(TAG, "connecting for policing"); 
        return device.connectForPolice(device.getName(), PoliceActivity.this); 
       }, 1) 
       .doOnError(throwable -> throwable.printStackTrace()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Subscriber<Object>() { 
        @Override 
        public void onCompleted() { 
         Log.d(TAG, "onCompleted..."); 
        } 

        @Override 
        public void onError(Throwable e) { 
         e.printStackTrace(); 
         Log.d(TAG, "onError..."); 
        } 

        @Override 
        public void onNext(Object o) { 
         Log.d(TAG, "onNext... "); 
        } 
       }); 
    } 

Die Umsetzung des Zuhörers enthalten ist. Die Frage ist, wie kann ich das Äquivalent dessen ausführen, was der Hörer mit Rx tut?

Antwort

1

Die einfachste, dies zu ändern wäre:

  .onErrorResumeNext(throwable -> { 
       throwable.printStackTrace(); 
       listener.onFinished(name); 
       return Observable.empty(); 
      }); 

in diese:

  .onErrorResumeNext(throwable -> { 
       throwable.printStackTrace(); 
       return Observable.just(name); 
      }); 

Und der Anrufer würde verwenden:

private void startConnecting() { 
    mIsBeepingAll = true; 
    mConnectingSubscription = Observable.from(mAdapter.getItems()) 
      .flatMap((Func1<Device, Observable<?>>) device -> { 
       Log.d(TAG, "connecting for policing"); 
       return device.connectForPolice(device.getName(), PoliceActivity.this); 
      }, 1) 
      .doOnError(throwable -> throwable.printStackTrace()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Subscriber<Object>() { 
       @Override 
       public void onCompleted() { 
        Log.d(TAG, "onCompleted..."); 
       } 

       @Override 
       public void onError(Throwable e) { 
        e.printStackTrace(); 
        Log.d(TAG, "onError..."); 
       } 

       @Override 
       public void onNext(String deviceName) { 
        Log.d(TAG, "onNext... "); 
        mAdapter.removeItem(deviceName); 
       } 
      }); 
} 
+0

Dies ist, was ich hatte ursprünglich gedacht zu tun . Ich war unter der Annahme, dass "Observable.empty()" benötigt wurde, so dass "onCompleted()" sofort aufgerufen würde, so dass es sich erneut abonnieren würde, um mit dem nächsten Element in der Liste zu arbeiten. Ist das nicht der Fall? Ich muss das nochmal testen. – Orbit

+0

Das ist wahr. 'Observable.empty()' gibt nichts aus - wird nur vervollständigt. 'Observable.just (Objekt)' emittiert ein "Objekt" und schließt dann ab. Es wird verwendet, weil Sie auf eine externe Trennung angewiesen sind (was unter normalen Umständen eine Ausnahme darstellt) und nicht den gesamten Stream ausschließen möchten. –

+0

Ah ok, ich dachte nicht an "Observable.just()" sofort abgeschlossen. – Orbit

Verwandte Themen