Ich habe dieses Flink Programm unter:Wie verwendet man die Pattern-Matching-Where-Klausel in Flink?
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val pattern = Pattern.begin[String]("start").where(jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account") == "iOS") //ERROR HERE
val patternStream = CEP.pattern(partitionedInput, pattern)
ich an der val pattern = ...
Linie eine Störung erhalten Expected IterativeCondition[String], actual: (Nothing) => Unit
sagen.
Meine dataStream
besteht aus JSON-Objekten, die ich in der keyBy
zu Schlüssel durch den Kontoschlüssel innerhalb des JSON-Objekts analysieren. Dann versuche ich ein Muster zu erstellen, aber ich erhalte einen Fehler beim Erstellen des Musters.
importieren Was die Funktion, die Sie ist anzuwenden in 'wo (jsonString => ...)'? –
@DawidWysakowicz Ich habe den obigen Code der Funktion, die ich in der wo anwenden möchte, aktualisiert. – CapturedTree