Ich versuche, eine SQL-Abfrage zu analysieren, und möchte eine Funktion für jede Zeile eines Datenrahmens aufrufen. Die Funktion ist wie folgt:__getattr__ Fehler beim Aufrufen von foreach für Datenrahmen in pyspark
def updateParser(df):
# update tab1 set value1 = 0.34 where id = 1111
# identify positions
setPos = df.select(instr(df.query, ' set ').alias('set')).collect()[0].set
wherePos = df.select(instr(df.query, ' where ').alias('where')).collect()[0].where
idPos = df.select(instr(df.query, ' id').alias('id')).collect()[0].id
# identify table, fields&values, id
df = df.withColumn('table', upper(trim(df.query.substr(7, setPos - 7))))
df = df.withColumn('fieldValueList', upper(trim(df.query.substr(setPos + 5, (wherePos - (setPos + 5) + 1)))))
df = df.withColumn('id', upper(trim(df.query.substr(idPos + 5, 10))))
#identify the column being updated and the value
df.show(n=5, truncate = False)
Und ich rufe dies über:
updateDF.foreach(updateParser)
Aber ich bin die unten stehende Störung zu erhalten:
File "/home/mapr/scripts/cdc.py", line 19, in updateParser
setPos = df.select(instr(df.query, ' set ').alias('set')).collect()[0].set
File "/opt/mapr/spark/spark-1.5.2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1257, in __getattr__
raise AttributeError(item)
AttributeError: select
ich nicht bin mit getattr überall. Ist es erforderlich? Wenn ich foreach nicht verwende und das direkt auf dem Datenframe abspiele, läuft es gut. Könnte jemand bitte beraten.
a) Dies ist kein gültiger Python-Code (zumindest Einrückung korrigieren) b) Wenn 'updateDF' ein' DataFrame' ist, ist dies kein gültiger Spark-Code. – zero323
Einrückung ging verloren, weil es vom vi-Editor kopiert wurde, und der Code läuft gut in pyspark und es wird in CLI und in pyspark Job getestet. – learning