2017-11-17 5 views
3

New Spring hat einige WebSocketClient-Beispiel unter Spring documentation.Beispiele für die Verwendung ReactorNettyWebSocketClient

WebSocketClient client = new ReactorNettyWebSocketClient(); 
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000); 

Aber es ist sehr kurz und nicht klar:

  1. Wie eine Nachricht an den Server senden (abonnieren Sie einen Kanal)?
  2. Dann behandeln eingehende Stream und emittieren Flux-Nachrichten?
  3. Erneute Verbindung zum Server herstellen, wenn die Verbindung unterbrochen ist?

Könnte jemand komplexeres Beispiel zur Verfügung stellen?

UPD. Ich habe versucht, wie etwas zu tun:

public Flux<String> getStreaming() { 

    WebSocketClient client = new ReactorNettyWebSocketClient(); 
    EmitterProcessor<String> output = EmitterProcessor.create(); 
    Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }"); 

    Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"), 
      session -> session 
        .send(input.map(session::textMessage)) 
        .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()) 
        .then()); 

    return output.doOnSubscribe(s -> sessionMono.subscribe()); 
} 

Aber das gibt nur eine Nachricht. Als hätte ich kein Abo bekommen.

Antwort

1

Ich nehme an, dass Sie einen "Echo" -Dienst verwenden. Um einige Nachrichten von dem Dienst zu erhalten, müssen Sie sie in den Websocket schieben und der Dienst wird sie Ihnen "zurückgeben".

In Ihrem Beispielcode schreiben Sie nur ein einzelnes Element in den Websocket. Sobald Sie weitere Nachrichten in den Socket schieben, erhalten Sie mehr zurück.

Ich habe den Code für die Verbindung zu ws://echo.websocket.org anstelle eines lokalen Dienstes angepasst. Wenn Sie zu /stream navigieren, sehen Sie jede Sekunde eine neue Nachricht.

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) 
public Flux<String> getStreaming() throws URISyntaxException { 

    Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date()))) 
     .delayElements(Duration.ofSeconds(1)); 

    WebSocketClient client = new ReactorNettyWebSocketClient(); 
    EmitterProcessor<String> output = EmitterProcessor.create(); 

    Mono<Void> sessionMono = client.execute(URI.create("ws://echo.websocket.org"), session -> session.send(input.map(session::textMessage)) 
     .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()).then()); 

    return output.doOnSubscribe(s -> sessionMono.subscribe()); 
} 

Hope this helps ...

Verwandte Themen