Wir arbeiten mit project reactor
und haben ein großes Problem im Moment. Dies ist, wie wir (veröffentlichen unsere Daten) produzieren:Abonnenten onnext enthält keinen vollständigen Artikel
public Flux<String> getAllFlux() {
return Flux.<String>create(sink -> {
new Thread(){
public void run(){
Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator();
ObjectMapper mapper = new ObjectMapper();
while(iterator.hasNext()) {
try {
sink.next(mapper.writeValueAsString(iterator.next().getValue()));
} catch (IOException e) {
e.printStackTrace();
}
}
sink.complete();
}
} .start();
});
}
Wie Sie sehen können wir Daten aus einem Iterator nehmen und veröffentlichen jedes Element in diesem Iterator als JSON-String. Unsere Teilnehmer macht folgendes:
flux.subscribe(new Subscriber<String>() {
private Subscription s;
int amount = 1; // the amount of received flux payload at a time
int onNextAmount;
String completeItem="";
ObjectMapper mapper = new ObjectMapper();
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe");
this.s = s;
this.s.request(amount);
}
@Override
public void onNext(String item) {
MyObject myObject = null;
try {
System.out.println(item);
myObject = mapper.readValue(completeItem, MyObject.class);
System.out.println(myObject.toString());
} catch (IOException e) {
System.out.println(item);
System.out.println("failed: " + e.getLocalizedMessage());
}
onNextAmount++;
if (onNextAmount % amount == 0) {
this.s.request(amount);
}
}
@Override
public void onError(Throwable t) {
System.out.println(t.getLocalizedMessage())
}
@Override
public void onComplete() {
System.out.println("completed");
});
}
Wie Sie sehen können wir einfach drucken, die String Element, das wir empfangen und es in ein Objekt mit jackson Wrapper Parsen. Das Problem, das wir bekamen ist nun, dass für die meisten unserer Produkte alles funktioniert:
{"itemId": "someId", "itemDesc", "some description"}
Aber für einige Elemente wird die Zeichenfolge wie folgt beispielsweise abgeschnitten:
{"itemId": "some"
Und das nächste Element danach wäre
"Id", "itemDesc", "some description"}
Es gibt kein Muster für diese Schnitte. Es ist völlig zufällig und es ist jedes Mal anders, wenn wir diesen Code ausführen. Natürlich bekommt unser Jackson einen Fehler Unexpected end of Input
mit diesem Verhalten.
Was verursacht solch ein Verhalten und wie können wir es lösen?