2017-03-01 6 views
0

Ich benutzte eine alte Version von Flink. Ich rüste auf 1.2.0 auf und ich habe einige Probleme mit Filtern.Wie man einen einfachen Filter mit Flink in Scala anwendet

Ich habe ein Datastream von Log, die ganz gut funktioniert:

val logs: DataStream[Log] = env.addSource(new LogSource(
     data, delay, factor)) 

// DISPLAY TUPLE IN CONSOLE 
logs.print() 

// EXECUTE SCRIPT 
env.execute("stream") 

habe ich natürlich die Dokumentation lesen, die zeigt:

dataStream.filter { _ != 0 } 

ich ein paar Dinge wie diese versucht:

val cleanLogs = logs.filter { _.isComplete } 

Aber ich habe den folgenden Fehler:

Typenkonflikt, erwartet: filter [Log], aktuell: (Alle) => Ein

So sehe ich nicht die Verbindung zwischen der Dokumentation und diesen Fehler. Irgendwelche Hilfe? Beispiele?

Dank

+0

Was ist die Unterschrift der 'isComplete' Methode? –

+0

Es ist keine Methode, das erste Attribut von Log ist ein boolescher Wert: isComplete. Mit Flink 0.10 klappte das ganz gut, aber vielleicht ist das nicht mehr möglich? – ImbaBalboa

+1

Ich kann dein Problem nicht wirklich reproduzieren. Das einzige, was mir in den Sinn kommt, sind falsche Importe. Stellen Sie sicher, dass Sie die Scala-Versionen von 'DataStream' und' StreamExecutionEnvironment' importieren. Es ist am besten in Scala, immer 'org.apache.flink.streaming.api.scala._' zu importieren. –

Antwort

0

Das Problem war zuerst ein falscher Import von StreamExecutionEnvironment die wie filter mit Basisfunktionen zu diesem Problem führen.

Als ich dann eine alte Version von Flink benutzte, benutzte ich LocalExecutionEnvironment Klasse, die nicht mehr in Flink 1.X verfügbar ist.

Statt: StreamExecutionEnvironment.createLocalEnvironment(1)

Verwandte Themen