2016-03-20 13 views
2

Also gehe ich davon aus haben Sie die folgende Tabelle:SQL on Spark: Wie bekomme ich alle Werte von DISTINCT?

Name | Color 
------------------------------ 
John | Blue 
Greg | Red 
John | Yellow 
Greg | Red 
Greg | Blue 

Ich mag würde für jeden Namen eine Tabelle der verschiedenen Farben bekommen - wie viele und deren Werte. Bedeutung, etwas wie das:

Name | Distinct | Values 
-------------------------------------- 
John | 2  | Blue, Yellow 
Greg | 2  | Red, Blue 

Irgendwelche Ideen, wie man das macht?

+0

, die von 'simple' Variation ist auf' sehr simple' nach Ihrem RDBMS, die Sie den Fehler gemacht, nicht angeben. Bearbeiten Sie also Ihre Antwort und fügen Sie die RDBMS bitte –

+0

@ThomasG Um faire Plattform wird angegeben, und es ist kein RDBMS. – zero323

+0

Mögliches Duplikat von [In PySpark 1.5.0, wie listet man alle Elemente der Spalte \ 'y \' basierend auf den Werten der Spalte \ 'x \'?] Auf (http://stackoverflow.com/questions/36115411/ in-pyspark-1-5-0-how-do-you-liste-alle-artikel-von-spalte-y-basiert-auf-den-werten-von) – zero323

Antwort

3

collect_list gibt Ihnen eine Liste, ohne Duplikate zu entfernen. collect_set automatisch Duplikate entfernen so nur

select 
Name, 
count(distinct color) as Distinct, # not a very good name 
collect_set(Color) as Values 
from TblName 
group by Name 

diese Funktion seit Funke implementiert ist es 1.6.0 Besuche:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala

/** 
    * Aggregate function: returns a set of objects with duplicate elements eliminated. 
    * 
    * For now this is an alias for the collect_set Hive UDAF. 
    * 
    * @group agg_funcs 
    * @since 1.6.0 
    */ 
    def collect_set(columnName: String): Column = collect_set(Column(columnName)) 
0

Für PySPark; Ich komme aus einem R/Pandas-Hintergrund, also finde ich Spark-Dataframes etwas einfacher.

dies zu tun:

  1. -Setup eines Funke SQL-Kontext
  2. Lesen Sie Ihre Datei in einen Datenrahmen
  3. Ihre Datenrahmen als Tabelle Temp Registrieren
  4. Abfrage direkt mit SQL-Syntax
  5. Speichern Sie die Ergebnisse als Objekte, Ausgabe in Dateien .. Ihre Sache

Hier ist ein Klasse-I erstellt, dies zu tun:

class SQLspark(): 

def __init__(self, local_dir='./', hdfs_dir='/users/', master='local', appname='spark_app', spark_mem=2): 
    self.local_dir = local_dir 
    self.hdfs_dir = hdfs_dir 
    self.master = master 
    self.appname = appname 
    self.spark_mem = int(spark_mem) 
    self.conf = (SparkConf() 
      .setMaster(self.master) 
      .setAppName(self.appname) 
      .set("spark.executor.memory", self.spark_mem)) 
    self.sc = SparkContext(conf=self.conf) 
    self.sqlContext = SQLContext(self.sc) 


def file_to_df(self, input_file): 
    # import file as dataframe, all cols will be imported as strings 
    df = self.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").option("inferSchema", "true").load(input_file) 
    # # cache df object to avoid rebuilding each time 
    df.cache() 
    # register as temp table for querying, use 'spark_df' as table name 
    df.registerTempTable("spark_df") 
    return df 

# you also cast a spark dataframe as a pandas df 
def sparkDf_to_pandasDf(self, input_df): 
    pandas_df = input_df.toPandas() 
    return pandas_df 

def find_distinct(self, col_name): 
    my_query = self.sqlContext.sql("""SELECT distinct {} FROM spark_df""".format(col_name)) 
    # now do your thing with the results etc 
    my_query.show() 
    my_query.count() 
    my_query.collect() 

############### 
if __name__ == '__main__': 

# instantiate class 
# see function for variables to input 
spark = TestETL(os.getcwd(), 'hdfs_loc', "local", "etl_test", 10) 


# specify input file to process 
tsv_infile = 'path/to/file'