Für PySPark; Ich komme aus einem R/Pandas-Hintergrund, also finde ich Spark-Dataframes etwas einfacher.
dies zu tun:
- -Setup eines Funke SQL-Kontext
- Lesen Sie Ihre Datei in einen Datenrahmen
- Ihre Datenrahmen als Tabelle Temp Registrieren
- Abfrage direkt mit SQL-Syntax
- Speichern Sie die Ergebnisse als Objekte, Ausgabe in Dateien .. Ihre Sache
Hier ist ein Klasse-I erstellt, dies zu tun:
class SQLspark():
def __init__(self, local_dir='./', hdfs_dir='/users/', master='local', appname='spark_app', spark_mem=2):
self.local_dir = local_dir
self.hdfs_dir = hdfs_dir
self.master = master
self.appname = appname
self.spark_mem = int(spark_mem)
self.conf = (SparkConf()
.setMaster(self.master)
.setAppName(self.appname)
.set("spark.executor.memory", self.spark_mem))
self.sc = SparkContext(conf=self.conf)
self.sqlContext = SQLContext(self.sc)
def file_to_df(self, input_file):
# import file as dataframe, all cols will be imported as strings
df = self.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").option("inferSchema", "true").load(input_file)
# # cache df object to avoid rebuilding each time
df.cache()
# register as temp table for querying, use 'spark_df' as table name
df.registerTempTable("spark_df")
return df
# you also cast a spark dataframe as a pandas df
def sparkDf_to_pandasDf(self, input_df):
pandas_df = input_df.toPandas()
return pandas_df
def find_distinct(self, col_name):
my_query = self.sqlContext.sql("""SELECT distinct {} FROM spark_df""".format(col_name))
# now do your thing with the results etc
my_query.show()
my_query.count()
my_query.collect()
###############
if __name__ == '__main__':
# instantiate class
# see function for variables to input
spark = TestETL(os.getcwd(), 'hdfs_loc', "local", "etl_test", 10)
# specify input file to process
tsv_infile = 'path/to/file'
, die von 'simple' Variation ist auf' sehr simple' nach Ihrem RDBMS, die Sie den Fehler gemacht, nicht angeben. Bearbeiten Sie also Ihre Antwort und fügen Sie die RDBMS bitte –
@ThomasG Um faire Plattform wird angegeben, und es ist kein RDBMS. – zero323
Mögliches Duplikat von [In PySpark 1.5.0, wie listet man alle Elemente der Spalte \ 'y \' basierend auf den Werten der Spalte \ 'x \'?] Auf (http://stackoverflow.com/questions/36115411/ in-pyspark-1-5-0-how-do-you-liste-alle-artikel-von-spalte-y-basiert-auf-den-werten-von) – zero323