2017-09-19 7 views
0

Ich muss den JSON-zu-ORC-Konvertierungsprozess automatisieren. Ich konnte fast mit dem ORC-tools-Paket von Apache dorthin gelangen, außer dass JsonReader nicht mit Map type und throws an exception umgehen kann. Das Folgende funktioniert, aber nicht mit dem Kartentyp.Java: JSON aus einer Datei lesen, in ORC konvertieren und in eine Datei schreiben

Path hadoopInputPath = new Path(input); 
    try (RecordReader recordReader = new JsonReader(hadoopInputPath, schema, hadoopConf)) { // throws when schema contains Map type 
     try (Writer writer = OrcFile.createWriter(new Path(output), OrcFile.writerOptions(hadoopConf).setSchema(schema))) { 
      VectorizedRowBatch batch = schema.createRowBatch(); 
      while (recordReader.nextBatch(batch)) { 
       writer.addRowBatch(batch); 
      } 
     } 
    } 

So begann ich in der Suche unter Verwendung von Hive Klassen für Json-to-ORC Umwandlung, die einen zusätzlichen Vorteil hat, dass ich in Zukunft in anderen Formaten, wie AVRO mit geringfügigen Änderungen am Code umwandeln kann. Ich bin mir jedoch nicht sicher, was der beste Weg ist, dies mit Hive-Klassen zu tun. Insbesondere ist es nicht klar, wie HCatRecord in eine Datei geschrieben wird, wie unten gezeigt.

HCatRecordSerDe hCatRecordSerDe = new HCatRecordSerDe(); 
    SerDeUtils.initializeSerDe(hCatRecordSerDe, conf, tblProps, null); 

    OrcSerde orcSerde = new OrcSerde(); 
    SerDeUtils.initializeSerDe(orcSerde, conf, tblProps, null); 

    Writable orcOut = orcSerde.serialize(hCatRecord, hCatRecordSerDe.getObjectInspector()); 
    assertNotNull(orcOut); 

    InputStream input = getClass().getClassLoader().getResourceAsStream("test.json.snappy"); 
    SnappyCodec compressionCodec = new SnappyCodec(); 
    try (CompressionInputStream inputStream = compressionCodec.createInputStream(input)) { 
     LineReader lineReader = new LineReader(new InputStreamReader(inputStream, Charsets.UTF_8)); 
     String jsonLine = null; 
     while ((jsonLine = lineReader.readLine()) != null) { 
      Writable jsonWritable = new Text(jsonLine); 
      DefaultHCatRecord hCatRecord = (DefaultHCatRecord) jsonSerDe.deserialize(jsonWritable); 
      // TODO: Write ORC to file???? 
     } 
    } 

Alle Ideen, wie Sie den Code oben oder einfachere Möglichkeiten zu tun, JSON-to-ORC abzuschließen sehr geschätzt wird. Hier

+0

Ehrlich, würde ich Spark/Pig/tatsächlichen HiveQL verwenden, um dies zu tun –

+0

Ist eine Map nicht wie ein normales JSON-Objekt? Also ein Struct to Hive? –

+0

cricket_007, diese JSON-zu-ORC-Konvertierung muss als Teil eines Webdienstes ausgeführt werden, der bereits JSON-Daten empfängt und andere Dinge damit erledigt, z. B. die Archivierung. Daher ist es nicht wirklich eine Option für uns, diese Konvertierung mit Spark/Hive-Jobs durchzuführen (obwohl wir sie an anderen Stellen für diese Konvertierung verwendet haben), da es erforderlich sein wird, JSON-Daten erneut an diese Jobs zu senden. – alecswan

Antwort

0

ist, was ich am Ende tun mit Spark-Bibliotheken pro cricket_007 Vorschlag:

Maven Abhängigkeit (mit einigen Ausnahmen Maven-Duplikat-Finder-Plugin glücklich zu halten):

<properties> 
     <dep.jackson.version>2.7.9</dep.jackson.version> 
     <spark.version>2.2.0</spark.version> 
     <scala.binary.version>2.11</scala.binary.version> 
    </properties> 

    <dependency> 
     <groupId>com.fasterxml.jackson.module</groupId> 
     <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> 
     <version>${dep.jackson.version}</version> 
     <exclusions> 
      <exclusion> 
       <groupId>com.google.guava</groupId> 
       <artifactId>guava</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-hive_${scala.binary.version}</artifactId> 
     <version>${spark.version}</version> 
     <exclusions> 
      <exclusion> 
       <groupId>log4j</groupId> 
       <artifactId>apache-log4j-extras</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>org.apache.hadoop</groupId> 
       <artifactId>hadoop-client</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>net.java.dev.jets3t</groupId> 
       <artifactId>jets3t</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>com.google.code.findbugs</groupId> 
       <artifactId>jsr305</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>stax</groupId> 
       <artifactId>stax-api</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>org.objenesis</groupId> 
       <artifactId>objenesis</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

Java-Code Synopse :

SparkConf sparkConf = new SparkConf() 
    .setAppName("Converter Service") 
    .setMaster("local[*]"); 

SparkSession sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate(); 

// read input data 
Dataset<Row> events = sparkSession.read() 
    .format("json") 
    .schema(inputConfig.getSchema()) // StructType describing input schema 
    .load(inputFile.getPath()); 

// write data out 
DataFrameWriter<Row> frameWriter = events 
    .selectExpr(
     // useful if you want to change the schema before writing it to ORC, e.g. ["`col1` as `FirstName`", "`col2` as `LastName`"] 
     JavaConversions.asScalaBuffer(outputSchema.getColumns())) 
    .write() 
    .options(ImmutableMap.of("compression", "zlib")) 
    .format("orc") 
    .save(outputUri.getPath()); 

Hoffe das hilft jemandem, um anzufangen.

Verwandte Themen