Eine grundlegende Lösung, wo ich eine geschachtelte Klasse verwenden FileObservableSource
die Daten zu erzeugen und dann die Gründung des bis ein Beobachter beobachtbare verschieben abonniert:
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class StackOverflow {
public static void main(String[] args) {
final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml"));
observable.subscribe(
line -> System.out.println("next line: " + line),
Throwable::printStackTrace,
() -> System.out.println("finished")
);
}
static class FileObservableSource implements ObservableSource<String> {
private final String filename;
FileObservableSource(String filename) {
this.filename = filename;
}
@Override
public void subscribe(Observer<? super String> observer) {
try {
Files.lines(Paths.get(filename)).forEach(observer::onNext);
observer.onComplete();
} catch (IOException e) {
observer.onError(e);
}
}
}
}
Aber was, wenn ich weiß nicht, chunksize und müssen Dateizeile lesen nach Linie? –
Dann sollten Sie diesen Code anpassen, indem Sie Ihren 'InputStream' in einen' BufferedReader' umwandeln und dann 'readLine' verwenden. – GVillani82