2015-05-03 8 views
8

Ich habe eine Server-Client-Anwendung (Java EE & Android), Kommunikation über WebSockets. Die Kommunikation funktioniert und auch das Protokoll selbst zum Senden von Objekten als JSON, die korrekt verpackt, serialisiert, gesendet, deserialisiert, ausgepackt und neu aufgebaut werden. Beide Anwendungen verwenden ein anderes Bibliotheksprojekt mit allen möglichen Anforderungs- und Antwortklassen.Anfrage-Antwort-Synchronisation/Abgleich in Null Kontext

Nun zu meinem Problem: Die Bibliothek sollte auch eine Strategie für nicht blockierende Kommunikation implementieren, aber transparente Anfrage-Antwort-Implementierung. Wahrscheinlich bin ich nicht der Erste mit diesem Problem, also denke ich, dass es da draußen einige nette Implementierungen geben könnte :).

Was ich will:

// server should sleep 5000ms and then return 3*3 
Future<Integer> f1 = server.put(
    new SleepAndReturnSquareRequest(5000, 3), 
    new FutureCallback<Integer>{ 
    public void onSuccess(Integer square) { 
     runOnUiThread(new Runnable{ 
     // Android Toast show square 
     }); 
    } 

    // impl onFailure 
    } 
); 

Future<Date> f2 = server.put(
    new TimeRequest(), 
    new FutureCallback<Date>{ 
    public void onSuccess(Date date) { 
     // called before other onSuccess 
    } 

    // impl onFailure 
    } 
); 

// e.g. when the activity in android changes I'll cancel all futures. That means no more callbacks and (later) if possible client sends cancellations to the server for long requests. 

Der Code sollte eine SleepAndReturnRequest und dann ein TimeRequest natürlich nicht blockierend senden. Die erste Anfrage dauert 5 Sekunden, die zweite Anfrage fast null Millisekunden. Ich erwarte, dass die Implementierung den zweiten Rückruf sofort nach Erhalt der Antwort aufruft, während der erste Rückruf nach etwa 5 Sekunden aufgerufen wird. Die Implementierung ist verantwortlich für den Anfrage-Antwort-Abgleich auf der anfragenden Seite.

Was ich habe versucht, und dachte darüber nach:

Googles Guave listenable future ein guter Ansatz für die ‚reagiert Seite‘ wäre ich denke, weil es nur eine Aufgabe auf jedem Thread ausgeführt wird, sendet das Ergebnis zurück Am Ende. Das sollte einfacher sein.

Für die 'anfragende Seite' benötige ich eine Implementierung, die einer Nachricht eine eindeutige Kennung hinzufügt, um die Antwort abgleichen zu können. Hoffentlich können Sie mir einige Pakete sagen, die diesen Job erledigen.

Vielen Dank für Ihre Hilfe.

// edit:

ich glaube, meine Frage falsch verstanden wurde, oder es ist nicht präzise genug. Überlegen Sie, wie Sie GET oder POST über Websocket implementieren. Jede GET/POST-Anfrage hat eine Antwort und dann wird die Verbindung geschlossen. Clients verbinden sich mit einem bestimmten Port, Server nimmt einen Thread aus einem Thread-Pool, verarbeitet die Anfrage und antwortet. Die Übereinstimmung von Anfrage zu Antwort, denke ich, wird in der Transportschicht # 4 gemacht.

Da ich WebSockets verwenden wollen, muss ich Schicht, die passende Software implementieren 7.

Hier einige Schritte ich zu implementieren. K ist der eindeutige Schlüsseltyp, V ist der generische Typ des Inhalts einer Nachricht. Das könnte eine Zeichenfolge, ein Byte-Stream oder was auch immer sein.

public class Synchronizer<K, V> implements UniqueMessageListener<K, V> { 

    private final ConcurrentMap<K, FutureCallback<V>> callbackMap = new ConcurrentHashMap<>(); 

    private final ListeningExecutorService executor; 

    private final UniqueMessageFactory<K, V> factory; 
    private final UniqueMessageSender<K, V> sender; 
    private UniqueMessageReceiver<K, V> receiver = null; 

    public Synchronizer(
      ListeningExecutorService executor, 
      UniqueMessageFactory<K, V> factory, 
      UniqueMessageSender<K, V> sender 
    ) { 
     this.executor = executor; 
     this.factory = factory; 
     this.sender = sender; 
    } 

    public void register(UniqueMessageReceiver<K, V> receiver) { 

     unregister(); 

     this.receiver = receiver; 
     receiver.addListener(this); 
    } 

    public void unregister() { 
     if(receiver != null) { 
      receiver.removeListener(this); 
      receiver = null; 
     } 
    } 

    public Future<V> put(Message<V> message, final FutureCallback<V> callback) { 
     final UniqueMessage<K, V> uniqueMessage = factory.create(message); 

     final Future<Boolean> sendFuture = sender.send(uniqueMessage); 

     final ListenableFuture<Boolean> listenableSendFuture = 
       JdkFutureAdapters.listenInPoolThread(sendFuture, executor); 


     listenableSendFuture.addListener(
       new Runnable() { 
        @Override 
        public void run() { 
         try { 
          if(listenableSendFuture.get() == true) { 
           callbackMap.put(
             uniqueMessage.getId(), 
             callback 
           ); 
          } else { 
           // maybe try it later again? 
          } 
         } catch(Exception e) { 
          // ... 
         } 
        } 
       }, 
       executor 
     ); 

     // implement cancel 
     return new SynchronizeFuture<>(
       listenableSendFuture, 
       callback 
     ); 
    } 

    @Override 
    public void onReceive(UniqueMessage<K, V> message) { 
     K id = message.getId(); 
     FutureCallback<V> callback; 

     callback = callbackMap.remove(id); 

     if(callback != null) { 
      callback.onSuccess(message.getContent()); 
     } 
    } 
} 

Es gibt eine Menge für mich zu testen, aber ich denke, es wird funktionieren.

+0

Gibt es etwas in oder Grizzly netty für mich zu arbeiten? – Aitch

+0

Sie benötigen etwas, um eine Transaktions-ID auf der Client-Seite zu generieren, das ist Ihre Frage? –

+0

@Mr_Thorynque Nr.Es geht darum, eine Synchronisation von Anfragen und Antworten über Websockets zu implementieren. Die Verwendung einer eindeutigen Transaktions-ID ist ziemlich offensichtlich. Lies meinen Code oben. – Aitch

Antwort

-1

Sie könnten ProtoBuf-RPC-Pro versuchen - aber es ist nicht für WebSockets obwohl ich denke, dass es ein Projekt auf GitHub es unterstützt, ist :)

Verwandte Themen