2016-07-21 1 views
1

den folgenden Code Betrachten wir für ein Update mit lang Polling zu hören:Erhalten Sie JAX-RS AsyncResponse, aber suspendieren später

Map<String, List<AsyncResponse>> tagMap = new ConcurrentGoodStuff(); 

// This endpoint listens for notifications of the tag 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@GET 
@Path("listen/{tag}") 
public void listenForUpdates(
     @PathParam("tag") final String tag, 
     @Suspended final AsyncResponse response) { 
    tagMap.get(tag).add(response); 
} 

// This endpoint is for push-style notifications 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@PUT 
@Path("update/{tag}/{value}") 
public Response updateTag(
     @PathParam("tag") final String tag, 
     @PathParam("value") final String value) { 
    for(AsyncResponse response : tagMap.get(tag)) { 
     // Resumes all previously suspended responses 
     response.resume(value); 
    } 
    return Response.ok("cool whatever").build(); 
} 

Der Kunde einen Zuhörer mit der normalen Jersey Kunden hinzufügt AsyncInvoker, ruft die asynchrone Aufgabe, und dann ruft eine andere Task die Update-Methode auf.

Wenn ich das teste, stoße ich in eine Wettlaufsituation. Direkt nachdem ich den Listener asynchron auf listenForUpdates() hinzugefügt habe, führe ich eine Aktualisierung auf dem Endpunkt mit updateTag() synchron durch. Das Update wird jedoch ausgeführt, bevor der Listener hinzugefügt wird, und die asynchrone Antwort wird nicht fortgesetzt.

Eine Lösung ist, die suspend() Methode auf die Antwort nach, die es zu den Listener hinzufügen. Aber es ist nicht klar, wie dies zu tun ist, da @Suspended ein bereits suspendiertes AsyncResponse Objekt bereitstellt. Was soll ich tun, damit die asynchrone Antwort unterbrochen wird nur nach Hinzufügen zum Listener? Wird das tatsächlich die Suspend-Methode aufrufen? Wie kann ich dies mit dem Async-Client von Jersey oder mit einem anderen Long-Polling-Client erreichen?

Für Lösungen bin ich offen für verschiedene Bibliotheken, wie Atmosphäre oder Guava. Ich bin nicht offen für das Hinzufügen eines Thread.sleep() in meinem Test, da dies ein intermittierender Fehler ist, der darauf wartet, zu passieren.

+0

Ich weiß nicht genau, was Sie zu tun versuchen, aber Sie sollten sich einige Beispiele im Jersey-Projekt ansehen. Es gibt eine Reihe verschiedener asynchroner Beispiele. Ich denke, eines der Chat-Beispiele macht das, was du versuchst zu tun. –

+1

Ich denke [dies ist die eine] (https://github.com/jersey/jersey/tree/master/examples/server-async-managed). Nicht sicher, ob das genau das ist, was Sie versuchen, obwohl –

+0

Ich denke, die blockierende Warteschlange ist definitiv der Weg zu gehen, wenn ich sicherstellen will, dass der Verbrauch einer Überprüfung für den Verbrauch vorausgeht. Ich habe derzeit Updates, die immer nicht blockierend sind. Wenn ich ein Flag hinzufüge, um bei einem Listener, der gerade aktualisiert wird, zu blockieren, hilft das definitiv beim Testen. Cool, ich denke ich werde das machen. Vielen Dank. Fügen Sie das als "echte Antwort" hinzu und ich gebe Ihnen einige Internet-Punkte weiter. –

Antwort

0

Ich endete mit RxJava, aber nicht bevor ich mit einer BlockingQueue statt List in der Map eine genauso gute Lösung kam. Es geht ungefähr so:

ConcurrentMap<String, BlockingQueue<AsyncResponse>> tagMap = new ConcurrentGoodStuff(); 

// This endpoint initiates a listener array for the tag. 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@GET 
@Path("initListen/{tag}") 
public void listenForUpdates(
     @PathParam("tag") final String tag) { 
    tagMap.putIfAbsent(tag, new LinkedBlockingQueue<>()); 
} 

// This endpoint listens for notifications of the tag 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@GET 
@Path("listen/{tag}") 
public void listenForUpdates(
     @PathParam("tag") final String tag, 
     @Suspended final AsyncResponse response) { 
    BlockingQueue<AsyncResponse> responses = tagMap.get(tag); 

    if (responses != null) { 
     responses.add(response); 
    } 
} 

// This endpoint is for push-style notifications 
@Produces(MediaType.APPLICATION_JSON) 
@Consumes(MediaType.APPLICATION_JSON) 
@PUT 
@Path("update/{tag}/{value}") 
public Response updateTag(
     @PathParam("tag") final String tag, 
     @PathParam("value") final String value) { 
    BlockingQueue<AsyncResponse> responses = tagMap.get(tag); 

    if (responses == null) { 
     return Response.noContent().build(); 
    } 
    if (responses.isEmpty()) { 
     // Block-wait for an async listener 
     try { 
      AsyncResponse response = tagMap.poll(15, TimeUnit.SECONDS); 

      if (response == null) { 
       return Response.noContent().build(); 
      } 

      response.resume(value); 
     } catch (InterruptedException e) { 
      return Response.noContent().build(); 
     } 
    } else { 
     for (AsyncResponse response : responses) { 
      // Resumes all previously suspended responses 
      response.resume(value); 
     } 
    } 
    return Response.ok("cool whatever").build(); 
} 

Ich habe diesen genauen Code nicht getestet, aber ich habe eine Version davon in der Vergangenheit verwendet. Solange Sie den Endpunkt initListen zuerst synchron aufrufen, können Sie den asynchronen Endpunkt listen und dann den synchronen Endpunkt update aufrufen, und es wird keine signifikante Wettlaufsituation geben.

Es gibt einen leichten Hinweis auf eine Race-Bedingung im update Endpunkt, aber es ist geringfügig. Die Blockierungswarteschlange responses kann bei der Iteration leer werden, oder sie kann von mehreren Quellen unterschiedlich aktualisiert werden. Um dies zu erleichtern, habe ich die Methode drainTo(Collection) für eine instanziierte Datenstruktur pro Anforderung verwendet. Dies löst noch immer nicht den Anwendungsfall, in dem mehrere Clients versuchen, das gleiche Tag von Listenern zu aktualisieren, aber ich brauche diesen Anwendungsfall nicht.

Verwandte Themen