
GeoText and UDF in PySpark

I'm a newbie in PySpark (Spark 2.1.0 and Python 3.5) and I have a problem that I could not get past.

I'm trying to use GeoText inside a UDF. Here is my code:

from geotext import GeoText

def countries(x):
    count = GeoText(x).countries
    w = ''
    if not count:
        return ''
    else:
        for country in count:
            w += country
        return w

And from it I create a UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

udfCountry = udf(countries, StringType())

And then, to use it, I try:

df2 = df.withColumn('country', udfCountry(df.Location))

But running any SQL condition, e.g. this:

df2.where(df2.country == 'a').show() 

causes this stack trace:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-194-84c9636e0170> in <module>() 
     1 #df3.select(df3.country,df3.cities,df3._Location).where(df3.country!='').take(10) 
----> 2 df4.where(df4.country == 'a').show() 

/opt/spark-2.1.0/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    316   """ 
    317   if isinstance(truncate, bool) and truncate: 
--> 318    print(self._jdf.showString(n, 20)) 
    319   else: 
    320    print(self._jdf.showString(n, int(truncate))) 

/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/opt/spark-2.1.0/python/pyspark/sql/utils.py in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling o3116.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 79, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main 
    process() 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream 
    for obj in iterator: 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched 
    for item in iterator: 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda> 
    mapper = lambda a: udf(*a) 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<ipython-input-191-27c9af37cc7f>", line 2, in countries 
    File "/opt/anaconda3/lib/python3.5/site-packages/geotext/geotext.py", line 107, in __init__ 
    candidates = re.findall(city_regex, text) 
    File "/opt/anaconda3/lib/python3.5/re.py", line 213, in findall 
    return _compile(pattern, flags).findall(string) 
TypeError: expected string or bytes-like object 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144) 
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2112) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2327) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:248) 
    at sun.reflect.GeneratedMethodAccessor65.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main 
    process() 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream 
    for obj in iterator: 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched 
    for item in iterator: 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda> 
    mapper = lambda a: udf(*a) 
    File "/opt/spark-2.1.0/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<ipython-input-191-27c9af37cc7f>", line 2, in countries 
    File "/opt/anaconda3/lib/python3.5/site-packages/geotext/geotext.py", line 107, in __init__ 
    candidates = re.findall(city_regex, text) 
    File "/opt/anaconda3/lib/python3.5/re.py", line 213, in findall 
    return _compile(pattern, flags).findall(string) 
TypeError: expected string or bytes-like object 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144) 
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

I found that if I change my UDF function as follows:

def countries(x):
    #count = GeoText(x).countries
    count = 'a'
    w = ''
    if not count:
        return ''
    else:
        for country in count:
            w += country
        return w

then it works.
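
For reference, the same TypeError is reproducible outside Spark. A minimal sketch, assuming the geotext package from PyPI (GeoText passes its argument straight into re.findall, so any non-string input fails):

from geotext import GeoText

GeoText('Berlin is in Germany').countries   # ['Germany']
GeoText(None)   # TypeError: expected string or bytes-like object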

Can someone explain to me why this happens? And what can I do to make it work?

EDIT

Strange - when I save my DataFrame back to Parquet and then read it in again, everything works correctly ...


What is your input? It looks like 'candidates = re.findall(city_regex, text)' inside geotext is failing. – Pushkr


Hmm, I'm not sure what you mean - but this is the Location field from the Stack Overflow dump. – PastorPL
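
(As a quick diagnostic sketch: if the following count is non-zero, the Location column contains nulls, so the UDF is called with None.)

# Sketch: non-zero means the UDF receives None values.
df.where(df.Location.isNull()).count()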

Answer


In the end, it turned out that some of the input values are None. After fixing a few null-check issues, everything started working like a charm.
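
For reference, a minimal None-safe sketch of the UDF, assuming the same setup as in the question (the 'if x is None' guard is the kind of null check the answer refers to):

from geotext import GeoText
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def countries(x):
    # Guard against null Location values: GeoText passes its argument
    # straight into re.findall, which raises TypeError for non-strings.
    if x is None:
        return ''
    # GeoText(x).countries is a list of country names found in the text;
    # ''.join mirrors the string concatenation loop in the original UDF.
    return ''.join(GeoText(x).countries)

udfCountry = udf(countries, StringType())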
