2017-07-05 4 views
0

Ich habe dieses Flink Programm unter:Wie verwendet man die Pattern-Matching-Where-Klausel in Flink?

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props)) 

val partitionedInput = dataStream.keyBy(jsonString => { 
    val jsonParser = new JsonParser() 
    val jsonObject = jsonParser.parse(jsonString).getAsJsonObject() 
    jsonObject.get("account") 
}) 

val pattern = Pattern.begin[String]("start").where(jsonString => 
      val jsonParser = new JsonParser() 
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject() 
      jsonObject.get("account") == "iOS") //ERROR HERE 

val patternStream = CEP.pattern(partitionedInput, pattern) 

ich an der val pattern = ... Linie eine Störung erhalten Expected IterativeCondition[String], actual: (Nothing) => Unit sagen.

Meine dataStream besteht aus JSON-Objekten, die ich in der keyBy zu Schlüssel durch den Kontoschlüssel innerhalb des JSON-Objekts analysieren. Dann versuche ich ein Muster zu erstellen, aber ich erhalte einen Fehler beim Erstellen des Musters.

+0

importieren Was die Funktion, die Sie ist anzuwenden in 'wo (jsonString => ...)'? –

+0

@DawidWysakowicz Ich habe den obigen Code der Funktion, die ich in der wo anwenden möchte, aktualisiert. – CapturedTree

Antwort

1

Stellen Sie sicher, dass Sie die richtige API verwenden. Für scala sollten Sie

import org.apache.flink.cep.scala.pattern.Pattern 

statt

import org.apache.flink.cep.pattern.Pattern 
+0

Vielen Dank! Können Sie sich bitte eine andere Frage anschauen, die ich diesbezüglich hatte ?: https://stackoverflow.com/questions/44965109/nothing-is-bing-printed-out-from-a-flink-patterned-stream – CapturedTree

Verwandte Themen