haben Anwendungsfall, in dem wir Dateien von S3 lesen möchten, die JSON haben. Basierend auf einem bestimmten JSON-Knotenwert möchten wir dann die Daten gruppieren und in S3 schreiben.Partitionierung durch Spalte in Apache Spark zu S3
Ich kann die Daten lesen, aber nicht in der Lage, ein gutes Beispiel zu finden, wie die Daten basierend auf JSON-Schlüssel partitionieren und dann auf S3 hochladen. Kann mir jemand ein Beispiel geben oder auf ein Tutorial hinweisen, das mir bei diesem Anwendungsfall helfen kann?
Ich habe das Schema meiner Daten bekam, nachdem die Datenrahmen zu erstellen:
root
|-- customer: struct (nullable = true)
| |-- customerId: string (nullable = true)
|-- experiment: string (nullable = true)
|-- expiryTime: long (nullable = true)
|-- partitionKey: string (nullable = true)
|-- programId: string (nullable = true)
|-- score: double (nullable = true)
|-- startTime: long (nullable = true)
|-- targetSets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- featured: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- data: struct (nullable = true)
| | | | | |-- asinId: string (nullable = true)
| | | | |-- pk: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- reason: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- recommended: array (nullable = true)
| | | |-- element: string (containsNull = true)
ich auf dem zufälligen Hash auf der customerId Spalte die Daten basierend partitionieren möchten. Aber wenn ich dies tun:
df.write.partitionBy("customerId").save("s3/bucket/location/to/save");
Es Fehler geben:
org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));
Bitte lassen Sie mich wissen, dass ich customerId Spalte zugreifen können.