Ich habe vor kurzem versucht, von Flink 1.3.2 auf 1.4.0 zu aktualisieren und ich habe einige Probleme mit org.apache.hadoop.fs.{FileSystem, Path}
nicht mehr zu importieren. Das Problem wird an zwei Stellen vorkommen:Upgrade von Flink 1.3.2 auf 1.4.0 hadoop FileSystem und Path Probleme
ParquetWriter:
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
class AvroWriter[T <: GenericRecord]() extends Writer[T] {
@transient private var writer: ParquetWriter[T] = _
@transient private var schema: Schema = _
override def write(element: T): Unit = {
schema = element.getSchema
writer.write(element)
}
override def duplicate(): AvroWriter[T] = new AvroWriter[T]()
override def close(): Unit = writer.close()
override def getPos: Long = writer.getDataSize
override def flush(): Long = writer.getDataSize
override def open(fs: FileSystem, path: Path): Unit = {
writer = AvroParquetWriter.builder[T](path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build()
}
}
CustomBucketer:
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.ObjectInputStream
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag
class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] {
@transient var dateFormatter: SimpleDateFormat = _
private def readObject(in: ObjectInputStream): Unit = {
in.defaultReadObject()
if (dateField != null && dateFieldFormat != null) {
dateFormatter = new SimpleDateFormat(dateFieldFormat)
}
}
override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = {
val partitions = bucketOrder.map(field => {
if (field == dateField) {
field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long]))
} else {
field + "=" + element.get(field)
}
}).mkString("/")
new Path(basePath + "/" + partitions)
}
}
Ich bemerkte, dass Flink hat jetzt:
import org.apache.flink.core.fs.{FileSystem, Path}
Aber neue Path
nicht scheinen mit dem AvroParquetWriter
oder demzu arbeiten 0 Methode. Ich weiß, dass es mit Flinks FileSystem- und Hadoop-Abhängigkeiten einige Änderungen gegeben hat und ich bin mir nicht sicher, was ich importieren muss, damit mein Code wieder funktioniert.
Muss ich sogar die Hadoop-Abhängigkeiten verwenden oder gibt es jetzt verschiedene Möglichkeiten zum Schreiben und Bucketting von Parquet-Dateien nach s3?
build.sbt:
val flinkVersion = "1.4.0"
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
"org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
"org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
"org.apache.flink" % "flink-metrics-core" % flinkVersion,
"org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
"org.apache.kafka" %% "kafka" % "0.10.0.1",
"org.apache.avro" % "avro" % "1.7.7",
"org.apache.parquet" % "parquet-hadoop" % "1.8.1",
"org.apache.parquet" % "parquet-avro" % "1.8.1",
"io.confluent" % "kafka-avro-serializer" % "3.2.2",
"com.fasterxml.jackson.core" % "jackson-core" % "2.9.2"
)
Ok macht Sinn Ich werde versuchen, die Abhängigkeiten aufzuspüren, die ich einschließen muss - es sei denn, Sie wissen es ohne weiteres. Ich frage mich auch, ob ich sogar die Abhängigkeiten zum Schreiben von Parkett in s3 hinzufügen muss oder gibt es andere Möglichkeiten, dies jetzt in Flink 1.4 zu tun? – moku