Ich möchte alle emittierten Puffer der rs Observable in eine Datei mit einem Vert.x 3.5.0 Pump (RxJava2 Variante) streamen. Das folgende Beispiel funktioniert irgendwie nicht und ich weiß nicht warum.Wie schreibt man Observable <Buffer> in Datei mit einer Vert.x Pumpe?
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
import io.reactivex.Observable;
import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.AsyncFile;
import io.vertx.reactivex.core.streams.Pump;
public class PumpTest {
@Test
public void testPump() throws IOException {
Vertx vertx = Vertx.vertx();
File file = new File("target/test");
file.delete();
Observable<Buffer> rs = Observable.just(Buffer.buffer("test123"));
vertx.fileSystem().rxOpen(file.getAbsolutePath(), new OpenOptions()).map(f -> {
Pump pump = Pump.pump(rs, f);
pump.start();
return f;
}).map(AsyncFile::flush).subscribe(AsyncFile::close);
assertEquals("test123", FileUtils.readFileToString(file));
}
}
Ich habe völlig übersehen, dass Subskription nicht blockiert ist und der Beispielcode nicht wirklich ausgeführt wurde. Sobald ich das behoben habe, bekam ich eine ClassCastException. Ich nehme an, das ist ein Fehler. Ich werde ein Problem auf GitHub erstellen. – Jotschi
https://github.com/vert-x3/vertx-rx/issues/123 – Jotschi