This question is related to this one, in which I asked how to stream data from a reactive Spring controller: Spring 5 Web Reactive Programming - WebClient ClassCastException when unmarshalling JSON from Spring Reactive Controller that streams data
As Rossen pointed out, we have to use text/event-stream to send the streamed results back as server-sent events. So far so good.
I have a service like this:
@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream")
public Flux<Alert> getAccountAlertsStreaming() {
    return Flux.fromArray(new Alert[]{new Alert(1L, "Alert message"),
                                      new Alert(2L, "Alert message2"),
                                      new Alert(3L, "Alert message3")})
               .delayMillis(1000)
               .log();
}
When I call it from the browser, the 3 results arrive one after another with a delay of 1 second between them.
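For reference, a text/event-stream body is not a plain JSON document: each element is framed as one or more `data:` lines followed by a blank line. The following is only a minimal sketch of that framing, using nothing but the JDK; the class name `SseDataExtractor` and the JSON field names `id` and `message` are assumptions made for illustration, since the `Alert` class is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch: pull the "data:" payloads out of a text/event-stream body. */
final class SseDataExtractor {

    /** Returns one string per complete event (an event ends at a blank line). */
    static List<String> extractData(String body) {
        List<String> events = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean hasData = false;
        // Limit -1 keeps trailing empty strings, so the final blank line is seen.
        for (String line : body.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // A blank line dispatches the event collected so far.
                if (hasData) {
                    events.add(current.toString());
                }
                current.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single leading space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines in one event are joined with newlines.
                if (hasData) {
                    current.append('\n');
                }
                current.append(value);
                hasData = true;
            }
            // Other fields (event:, id:, retry:) and comments are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        // Hypothetical payloads; the real field names depend on the Alert class.
        String body = "data:{\"id\":1,\"message\":\"Alert message\"}\n\n"
                    + "data:{\"id\":2,\"message\":\"Alert message2\"}\n\n";
        extractData(body).forEach(System.out::println);
    }
}
```

Each extracted string could then be handed to a JSON mapper. A client that reads this media type has to undo the framing first, which a decoder that only expects plain JSON presumably does not do.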
I wanted to call this service from a WebClient and implemented it like this:
@Component
public class AccountsServiceClient {

    @Autowired
    private WebClient webClient;

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){
        Flux<Alert> response = webClient
                .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream"))
                .extract(bodyStream(Alert.class));
        return response;
    }
}
And this is the test code:
@Test
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class})
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private AccountsServiceClient client;

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{
        CountDownLatch latch = new CountDownLatch(1);
        Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080");
        alerts.doOnComplete(() -> {
            latch.countDown();
        }).subscribe((n) -> {
            logger.info("------------> GOT ALERT {}", n);
        });
        latch.await();
    }
}
The problem is that when the client tries to extract the results as they are received, none of the HttpMessageReaders can read text/event-stream + Alert.class.
public class ResponseExtractors {

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders,
            ResolvableType responseType, MediaType contentType) {

        return messageReaders.stream()
                .filter(e -> e.canRead(responseType, contentType))
                .findFirst()
                .orElseThrow(() ->
                        new WebClientException(
                                "Could not decode response body of type '" + contentType
                                        + "' with target type '" + responseType.toString() + "'"));
    }
}
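The resolution step above is a first-match lookup: the first reader whose `canRead` accepts the (target type, media type) pair wins, and an empty result becomes the exception. Here is a small self-contained sketch of that pattern. The `Reader` interface and `JSON_ONLY` are hypothetical stand-ins, not the Spring `HttpMessageReader` API, and they assume a reader that only claims application/json.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Sketch of first-match reader resolution with a simplified, hypothetical Reader type. */
final class ReaderResolutionSketch {

    /** Hypothetical stand-in for a message reader; not the Spring interface. */
    interface Reader {
        boolean canRead(Class<?> targetType, String mediaType);
    }

    /** Assumed behaviour: a JSON reader that only claims application/json. */
    static final Reader JSON_ONLY =
            (targetType, mediaType) -> "application/json".equals(mediaType);

    /** Returns the first reader that accepts the (type, media type) pair, if any. */
    static Optional<Reader> resolve(List<Reader> readers, Class<?> targetType, String mediaType) {
        return readers.stream()
                .filter(r -> r.canRead(targetType, mediaType))
                .findFirst();
    }

    public static void main(String[] args) {
        List<Reader> readers = Collections.singletonList(JSON_ONLY);
        // A reader is found for plain JSON: prints true
        System.out.println(resolve(readers, Object.class, "application/json").isPresent());
        // Nothing claims text/event-stream, so the lookup is empty: prints false
        System.out.println(resolve(readers, Object.class, "text/event-stream").isPresent());
    }
}
```

Under that assumption, the failure does not depend on `Alert` itself: the lookup comes back empty because no registered reader claims the text/event-stream media type.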
Exception:
reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
at reactor.core.Exceptions.bubble(Exceptions.java:97)
at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263)
at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169)
at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161)
at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123)
at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010)
at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71)
at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert'
at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203)
at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200)
at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181)
at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89)
at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126)
... 37 common frames omitted
From the server's point of view, what is the difference between a WebClient and a web browser? –
I see your point, but, apart from the new semantics, if we don't get the results streamed but all at once, what's the point of using the new 'WebClient' instead of the old 'RestTemplate'? – codependent