
Print Parquet schema with Spark Streaming

Below is the Scala snippet I wrote to read Parquet files from a directory and print the schema and the first few records of each Parquet file:

However, nothing gets printed.

    import org.apache.avro.generic.GenericRecord 
    import org.apache.hadoop.fs.Path 
    import org.apache.parquet.hadoop.ParquetInputFormat 
    import org.apache.spark.rdd.RDD 
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time} 
    import org.apache.spark.{SparkConf, SparkContext} 

    val batchDuration = 2 
    val inputDir = "file:///home/samplefiles" 

    val conf = new SparkConf().setAppName("gpParquetStreaming").setMaster("local[*]") 
    val sc = new SparkContext(conf) 
    sc.hadoopConfiguration.set("spark.streaming.fileStream.minRememberDuration", "600000") 
    val ssc = new StreamingContext(sc, Seconds(batchDuration)) 

    // Have the Parquet input format materialize records as Avro GenericRecords. 
    ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.parquet.avro.AvroReadSupport") 

    // Watch inputDir for *.parquet files (newFilesOnly = true). 
    val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]]( 
      inputDir, 
      { path: Path => path.toString.endsWith("parquet") }, 
      true, 
      ssc.sparkContext.hadoopConfiguration) 

    // Drop the Void keys and cache the records, since they are used twice below. 
    val dataStream = stream.transform(rdd => rdd.map(tuple => tuple._2)).persist 
    val countData = dataStream.count 

    dataStream.transform(rdd => rdd.map(record => record.toString)).foreachRDD((rdd: RDD[String], time: Time) => { 
      // SQLContextSingleton is the lazily-initialized singleton SQLContext helper 
      // from the Spark Streaming programming guide. 
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
      import sqlContext.implicits._ 

      // GenericRecord.toString renders each record as JSON, which read.json can parse. 
      val dataDF = sqlContext.read.json(rdd) 
      dataDF.printSchema 
      dataDF.show 
    }) 

    countData.print 
    dataStream.print 
    stream.print 

    ssc.start() 
    ssc.awaitTermination() 
  } 

I am also trying to read files that already exist in the directory, so I set the minRememberDuration parameter to a large value, but that has not helped.
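
For reference, here is a sketch of how I understand the setting is meant to be applied (untested): spark.streaming.fileStream.minRememberDuration appears to be a Spark property read from the SparkConf rather than from the Hadoop configuration, and the third fileStream argument (newFilesOnly) presumably has to be false for pre-existing files to be considered at all.

    // Sketch (untested): set minRememberDuration on the SparkConf, where Spark 
    // Streaming reads it; a bare number is interpreted as seconds. 
    val conf = new SparkConf() 
      .setAppName("gpParquetStreaming") 
      .setMaster("local[*]") 
      .set("spark.streaming.fileStream.minRememberDuration", "600000") 

    // ... create sc and ssc as before ... 

    // newFilesOnly = false makes files already present in the directory eligible. 
    val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]]( 
      inputDir, 
      { path: Path => path.toString.endsWith("parquet") }, 
      false, 
      ssc.sparkContext.hadoopConfiguration) 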

Any help would be appreciated!

Thanks,

EDIT: Dependencies used

<dependency> 
    <groupId>org.scala-lang</groupId> 
    <artifactId>scala-library</artifactId> 
    <version>${scala.version}</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-sql_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 
<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-avro_2.11</artifactId> 
    <version>3.2.0</version> 
</dependency> 
<dependency> 
    <groupId>com.amazonaws</groupId> 
    <artifactId>aws-java-sdk</artifactId> 
    <version>1.2.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.11</artifactId> 
    <version>2.1.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.parquet</groupId> 
    <artifactId>parquet-avro</artifactId> 
    <version>1.8.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.parquet</groupId> 
    <artifactId>parquet-hadoop</artifactId> 
    <version>1.8.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro</artifactId> 
    <version>1.8.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.parquet</groupId> 
    <artifactId>parquet-common</artifactId> 
    <version>1.8.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-mapreduce-client-core</artifactId> 
    <version>2.7.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-common</artifactId> 
    <version>2.7.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.parquet</groupId> 
    <artifactId>parquet-column</artifactId> 
    <version>1.8.2</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.parquet</groupId> 
    <artifactId>parquet-format</artifactId> 
    <version>2.3.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.parquet</groupId> 
    <artifactId>parquet-encoding</artifactId> 
    <version>1.8.2</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-aws</artifactId> 
    <version>2.8.0</version> 
</dependency> 

EDIT 2: Error:

17/08/25 02:00:45 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6) 
java.io.NotSerializableException: org.apache.avro.generic.GenericData$Record 
Serialization stack: 
    - object not serializable (class: org.apache.avro.generic.GenericData$Record, value: {"firstName": "Abhishek"}) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 
17/08/25 02:00:46 ERROR TaskSetManager: Task 0.0 in stage 8.0 (TID 6) had a not serializable result: org.apache.avro.generic.GenericData$Record 

Could you add what problem you are facing? – Squid


@squid The Parquet files are not getting picked up from that location. – abhi5800

Answer


It seems like you need to add extends Serializable to some of your classes.
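
In this trace the offending class is Avro's own org.apache.avro.generic.GenericData$Record, which you cannot edit to add extends Serializable. A minimal sketch of one workaround, assuming only the string/JSON form of the records is needed downstream: convert each GenericRecord to a String before persisting, so that Spark only ever serializes Strings.

    // Sketch: convert records to Strings before caching. GenericRecord.toString 
    // renders the record as JSON, which sqlContext.read.json can parse back. 
    val dataStream = stream 
      .transform(rdd => rdd.map(_._2.toString)) 
      .persist() 
    val countData = dataStream.count 

    dataStream.foreachRDD { (rdd: RDD[String], time: Time) => 
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
      val dataDF = sqlContext.read.json(rdd) 
      dataDF.printSchema() 
      dataDF.show() 
    } 

    countData.print() 

Alternatively, switching spark.serializer to org.apache.spark.serializer.KryoSerializer and registering a serializer for the Avro record class avoids Java serialization entirely, but the String conversion above is the smaller change.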
