2017-12-21 2 views
0

Ich möchte alle emittierten Puffer der rs Observable in eine Datei mit einem Vert.x 3.5.0 Pump (RxJava2 Variante) streamen. Das folgende Beispiel funktioniert irgendwie nicht und ich weiß nicht warum.Wie schreibt man Observable <Buffer> in Datei mit einer Vert.x Pumpe?

import static org.junit.Assert.assertEquals; 

import java.io.File; 
import java.io.IOException; 

import org.apache.commons.io.FileUtils; 
import org.junit.Test; 

import io.reactivex.Observable; 
import io.vertx.core.file.OpenOptions; 
import io.vertx.reactivex.core.Vertx; 
import io.vertx.reactivex.core.buffer.Buffer; 
import io.vertx.reactivex.core.file.AsyncFile; 
import io.vertx.reactivex.core.streams.Pump; 

public class PumpTest { 

    @Test 
    public void testPump() throws IOException { 
     Vertx vertx = Vertx.vertx(); 
     File file = new File("target/test"); 
     file.delete(); 

     Observable<Buffer> rs = Observable.just(Buffer.buffer("test123")); 
     vertx.fileSystem().rxOpen(file.getAbsolutePath(), new OpenOptions()).map(f -> { 
      Pump pump = Pump.pump(rs, f); 
      pump.start(); 
      return f; 
     }).map(AsyncFile::flush).subscribe(AsyncFile::close); 
     assertEquals("test123", FileUtils.readFileToString(file)); 
    } 
} 
+0

Ich habe völlig übersehen, dass Subskription nicht blockiert ist und der Beispielcode nicht wirklich ausgeführt wurde. Sobald ich das behoben habe, bekam ich eine ClassCastException. Ich nehme an, das ist ein Fehler. Ich werde ein Problem auf GitHub erstellen. – Jotschi

+0

https://github.com/vert-x3/vertx-rx/issues/123 – Jotschi

Antwort

0

bei der Dokumentation der Suche von ReadStream<T> (ersten Parameter von Pump.pump) verweist auf die ReadStreamSubscriber Klasse. Was aus einem Observablen einen Readstream machen kann.

Anstelle von rs sollte ReadStreamSubscriber.asReadStream(rs, Function.identity()) in dem bereitgestellten Code funktionieren.

+0

Funktioniert immer noch nicht und das scheint ziemlich umständlich, da die asReadStream-Methode einen io.vertx.core.streams.ReadStream zurückgibt, der nicht sein wird akzeptiert von Pump.pump. Der zurückgegebene Lese-Stream müsste erneut verpackt werden (RxJava2-Wrapper), um der Methode zu entsprechen. – Jotschi

+0

Die ReadStreamSubscriber.asReadStream-Methode funktioniert, wenn sie mit der regulären io.vertxt.core.streams.Pump kombiniert wird. – Jotschi

Verwandte Themen