2016-04-27 1 views
3

Ich habe eine Pyspark-App. Ich kopierte eine Hive-Tabelle in meine hdfs-Verzeichnis, & in Python I sqlContext.sql eine Abfrage für diese Tabelle. Jetzt ist diese Variable ein Datenframe, den ich rows rufe. Ich muss die rows zufällig mischen, also musste ich sie in eine Liste der Reihen rows_list = rows.collect() umwandeln. Also dann ich shuffle(rows_list), die die Listen an Ort und Stelle mischt. Ich nehme die Menge von zufälligen Reihen Ich brauche x:Speichern einer Liste von Zeilen zu einem Hive-Tisch in pyspark

for r in range(x): allrows2add.append(rows_list[r]) Jetzt möchte ich als hive Tabelle speichern allrows2add oder anhängen eine vorhandene hive Tabelle (je nachdem, was leichter zu tun ist). Das Problem ist, dass ich das nicht tun kann:

all_df = sc.parallelize(allrows2add).toDF() Cant dies tun, Schema kann nicht ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

ohne im ganzen Schema setzen zu entnehmen. Das Schema von rows hat 117 Spalten, daher möchte ich sie nicht eingeben. Gibt es eine Möglichkeit, das Schema rows zu extrahieren, um mir zu helfen, allrows2add einen Datenrahmen zu machen oder irgendwie als eine Hive-Tabelle zu speichern? Ich kann rows.printSchema() tun, aber nicht sicher, wie es in einem Schema-Format als eine Variable zu bekommen toDF() passieren, ohne all dieses Textes

Dank

Hinzufügen for-Schleife info

#Table is a List of Rows from small Hive table I loaded using 
#query = "SELECT * FROM Table" 
#Table = sqlContext.sql(query).collect() 

for i in range(len(Table)): 

    rows = sqlContext.sql(qry) 
    val1 = Table[i][0] 
    val2 = Table[i][1] 
    count = Table[i][2] 
    x = 100 - count 

#hivetemp is a table that I copied from Hive to my hfs using: 
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup"; 
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy; 

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x) 

    rows = sqlContext.sql(query) 
    rows = rows.withColumn("col4", lit(10)) 
    rows = rows.withColumn("col5", lit(some_string)) 
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server 
    rows.saveAsParquetFile("rows"+str(i)+".parquet") 
#tried this before and heck slow also 
    #rows_list = rows.collect() 
    #shuffle(rows_list) 

Antwort

11
zu analysieren, die

Wenn das Schema nicht abgeleitet werden kann, gibt es normalerweise einen Grund. toDF ist syntaktischer Zucker für die createDataFrame-Funktion, die standardmäßig nur die ersten 100 Zeilen verwendet (despite the docs sagt, dass es nur die erste Zeile verwendet), um zu bestimmen, wie das Schema sein soll. Um dies zu ändern, können Sie das Sampling-Verhältnis erhöhen in einem größeren Prozentsatz Ihrer Daten suchen:

df = rdd.toDF(sampleRatio=0.2) 
# or... 
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2) 

Es ist auch möglich, dass Ihre Stichprobe zufällig nur nehmen Zeilen mit leeren Werten für einige bestimmten Spalten. Wenn dies der Fall ist, können Sie entweder create a schema from scratch wie so:

from pyspark.sql.types import * 
# all DataFrame rows are StructType 
# can create a new StructType with combinations of StructField 
schema = StructType([ 
    StructField("column_1", StringType(), True), 
    StructField("column_2", IntegerType(), True), 
    # etc. 
]) 
df = sqlContext.createDataFrame(rdd, schema=schema) 

Alternativ können Sie das Schema aus dem vorherigen Datenrahmen erhalten Sie durch Zugriff auf den schema Wert erstellt:

schema = df1.schema 
df2 = sqlContext.createDataFrame(rdd, schema=schema) 

Beachten Sie, dass, wenn Ihr RDD-Zeilen sind keine StructType (alias) Objekte anstelle von Wörterbüchern oder Listen, Sie können keinen Datenrahmen von ihnen erstellen. Wenn Ihr RDD Reihen Wörterbücher sind, können Sie sie zu Objekte wie folgt konvertieren:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x)) 
# ** is to unpack the dictionary since the Row constructor 
# only takes keyword arguments 
+0

Vielen Dank das funktioniert. Ich ging mit dem Zugriff auf den "Schema" -Wert. Eine Sache, die ich versuche zu umgehen, ist, warum ist das so langsam (ob es Datenfeld in eine Liste von Zeilen konvertiert wird, oder einfache Dinge wie das Schreiben in eine Parkett-Datei oder versuchen, eine Bienenstock-Tabelle anhängen) - aber das könnte etwas mit meinem sein System gegen die API selbst. – KBA

+1

Ohne auf Ihre Daten/Code zu schauen, kann ich nicht sicher sein. Ist Ihre Eingabedatei in Partitionen aufgeteilt? Wenn es sich um eine einzelne Partition handelt, wird Spark das Laden nicht parallelisieren. –

+0

Ich bearbeite gerade meinen ursprünglichen Beitrag, um mehr Informationen zu zeigen. Ich bin ziemlich neu in Spark, also bin ich mir nicht 100% ig sicher, aber ich habe meine Eingaben von einer Hive-Tabelle (die ich vom Hive-Server in mein hdfs-Verzeichnis kopiert habe) geladen.Wenn du irgendwelche Vorschläge hast oder mich auf Ressourcen verweisen kannst (ich habe das auch in Scala versucht, also wird Scala-Code in Ordnung sein) - das wäre großartig! Vielen Dank – KBA

Verwandte Themen