2017-10-11 2 views
2

haben Anwendungsfall, in dem wir Dateien von S3 lesen möchten, die JSON haben. Basierend auf einem bestimmten JSON-Knotenwert möchten wir dann die Daten gruppieren und in S3 schreiben.Partitionierung durch Spalte in Apache Spark zu S3

Ich kann die Daten lesen, aber nicht in der Lage, ein gutes Beispiel zu finden, wie die Daten basierend auf JSON-Schlüssel partitionieren und dann auf S3 hochladen. Kann mir jemand ein Beispiel geben oder auf ein Tutorial hinweisen, das mir bei diesem Anwendungsfall helfen kann?

Ich habe das Schema meiner Daten bekam, nachdem die Datenrahmen zu erstellen:

root 
|-- customer: struct (nullable = true) 
| |-- customerId: string (nullable = true) 
|-- experiment: string (nullable = true) 
|-- expiryTime: long (nullable = true) 
|-- partitionKey: string (nullable = true) 
|-- programId: string (nullable = true) 
|-- score: double (nullable = true) 
|-- startTime: long (nullable = true) 
|-- targetSets: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- featured: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- data: struct (nullable = true) 
| | | | | |-- asinId: string (nullable = true) 
| | | | |-- pk: string (nullable = true) 
| | | | |-- type: string (nullable = true) 
| | |-- reason: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- recommended: array (nullable = true) 
| | | |-- element: string (containsNull = true) 

ich auf dem zufälligen Hash auf der customerId Spalte die Daten basierend partitionieren möchten. Aber wenn ich dies tun:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save"); 

Es Fehler geben:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true)); 

Bitte lassen Sie mich wissen, dass ich customerId Spalte zugreifen können.

Antwort

3

Lassen Sie uns Beispiel Dataset sample.json

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"} 
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"} 
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"} 
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"} 

Jetzt ist es mit Spark-Hacking beginnen nehmen

val jsonDf = spark.read 
    .format("json") 
    .load("path/of/sample.json") 

jsonDf.show() 

+---------+-------+-----+-----+ 
|  CITY|CUST_ID|STATE| ZIP| 
+---------+-------+-----+-----+ 
| San Jose| 115734| CA|95106| 
|Allentown| 115728| PA|18101| 
|Allentown| 115730| PA|18101| 
|San Mateo| 114728| CA|94401| 
| Somerset| 114726| NJ| 8873| 
+---------+-------+-----+-----+ 

Dann Partition-Datensatz durch Spalte "ZIP" und schreiben zu S3

jsonDf.write 
    .partitionBy("ZIP") 
    .save("s3/bucket/location/to/save") 
    // one liner athentication to s3 
    //.save("s3n://$accessKey:$secretKey" + "@" + s"$buckectName/location/to/save") 

Note: In order this code successfully S3 access and secret key has to be configured properly. Check this answer for Spark/Hadoop integration with S3

Edit: Auflösung: Partition Spalte customerId nicht in Schema (per Kommentar) gefunden

customerId existiert innerhalb customer Struktur, so versuchen die customerId dann Partition tun zu extrahieren.

df.withColumn("customerId", $"customer.customerId") 
    .drop("customer") 
    .write.partitionBy("customerId") 
    .save("s3/bucket/location/to/save") 
Verwandte Themen