0

Ich habe eine Operation, die ich in PySpark 2.0 durchführen möchten, die als df.rdd.map einfach durchzuführen wäre, aber da würde ich lieber in der Dataframe Ausführung bleiben Ich möchte aus Leistungsgründen einen Weg finden, dies nur mit Dataframe-Operationen zu tun.Verwenden Sie Daten in Spark Datareframe Spalte als Bedingung oder Eingabe in einer anderen Spalte Ausdruck

Der Betrieb, in RDD-Stil ist, so etwas wie dieses:

def precision_formatter(row): 
    formatter = "%.{}f".format(row.precision) 
    return row + [formatter % row.amount_raw/10 ** row.precision] 
df = df.rdd.map(precision_formatter) 

Grundsätzlich habe ich eine Spalte, die mir sagt, für jede Zeile, was die Genauigkeit für meine Zeichenfolge sollte die Formatierung Betrieb sein, und Ich möchte die Spalte 'Betrag_Zeichnung' abhängig von dieser Genauigkeit selektiv als Zeichenfolge formatieren.

Antwort

0

Ich kenne keine Möglichkeit, den Inhalt einer oder mehrerer Spalten als Eingabe für eine andere Spaltenoperation zu verwenden. Der nächste, den ich kommen kann, ist die Verwendung von Column.when mit einer extern definierten Menge von booleschen Operationen, die der Menge möglicher boolescher Bedingungen/Fälle innerhalb der Spalte oder Spalten entsprechen. In diesem speziellen Fall, zum Beispiel, wenn Sie alle möglichen Werte von row.precision erhalten (oder besser schon haben), dann können Sie über diese Menge iterieren und eine Column.when Operation für jeden Wert in der Menge anwenden. Ich glaube, dass dieser Satz mit df.select('precision').distinct().collect() erhalten werden kann.

Da die pyspark.sql.functions.when und Column.when Operationen selbst zurückgeben Column Objekt, können Sie über die Elemente in der Menge iterieren (aber es erhalten wurde) und halten ‚Anhängen‘ when Operationen miteinander programmatisch, bis Sie den Satz erschöpft:

import pyspark.sql.functions as PSF 

def format_amounts_with_precision(df, all_precisions_set): 
    amt_col = PSF.when(df['precision'] == 0, df['amount_raw'].cast(StringType())) 
    for precision in all_precisions_set: 
     if precision != 0: # this is a messy way of having a base case above 
      fmt_str = '%.{}f'.format(precision) 
      amt_col = amt_col.when(df['precision'] == precision, 
          PSF.format_string(fmt_str, df['amount_raw']/10 ** precision) 

    return df.withColumn('amount', amt_col) 
0

Sie können es mit einem Python UDF tun. Sie können so viele Eingabewerte (Werte aus Spalten einer Zeile) aufnehmen und einen einzelnen Ausgabewert ausgeben. Es würde wie folgt aussehen:

from pyspark.sql import types as T, functions as F 
from pyspark.sql.function import udf, col 

# Create example data frame 
schema = T.StructType([ 
    T.StructField('precision', T.IntegerType(), False), 
    T.StructField('value', T.FloatType(), False) 
]) 

data = [ 
    (1, 0.123456), 
    (2, 0.123456), 
    (3, 0.123456) 
] 

rdd = sc.parallelize(data) 
df = sqlContext.createDataFrame(rdd, schema) 

# Define UDF and apply it 
def format_func(precision, value): 
    format_str = "{:." + str(precision) + "f}" 
    return format_str.format(value) 

format_udf = F.udf(format_func, T.StringType()) 

new_df = df.withColumn('formatted', format_udf('precision', 'value')) 
new_df.show() 

Auch, wenn anstelle der Spalte Genauigkeitswert Sie ein global man verwenden mögen, können Sie die beleuchtete (..) Funktion verwenden, wenn Sie es so nennen:

new_df = df.withColumn('formatted', format_udf(F.lit(2), 'value')) 
Verwandte Themen