2016-12-05 5 views
2

Ich versuche, einen Funken Dataset in eine vorhandene postgresql Tabelle zu schreiben (nicht die Tabellenmetadaten wie Spaltentypen ändern). Eine der Spalten dieser Tabelle ist vom Typ HStore und verursacht Probleme.Wie schreibt man in PostgreSQL hstore mit Spark-Dataset

Ich sehe die folgende Ausnahme, wenn ich die Schreib (hier die Original-Karte ist leer, das, wenn eine leere Zeichenfolge entkommen gibt) starten:

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause. 
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136) 
    at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419) 
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308) 
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004) 
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187) 
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212) 
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351) 
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying 

Dies ist, wie ich tue es:

def escapePgHstore[A, B](hmap: Map[A, B]) = { 
    hmap.map{case(key, value) => s""" "${key}"=>${value} """}.mkString(",") 
} 
... 
val props = new Properties() 
props.put("user", "xxxxxxx") 
props.put("password", "xxxxxxx") 

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column")) 
    .drop("original_column") 
    .coalesce(1).write 
    .mode(org.apache.spark.sql.SaveMode.Append) 
    .option("driver", "org.postgresql.Driver") 
    .jdbc(jdbcUrl, hashedTablePartName, props) 

Wenn ich die original_column von Karte nicht entkommen [String, Long] zu String mit escapePgHstore ich die folgenden Fehler sehen:

java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint> 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292) 
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.App$class.main(App.scala:76) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Was ist der richtige Weg, um Funken einen gültigen hstore Datentyp zu schreiben?

Antwort

2

Es stellt sich heraus, ich habe nur Postgres zu lassen versuchen, die geeignete Art von meiner Kolumne zu erraten. Durch Setzen von stringtype auf unspecified in der Verbindungszeichenfolge wie in der official documentation beschrieben.

props.put("stringtype", "unspecified") 

Jetzt funktioniert es perfekt !!

+1

Das funktionierte gut für mich! Du hast mir eine Menge Zeit gespart und das war die einzige Information, die ich zu dem Thema finden konnte. Allerdings habe ich noch einen weiteren Schlüssel gefunden: Die 'hstore'-Spalte, an die du schreibst, muss bereits existieren. Wenn der 'SaveMode', den Spark verwendet, auf" Überschreiben "gesetzt ist, hat Postgres niemals die Möglichkeit, den Text in die Spalte' hstore' zu ​​parsen. Spark teilt Postgres einfach mit, dass es sich um eine 'Text'-Spalte handelt. – mtrewartha

0

Dies ist ein pyspark Code einen Datenrahmen zu einer Postgres Tabelle für das Schreiben, die HSTORE JSON und JSONB Spalten. Im Allgemeinen müssen Sie also für komplizierte Datentypen, die in Postgres erstellt wurden und die in Spark Datareframe nicht erstellt werden können, stringtype="unspecified" in den Optionen oder in den Eigenschaften angeben, die Sie für einen Write-Datenrahmen in SQL-Funktion festlegen.

Im Folgenden ist ein Beispiel für eine Spark-Datenrahmen zu PostgreSQL Tabelle schreiben write() Funktion:

dataframe.write.format('jdbc').options(driver=driver,user=username,password=password, url=target_database_url,dbtable=table, stringtype="unspecified").mode("append").save() 
Verwandte Themen