Ich benutzte eine alte Version von Flink. Ich rüste auf 1.2.0 auf und ich habe einige Probleme mit Filtern.Wie man einen einfachen Filter mit Flink in Scala anwendet
Ich habe ein Datastream von Log, die ganz gut funktioniert:
val logs: DataStream[Log] = env.addSource(new LogSource(
data, delay, factor))
// DISPLAY TUPLE IN CONSOLE
logs.print()
// EXECUTE SCRIPT
env.execute("stream")
habe ich natürlich die Dokumentation lesen, die zeigt:
dataStream.filter { _ != 0 }
ich ein paar Dinge wie diese versucht:
val cleanLogs = logs.filter { _.isComplete }
Aber ich habe den folgenden Fehler:
Typenkonflikt, erwartet: filter [Log], aktuell: (Alle) => Ein
So sehe ich nicht die Verbindung zwischen der Dokumentation und diesen Fehler. Irgendwelche Hilfe? Beispiele?
Dank
Was ist die Unterschrift der 'isComplete' Methode? –
Es ist keine Methode, das erste Attribut von Log ist ein boolescher Wert: isComplete. Mit Flink 0.10 klappte das ganz gut, aber vielleicht ist das nicht mehr möglich? – ImbaBalboa
Ich kann dein Problem nicht wirklich reproduzieren. Das einzige, was mir in den Sinn kommt, sind falsche Importe. Stellen Sie sicher, dass Sie die Scala-Versionen von 'DataStream' und' StreamExecutionEnvironment' importieren. Es ist am besten in Scala, immer 'org.apache.flink.streaming.api.scala._' zu importieren. –