Ich versuche, eine Beispielanwendung mit Apache Flink zu bauen, die den folgenden:Apache Flink - Gebrauchswerte aus einem Datenstrom, um dynamisch eine Streaming-Datenquelle zu erstellen
- Liest einen Strom von Aktiensymbolen (zB "CSCO", "FB") aus einer Kafka-Warteschlange.
- Für jedes Symbol führt eine Echtzeit-Suche der aktuellen Preise und Streams die Werte für die Downstream-Verarbeitung.
* Update auf Original-Beitrag *
zog ich die Kartenfunktion in eine separate Klasse und nicht bekommen, die Laufzeitfehlermeldung "Die Umsetzung des MapFunction nicht serialisierbar ist nicht mehr. Das Objekt enthält wahrscheinlich nicht serialisierbare Felder oder verweist auf diese. ".
Das Problem, mit dem ich jetzt konfrontiert bin, ist, dass das Kafka-Thema "stockprices" ich versuche, die Preise zu schreiben, sie nicht erhält. Ich versuche Ärger zu machen und werde alle Updates veröffentlichen.
public class RetrieveStockPrices {
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "stocks");
DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));
DataStream<String> stockPrice =
streamOfStockSymbols
//get unique keys
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String trend) throws Exception {
return trend;
}
})
//collect events over a window
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
//return the last event from the window...all elements are the same "Symbol"
.apply(new WindowFunction<String, String, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
out.collect(input.iterator().next().toString());
}
})
.map(new StockSymbolToPriceMapFunction());
streamExecEnv.execute("Retrieve Stock Prices");
}
}
public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
@Override
public String map(String stockSymbol) throws Exception {
final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);
DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));
return "100000";
}
private static class CustomKeySelector implements KeySelector<String, String> {
@Override
public String getKey(String arg0) throws Exception {
return arg0.trim();
}
}
}
public class LookupStockPrice extends RichSourceFunction<String> {
public String stockSymbol = null;
public boolean isRunning = true;
public LookupStockPrice(String inSymbol) {
stockSymbol = inSymbol;
}
@Override
public void open(Configuration parameters) throws Exception {
isRunning = true;
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public void run(SourceFunction.SourceContext<String> ctx)
throws Exception {
String stockPrice = "0";
while (isRunning) {
//TODO: query Google Finance API
stockPrice = Integer.toString((new Random()).nextInt(100)+1);
ctx.collect(stockPrice);
Thread.sleep(10000);
}
}
}
Ist es überhaupt möglich, Streams als Antwort auf eingehende Daten dynamisch zu erstellen? –
Sie können eine 'FlatMapFunction' implementieren, die Daten basierend auf eingehenden Datensätzen dynamisch liest und ausgibt. Zum Beispiel, wenn Sie einen Stream mit Dateinamen haben, eine 'FlatMapFunction', könnten Sie diese Dateien öffnen und ihre Daten ausgeben. Die Ausgabetypen aller Datensätze müssen jedoch identisch sein. Es könnte auch schwierig sein, die Semantik der Ereigniszeitverarbeitung richtig zu machen, aber das ist eher ein generelles Problem von dynamisch hinzugefügten Quellen. –
@FabianHueske Ich löse einen ähnlichen Anwendungsfall. Wenn ich also FlatMapFunction verwenden muss, müssen wir die Datei mit normalen Datei-APIs von scala/Java lesen und nicht mit Flink's readTextFile. Grund dafür ist, dass wir StreamExecutionEnvironment innerhalb von flatMap nicht verwenden können. Ist mein Verständnis richtig? –