Ich versuche, einen Funken Dataset in eine vorhandene postgresql Tabelle zu schreiben (nicht die Tabellenmetadaten wie Spaltentypen ändern). Eine der Spalten dieser Tabelle ist vom Typ HStore und verursacht Probleme.Wie schreibt man in PostgreSQL hstore mit Spark-Dataset
Ich sehe die folgende Ausnahme, wenn ich die Schreib (hier die Original-Karte ist leer, das, wenn eine leere Zeichenfolge entkommen gibt) starten:
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause.
at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419)
at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying
Dies ist, wie ich tue es:
def escapePgHstore[A, B](hmap: Map[A, B]) = {
hmap.map{case(key, value) => s""" "${key}"=>${value} """}.mkString(",")
}
...
val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")
ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
.drop("original_column")
.coalesce(1).write
.mode(org.apache.spark.sql.SaveMode.Append)
.option("driver", "org.postgresql.Driver")
.jdbc(jdbcUrl, hashedTablePartName, props)
Wenn ich die original_column
von Karte nicht entkommen [String, Long] zu String mit escapePgHstore
ich die folgenden Fehler sehen:
java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Was ist der richtige Weg, um Funken einen gültigen hstore Datentyp zu schreiben?
Das funktionierte gut für mich! Du hast mir eine Menge Zeit gespart und das war die einzige Information, die ich zu dem Thema finden konnte. Allerdings habe ich noch einen weiteren Schlüssel gefunden: Die 'hstore'-Spalte, an die du schreibst, muss bereits existieren. Wenn der 'SaveMode', den Spark verwendet, auf" Überschreiben "gesetzt ist, hat Postgres niemals die Möglichkeit, den Text in die Spalte' hstore' zu parsen. Spark teilt Postgres einfach mit, dass es sich um eine 'Text'-Spalte handelt. – mtrewartha