2017-04-16 2 views
2

Ich bin völlig neu in RxJava und reaktive Programmierung. Ich habe eine Aufgabe, wo ich die Datei lesen und auf Observable speichern muss. Ich habe versucht, eine Callable mit BufferedReader innerhalb machen und Observable.fromCallable() verwenden, aber es hat nicht viel funktioniert.RxJava. Lesen Sie die Datei zu beobachtbaren

Können Sie mir bitte zeigen, wie kann ich das tun?

Ich benutze RxJava 2.0.

Antwort

0

Sie können etwas ähnliches wie this implementation tun.

Und dann wird diese Klasse here auf diese Weise verwendet:

public static Observable<byte[]> from(InputStream is, int size) { 
    return Observable.create(new OnSubscribeInputStream(is, size)); 
} 

Schließlich können Sie es verwenden:

Observable<byte[]> chunks = Bytes.from(file, chunkSize); 

Weitere Details here.

+0

Aber was, wenn ich weiß nicht, chunksize und müssen Dateizeile lesen nach Linie? –

+1

Dann sollten Sie diesen Code anpassen, indem Sie Ihren 'InputStream' in einen' BufferedReader' umwandeln und dann 'readLine' verwenden. – GVillani82

2

Eine grundlegende Lösung, wo ich eine geschachtelte Klasse verwenden FileObservableSource die Daten zu erzeugen und dann die Gründung des bis ein Beobachter beobachtbare verschieben abonniert:

import io.reactivex.Observable; 
import io.reactivex.ObservableSource; 
import io.reactivex.Observer; 

import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 

public class StackOverflow { 

    public static void main(String[] args) { 
     final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml")); 
     observable.subscribe(
       line -> System.out.println("next line: " + line), 
         Throwable::printStackTrace, 
         () -> System.out.println("finished") 
       ); 
    } 

    static class FileObservableSource implements ObservableSource<String> { 

     private final String filename; 

     FileObservableSource(String filename) { 
      this.filename = filename; 
     } 

     @Override 
     public void subscribe(Observer<? super String> observer) { 
      try { 
       Files.lines(Paths.get(filename)).forEach(observer::onNext); 
       observer.onComplete(); 
      } catch (IOException e) { 
       observer.onError(e); 
      } 
     } 
    } 
} 
+0

Wie würde sich das unter Gegendruck verhalten? Nehmen wir an, ich habe einen Abonnenten, der auf einem anderen Thread beobachtet, der jede Zeile langsamer verarbeitet als gelesen. – 0cd

+0

würde es sich schlecht verhalten. keine Gegendruckbehandlung hier. Ich würde dafür die Reactive-Stream-Implementierung verwenden, aber diese Frage handelte von einem grundlegenden Beispiel. –

Verwandte Themen