2016-08-05 2 views
3

Diese Frage bezieht sich auf this one, in der ich gefragt habe, wie man Daten von einem Reactive Spring Controller streamen kann.Spring 5 Web Reactive Programming - WebClient ClassCastException beim Zurückmarseln von JSON von Spring Reactive Controller, der Daten streamt

Wie Rossen darauf hinwies, müssen wir verwenden, um die gestreamten Ergebnisse als Server gesendet Ereignisse zu senden, so weit so gut.

Ich habe ein Dienst wie folgt aus:

@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream") 
public Flux<Alert> getAccountAlertsStreaming() { 
    return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"), 
             new Alert((long)2, "Alert message2"), 
             new Alert((long)3, "Alert message3")}) 
       .delayMillis(1000) 
       .log(); 
} 

es aus den Browser aufrufen, beginnen die 3 Ergebnisse mit Verzögerung von 1 Sekunde empfangen werden.

Ich wollte diesen Service aus einer WebClient nennen und implementiert es so aus:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(Alert.class)); 
     return response; 
    }  
} 

Und dies ist der Testcode:

@Test 
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class}) 
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{ 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    @Autowired 
    private AccountsServiceClient client; 

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{ 

     CountDownLatch latch = new CountDownLatch(1); 

     Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080"); 
     alerts.doOnComplete(() -> { 
      latch.countDown(); 
     }).subscribe((n) -> { 
      logger.info("------------> GOT ALERT {}", n); 
     }); 

     latch.await(); 
    } 
} 

Das Problem ist, dass, wenn der Client versucht, Um die Ergebnisse zu extrahieren, wie sie erhalten werden, kann keiner der HttpMessageReader' s text/event-stream + Alert.class lesen.

public class ResponseExtractors { 

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders, 
       ResolvableType responseType, MediaType contentType) { 

      return messageReaders.stream() 
        .filter(e -> e.canRead(responseType, contentType)) 
        .findFirst() 
        .orElseThrow(() -> 
          new WebClientException(
            "Could not decode response body of type '" + contentType 
              + "' with target type '" + respons 

eType.toString() + "'")); 
    } 

Ausnahme:

reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at reactor.core.Exceptions.bubble(Exceptions.java:97) 
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) 
    at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169) 
    at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161) 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) 
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103) 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) 
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71) 
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) 
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) 
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source) 
    at java.util.Optional.orElseThrow(Optional.java:290) 
    at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200) 
    at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181) 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126) 
    ... 37 common frames omitted 
+0

Was ist der Unterschied zwischen einem WebClient und einem Webbrowser aus Sicht des Servers? –

+0

Ich sehe Ihren Punkt, aber, abgesehen von der neuen Semantik, wenn wir nicht die Ergebnisse gestreamt bekommen, aber alle gleichzeitig, was bringt es, den neuen 'WebClient' anstelle der alten' RestTemplate' zu ​​verwenden ? – codependent

Antwort

0

Vielleicht sollte dies vom Framework automatisch behandelt werden. Auf jeden Fall habe ich gelöst es selbst die JSON-Stream-Daten unmarshalling:

WebConfigClient:

@Configuration 
public class WebClientConfig { 

    @Bean 
    public ObjectMapper jacksonObjectMapper(){ 
     return new ObjectMapper(); 
    } 

    @Bean 
    public WebClient webClient(){ 
     WebClient webClient = new WebClient(new ReactorClientHttpConnector()); 
     return webClient; 
    } 

} 

Service-Client:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    @Autowired 
    private ObjectMapper jacksonObjectMapper; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(String.class)) 
       .map((e -> { 
        try { 
         e = e.substring(e.indexOf(":")+1); 
         Alert a = jacksonObjectMapper.readValue(e, Alert.class); 
         return a; 
        } catch (Exception e1) { 
         e1.printStackTrace(); 
         return null; 
        } 

       })); 
     return response; 
    } 

} 

UPDATE: Ab Frühling 5 M4 Dies geschieht durch den Rahmen. Sie können die Lösung mit der neuen API hier überprüfen: Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

0

Es gibt bereits ein Problem dafür. Bitte Kommentar/Abstimmung für SPR-14539.

Verwandte Themen