Ich verwende einen Multiplayer-Spielclient mit dem Namen AppWarp (http://appwarp.shephertz.com), in dem Sie Ereignislistener hinzufügen können, die bei Ereignissen zurückgerufen werden. Nehmen wir an, wir sprechen über den Connection Listener, wo Sie diese Schnittstelle implementieren müssen :Wie konvertiert man Listener korrekt in Reaktive (Observables) mit RxJava?
public interface ConnectionRequestListener {
void onConnectDone(ConnectEvent var1);
void onDisconnectDone(ConnectEvent var1);
void onInitUDPDone(byte var1);
}
Mein Ziel ist es, in erster Linie eine Reaktiv-Version dieses Client erstellen in meiner Apps verwendet wird intern statt den Auftraggeber mit mir direkt (ich auch später Schnittstellen angewiesen werden, anstatt nur in Abhängigkeit von der WarpClient selbst wie im Beispiel, aber das ist nicht der wichtige Punkt, lesen Sie bitte meine Frage ganz am Ende).
Also, was ich tat, ist wie folgt:
1) ich ein neues Ereignis eingeführt, nannte es RxConnectionEvent (die vor allem Gruppen verbindungsbezogene Ereignisse) wie folgt:
public class RxConnectionEvent {
// This is the original connection event from the source client
private final ConnectEvent connectEvent;
// this is to identify if it was Connection/Disconnection
private final int eventType;
public RxConnectionEvent(ConnectEvent connectEvent, int eventType) {
this.connectEvent = connectEvent;
this.eventType = eventType;
}
public ConnectEvent getConnectEvent() {
return connectEvent;
}
public int getEventType() {
return eventType;
}
}
2) einige Ereignistypen wie folgt erstellt:
public class RxEventType {
// Connection Events
public final static int CONNECTION_CONNECTED = 20;
public final static int CONNECTION_DISCONNECTED = 30;
}
3) Erstellt die folgende beobachtbar, die meine neue RxConnectionEvent
import com.shephertz.app42.gaming.multiplayer.client.WarpClient;
import com.shephertz.app42.gaming.multiplayer.client.events.ConnectEvent;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
public class ConnectionObservable extends BaseObservable<RxConnectionEvent> {
private ConnectionRequestListener connectionListener;
// This is going to be called from my ReactiveWarpClient (Factory) Later.
public static Observable<RxConnectionEvent> createConnectionListener(WarpClient warpClient) {
return Observable.create(new ConnectionObservable(warpClient));
}
private ConnectionObservable(WarpClient warpClient) {
super(warpClient);
}
@Override
public void call(final Subscriber<? super RxConnectionEvent> subscriber) {
subscriber.onStart();
connectionListener = new ConnectionRequestListener() {
@Override
public void onConnectDone(ConnectEvent connectEvent) {
super.onConnectDone(connectEvent);
callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_CONNECTED));
}
@Override
public void onDisconnectDone(ConnectEvent connectEvent) {
super.onDisconnectDone(connectEvent);
callback(new RxConnectionEvent(connectEvent, RxEventType.CONNECTION_DISCONNECTED));
}
// not interested in this method (for now)
@Override
public void onInitUDPDone(byte var1) { }
private void callback(RxConnectionEvent rxConnectionEvent)
{
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(rxConnectionEvent);
} else {
warpClient.removeConnectionRequestListener(connectionListener);
}
}
};
warpClient.addConnectionRequestListener(connectionListener);
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
onUnsubscribed(warpClient);
}
}));
}
@Override
protected void onUnsubscribed(WarpClient warpClient) {
warpClient.removeConnectionRequestListener(connectionListener);
}
}
4) und schließlich meine BaseObservable sieht wie folgt aus emittiert:
public abstract class BaseObservable<T> implements Observable.OnSubscribe<T> {
protected WarpClient warpClient;
protected BaseObservable (WarpClient warpClient)
{
this.warpClient = warpClient;
}
@Override
public abstract void call(Subscriber<? super T> subscriber);
protected abstract void onUnsubscribed(WarpClient warpClient);
}
Meine Frage ist vor allem: ist meine Implementierung oben richtig oder sollte ich stattdessen separate Observable für jedes Ereignis erstellen, aber wenn ja, hat dieser Client mehr als 40-50 Ereignisse, muss ich für jedes Ereignis separate Observable erstellen?
Ich benutze auch den Code wie oben (in einer einfachen „non-final“ Integrationstest verwendet) folgt:
Sie vermeidenpublic void testConnectDisconnect() {
connectionSubscription = reactiveWarpClient.createOnConnectObservable(client)
.subscribe(new Action1<RxConnectionEvent>() {
@Override
public void call(RxConnectionEvent rxEvent) {
assertEquals(WarpResponseResultCode.SUCCESS, rxEvent.getConnectEvent().getResult());
if (rxEvent.getEventType() == RxEventType.CONNECTION_CONNECTED) {
connectionStatus = connectionStatus | 0b0001;
client.disconnect();
} else {
connectionStatus = connectionStatus | 0b0010;
connectionSubscription.unsubscribe();
haltExecution = true;
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
fail("Unexpected error: " + throwable.getMessage());
haltExecution = true;
}
});
client.connectWithUserName("test user");
waitForSomeTime();
assertEquals(0b0011, connectionStatus);
assertEquals(true, connectionSubscription.isUnsubscribed());
}
Ich schlage vor,