GeoText and UDF in PySpark

I'm a newbie in PySpark (Spark 2.1.0 and Python 3.5), and I've run into a problem I can't get past. I'm trying to use GeoText inside a UDF; here is my code:
def countries(x):
    count = GeoText(x).countries
    w = ''
    if not count:
        return ''
    else:
        for country in count:
            w += country
        return w
And this is how I create the UDF:

udfCountry = udf(countries, StringType())
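(For completeness, the snippets above assume the usual imports, which I left out of my original code:)

from geotext import GeoText
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType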
And then, to use it, I try:

df2 = df.withColumn('country', udfCountry(df.Location))
But running any SQL condition on it, for example:

df2.where(df2.country == 'a').show()

causes this stack trace:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-194-84c9636e0170> in <module>()
1 #df3.select(df3.country,df3.cities,df3._Location).where(df3.country!='').take(10)
----> 2 df4.where(df4.country == 'a').show()
/opt/spark-2.1.0/python/pyspark/sql/dataframe.py in show(self, n, truncate)
316 """
317 if isinstance(truncate, bool) and truncate:
--> 318 print(self._jdf.showString(n, 20))
319 else:
320 print(self._jdf.showString(n, int(truncate)))
/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/spark-2.1.0/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o3116.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 79, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
for obj in iterator:
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched
for item in iterator:
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-191-27c9af37cc7f>", line 2, in countries
File "/opt/anaconda3/lib/python3.5/site-packages/geotext/geotext.py", line 107, in __init__
candidates = re.findall(city_regex, text)
File "/opt/anaconda3/lib/python3.5/re.py", line 213, in findall
return _compile(pattern, flags).findall(string)
TypeError: expected string or bytes-like object
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
for obj in iterator:
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched
for item in iterator:
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
mapper = lambda a: udf(*a)
File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-191-27c9af37cc7f>", line 2, in countries
File "/opt/anaconda3/lib/python3.5/site-packages/geotext/geotext.py", line 107, in __init__
candidates = re.findall(city_regex, text)
File "/opt/anaconda3/lib/python3.5/re.py", line 213, in findall
return _compile(pattern, flags).findall(string)
TypeError: expected string or bytes-like object
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
What I discovered is that if I change my UDF function as follows:
def countries(x):
    #count = GeoText(x).countries
    count = 'a'
    w = ''
    if not count:
        return ''
    else:
        for country in count:
            w += country
        return w
then it works.
Can someone explain to me why this happens? And what can I do to make it work?
EDIT
Strangely, if I save my dataframe back to Parquet and then read it in again, everything works correctly...
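A minimal null-safe sketch of the UDF, assuming the TypeError means GeoText received None (or some non-string value) from the Location column; I have not confirmed that this is the actual cause, and the names countries_safe and udfCountrySafe are only illustrative:

def countries_safe(x):
    # Guard against None / non-string values before calling GeoText,
    # since GeoText runs re.findall internally, which requires a string.
    if not isinstance(x, str):
        return ''
    # GeoText(x).countries is a list of country names; join them
    # the same way the original loop concatenated them.
    return ''.join(GeoText(x).countries)

udfCountrySafe = udf(countries_safe, StringType())
df2 = df.withColumn('country', udfCountrySafe(df.Location))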
What is your input? It looks like 'candidates = re.findall(city_regex, text)' inside GeoText is failing. – Pushkr
Hmm, I'm not sure what you mean, but this is the Location field from the Stack Overflow dump. – PastorPL