2017-07-10 3 views
0

Wir arbeiten mit project reactor und haben ein großes Problem im Moment. Dies ist, wie wir (veröffentlichen unsere Daten) produzieren:Abonnenten onnext enthält keinen vollständigen Artikel

public Flux<String> getAllFlux() { 
    return Flux.<String>create(sink -> { 
     new Thread(){ 
      public void run(){ 
       Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator(); 
       ObjectMapper mapper = new ObjectMapper(); 

       while(iterator.hasNext()) { 
        try { 
         sink.next(mapper.writeValueAsString(iterator.next().getValue())); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       } 

       sink.complete(); 
      } 
     } .start(); 
    }); 
} 

Wie Sie sehen können wir Daten aus einem Iterator nehmen und veröffentlichen jedes Element in diesem Iterator als JSON-String. Unsere Teilnehmer macht folgendes:

flux.subscribe(new Subscriber<String>() { 
    private Subscription s; 

    int amount = 1; // the amount of received flux payload at a time 
    int onNextAmount; 

    String completeItem=""; 

    ObjectMapper mapper = new ObjectMapper(); 

    @Override 
    public void onSubscribe(Subscription s) { 
     System.out.println("subscribe"); 

     this.s = s; 
     this.s.request(amount); 
    } 

    @Override 
    public void onNext(String item) { 
     MyObject myObject = null; 

     try { 
      System.out.println(item); 

      myObject = mapper.readValue(completeItem, MyObject.class); 

      System.out.println(myObject.toString()); 
     } catch (IOException e) { 
      System.out.println(item); 
      System.out.println("failed: " + e.getLocalizedMessage()); 
     } 

     onNextAmount++; 

     if (onNextAmount % amount == 0) { 
      this.s.request(amount); 
     } 
    } 

    @Override 
    public void onError(Throwable t) { 
     System.out.println(t.getLocalizedMessage()) 
    } 

    @Override 
    public void onComplete() { 
     System.out.println("completed"); 
    }); 
} 

Wie Sie sehen können wir einfach drucken, die String Element, das wir empfangen und es in ein Objekt mit jackson Wrapper Parsen. Das Problem, das wir bekamen ist nun, dass für die meisten unserer Produkte alles funktioniert:

{"itemId": "someId", "itemDesc", "some description"} 

Aber für einige Elemente wird die Zeichenfolge wie folgt beispielsweise abgeschnitten:

{"itemId": "some" 

Und das nächste Element danach wäre

"Id", "itemDesc", "some description"} 

Es gibt kein Muster für diese Schnitte. Es ist völlig zufällig und es ist jedes Mal anders, wenn wir diesen Code ausführen. Natürlich bekommt unser Jackson einen Fehler Unexpected end of Input mit diesem Verhalten.

Was verursacht solch ein Verhalten und wie können wir es lösen?

Antwort

1

Lösung:

Senden Sie das Objekt innerhalb des Flusses anstelle des String:

public Flux<ItemIgnite> getAllFlux() { 
    return Flux.create(sink -> { 
     new Thread(){ 
      public void run(){ 
       Iterator<Cache.Entry<String, ItemIgnite>> iterator = getAllIterator(); 

       while(iterator.hasNext()) { 
        sink.next(iterator.next().getValue()); 
       } 
      } 
     } .start(); 
    }); 
} 

und verwenden Sie den folgenden produces Typen:

@RequestMapping(value="/allFlux", method=RequestMethod.GET, produces="application/stream+json") 

Der Schlüssel hier ist stream+json zu verwenden und nicht nur json.

Verwandte Themen