2017-05-31 4 views
0

Ich habe ein Problem beim Versuch, Mono aus dem Reactor mit Spring Cloud Stream zu verwenden, und kann nicht wirklich herausfinden, was vor sich geht.Mono <> mit Spring Cloud Stream

stelle ich mir einen Zuhörer wie dieses:

@StreamListener 
@Output(Urls.OUTUT) 
public Flux<String> expandUrls(@Input(Urls.INPUT) Flux<String> urlFormats) 
{ 
    return urlFormats 
     .map(this::expandUrl) 
     .flatMapIterable(urls -> urls); 
} 

es also im Grunde eine URL wie diese http://www.example.com/page/%d zu so etwas wie dieses

http://www.example.com/page/1

http://www.example.com/page/2

http://www.example.com/page/3

formatiert Erweiterung

Es funktioniert wie erwartet, aber als ich versuchte, es so zu tun:

@StreamListener 
@Output(Urls.OUTPUT) 
public Flux<String> expandUrls(@Input(Urls.INPUT) Mono<String> urlFormats) 
{ 
    return urlFormats 
     .repeat(3) 
     .zipWith(pageNumbers) 
     .map(this::formatUrl); 
} 

wo Seitennummer Flux.fromStream(Stream.iterate(1, p -> p+1).limit(3)) ist

ich die folgende Ausnahme erhalten

Caused by: java.lang.IllegalArgumentException: A method annotated with @StreamListener may use @Input or @Output annotations only in declarative mode and for parameters that are binding targets or convertible from binding targets. 

I der Ausnahme losgeworden indem Sie es so machen

@StreamListener(value = Urls.INPUT) 
@Output(Urls.OUTPUT) 
public Flux<String> expandUrls(Mono<String> urlFormats) 
{ 
    return urlFormats 
     .repeat(3) 
     .zipWith(pageNumbers) 
     .map(this::formatUrl); 
} 

Aber jetzt habe ich diese:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'http': was expecting ('true', 'false' or 'null') 
at [Source: http://www.example.com/page-%d,1,0.html; 

Meine Frage ist dann: Wie Mono zu verwenden, mit Frühling Wolke Strom. Ist es überhaupt möglich, es so zu benutzen? Wenn ja, wie geht das? Oh, ich benutze Kafka als Broker beim Kafka-Starter.

+1

Dies ist nur, weil SCSt 'Mono' für diesen Anwendungsfall nicht unterstützt. Nur "Flux" ist ausreichend. –

Antwort

1

Spring Cloud Stream Der @Input-Parametertyp von StreamListener unterstützt den Reaktortyp Flux nur, da er für reaktive Streaming-Anwendungen gut geeignet ist als Mono am @Input-Parametertyp.

Verwandte Themen