2017-12-27 5 views
2

Ich habe vor kurzem versucht, von Flink 1.3.2 auf 1.4.0 zu aktualisieren und ich habe einige Probleme mit org.apache.hadoop.fs.{FileSystem, Path} nicht mehr zu importieren. Das Problem wird an zwei Stellen vorkommen:Upgrade von Flink 1.3.2 auf 1.4.0 hadoop FileSystem und Path Probleme

ParquetWriter:

import org.apache.avro.Schema 
import org.apache.avro.generic.GenericRecord 
import org.apache.hadoop.fs.{FileSystem, Path} 
import org.apache.flink.streaming.connectors.fs.Writer 
import org.apache.parquet.avro.AvroParquetWriter 
import org.apache.parquet.hadoop.ParquetWriter 
import org.apache.parquet.hadoop.metadata.CompressionCodecName 

class AvroWriter[T <: GenericRecord]() extends Writer[T] { 

    @transient private var writer: ParquetWriter[T] = _ 
    @transient private var schema: Schema = _ 

    override def write(element: T): Unit = { 
    schema = element.getSchema 
    writer.write(element) 
    } 

    override def duplicate(): AvroWriter[T] = new AvroWriter[T]() 

    override def close(): Unit = writer.close() 

    override def getPos: Long = writer.getDataSize 

    override def flush(): Long = writer.getDataSize 

    override def open(fs: FileSystem, path: Path): Unit = { 
    writer = AvroParquetWriter.builder[T](path) 
     .withSchema(schema) 
     .withCompressionCodec(CompressionCodecName.SNAPPY) 
     .build() 
    } 

} 

CustomBucketer:

import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer 
import org.apache.flink.streaming.connectors.fs.Clock 
import org.apache.hadoop.fs.{FileSystem, Path} 
import java.io.ObjectInputStream 
import java.text.SimpleDateFormat 
import java.util.Date 

import org.apache.avro.generic.GenericRecord 

import scala.reflect.ClassTag 

class RecordFieldBucketer[T <: GenericRecord: ClassTag](dateField: String = null, dateFieldFormat: String = null, bucketOrder: Seq[String]) extends Bucketer[T] { 

    @transient var dateFormatter: SimpleDateFormat = _ 

    private def readObject(in: ObjectInputStream): Unit = { 
    in.defaultReadObject() 
    if (dateField != null && dateFieldFormat != null) { 
     dateFormatter = new SimpleDateFormat(dateFieldFormat) 
    } 
    } 

    override def getBucketPath(clock: Clock, basePath: Path, element: T): Path = { 
    val partitions = bucketOrder.map(field => { 
     if (field == dateField) { 
     field + "=" + dateFormatter.format(new Date(element.get(field).asInstanceOf[Long])) 
     } else { 
     field + "=" + element.get(field) 
     } 
    }).mkString("/") 
    new Path(basePath + "/" + partitions) 
    } 

} 

Ich bemerkte, dass Flink hat jetzt:

import org.apache.flink.core.fs.{FileSystem, Path} 

Aber neue Path nicht scheinen mit dem AvroParquetWriter oder demzu arbeiten 0 Methode. Ich weiß, dass es mit Flinks FileSystem- und Hadoop-Abhängigkeiten einige Änderungen gegeben hat und ich bin mir nicht sicher, was ich importieren muss, damit mein Code wieder funktioniert.

Muss ich sogar die Hadoop-Abhängigkeiten verwenden oder gibt es jetzt verschiedene Möglichkeiten zum Schreiben und Bucketting von Parquet-Dateien nach s3?

build.sbt:

val flinkVersion = "1.4.0" 

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, 
    "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion, 
    "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-core" % flinkVersion, 
    "org.apache.flink" % "flink-metrics-graphite" % flinkVersion, 
    "org.apache.kafka" %% "kafka" % "0.10.0.1", 
    "org.apache.avro" % "avro" % "1.7.7", 
    "org.apache.parquet" % "parquet-hadoop" % "1.8.1", 
    "org.apache.parquet" % "parquet-avro" % "1.8.1", 
    "io.confluent" % "kafka-avro-serializer" % "3.2.2", 
    "com.fasterxml.jackson.core" % "jackson-core" % "2.9.2" 
) 

Antwort

0

Die erforderlichen Klassen org.apache.hadoop.fs.{FileSystem, Path} befinden sich im Projekt hadoop-commons.

1

Aufbau ein "Hadoop-Free-Flink" war ein wesentliches Merkmal der 1.4-Release. Alles, was Sie tun müssen, ist die hadoop Abhängigkeiten zu Ihrem Classpath enthalten oder unter Angabe der changelogs:

... Das bedeutet auch, dass in Fällen, in denen Sie Anschlüsse auf HDFS verwendet werden, wie die BucketingSink oder RollingSink, Sie Stellen Sie nun sicher, dass Sie entweder eine Flink-Distribution mit gebündelten Hadoop-Abhängigkeiten verwenden oder sicherstellen, dass Hadoop-Abhängigkeiten beim Erstellen einer JAR-Datei für Ihre Anwendung berücksichtigt werden.

+0

Ok macht Sinn Ich werde versuchen, die Abhängigkeiten aufzuspüren, die ich einschließen muss - es sei denn, Sie wissen es ohne weiteres. Ich frage mich auch, ob ich sogar die Abhängigkeiten zum Schreiben von Parkett in s3 hinzufügen muss oder gibt es andere Möglichkeiten, dies jetzt in Flink 1.4 zu tun? – moku

Verwandte Themen