2016-12-18 4 views
5

Ich versuche, eine Beispielanwendung mit Apache Flink zu bauen, die den folgenden:Apache Flink - Gebrauchswerte aus einem Datenstrom, um dynamisch eine Streaming-Datenquelle zu erstellen

  1. Liest einen Strom von Aktiensymbolen (zB "CSCO", "FB") aus einer Kafka-Warteschlange.
  2. Für jedes Symbol führt eine Echtzeit-Suche der aktuellen Preise und Streams die Werte für die Downstream-Verarbeitung.

* Update auf Original-Beitrag *

zog ich die Kartenfunktion in eine separate Klasse und nicht bekommen, die Laufzeitfehlermeldung "Die Umsetzung des MapFunction nicht serialisierbar ist nicht mehr. Das Objekt enthält wahrscheinlich nicht serialisierbare Felder oder verweist auf diese. ".

Das Problem, mit dem ich jetzt konfrontiert bin, ist, dass das Kafka-Thema "stockprices" ich versuche, die Preise zu schreiben, sie nicht erhält. Ich versuche Ärger zu machen und werde alle Updates veröffentlichen.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "stocks"); 

     DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

     DataStream<String> stockPrice = 
      streamOfStockSymbols 
      //get unique keys 
      .keyBy(new KeySelector<String, String>() { 
       @Override 
       public String getKey(String trend) throws Exception { 
        return trend; 
       } 
       }) 
      //collect events over a window 
      .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
      //return the last event from the window...all elements are the same "Symbol" 
      .apply(new WindowFunction<String, String, String, TimeWindow>() { 
       @Override 
       public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
        out.collect(input.iterator().next().toString()); 
       } 
      }) 
      .map(new StockSymbolToPriceMapFunction()); 

     streamExecEnv.execute("Retrieve Stock Prices"); 
    } 
} 

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { 
    @Override 
    public String map(String stockSymbol) throws Exception { 
     final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
     streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 
     System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); 

     DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); 
     stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema())); 

     return "100000"; 
    } 

    private static class CustomKeySelector implements KeySelector<String, String> { 
     @Override 
     public String getKey(String arg0) throws Exception { 
      return arg0.trim(); 
     } 
    } 
} 


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
      stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
      isRunning = true; 
    } 


    @Override 
    public void cancel() { 
      isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
        throws Exception { 
      String stockPrice = "0"; 
      while (isRunning) { 
       //TODO: query Google Finance API 
       stockPrice = Integer.toString((new Random()).nextInt(100)+1); 
       ctx.collect(stockPrice); 
       Thread.sleep(10000); 
      } 
    } 
} 

Antwort

4

StreamExecutionEnvironment nicht innerhalb von Betreibern eines Streaming-Anwendung verwendet werden sollen eingezogen. Nicht beabsichtigt bedeutet, dies wird nicht getestet und gefördert. Es könnte funktionieren und etwas tun, wird sich aber höchstwahrscheinlich nicht gut verhalten und wahrscheinlich Ihre Anwendung beenden.

Die StockSymbolToPriceMapFunction in Ihrem Programm spezifiziert für jeden eingehenden Datensatz eine völlig neue und unabhängige neue Streaming-Anwendung. Da Sie jedoch streamExecEnv.execute() nicht aufrufen, werden die Programme nicht gestartet und die map-Methode kehrt zurück, ohne etwas zu tun.

Wenn Sie würde Anruf streamExecEnv.execute(), die Funktion einen neuen lokalen Flink Cluster in der Arbeiter JVM und starten Sie die Anwendung auf diesem lokalen Flink Cluster beginnen würde. Die lokale Flink-Instanz benötigt einen großen Teil des Heap-Speicherplatzes und nachdem einige Cluster gestartet wurden, wird der Worker wahrscheinlich aufgrund eines OutOfMemoryError sterben, was nicht das ist, was Sie passieren möchten.

+0

Ist es überhaupt möglich, Streams als Antwort auf eingehende Daten dynamisch zu erstellen? –

+0

Sie können eine 'FlatMapFunction' implementieren, die Daten basierend auf eingehenden Datensätzen dynamisch liest und ausgibt. Zum Beispiel, wenn Sie einen Stream mit Dateinamen haben, eine 'FlatMapFunction', könnten Sie diese Dateien öffnen und ihre Daten ausgeben. Die Ausgabetypen aller Datensätze müssen jedoch identisch sein. Es könnte auch schwierig sein, die Semantik der Ereigniszeitverarbeitung richtig zu machen, aber das ist eher ein generelles Problem von dynamisch hinzugefügten Quellen. –

+0

@FabianHueske Ich löse einen ähnlichen Anwendungsfall. Wenn ich also FlatMapFunction verwenden muss, müssen wir die Datei mit normalen Datei-APIs von scala/Java lesen und nicht mit Flink's readTextFile. Grund dafür ist, dass wir StreamExecutionEnvironment innerhalb von flatMap nicht verwenden können. Ist mein Verständnis richtig? –