Ich habe eine Pyspark-App. Ich kopierte eine Hive-Tabelle in meine hdfs-Verzeichnis, & in Python I sqlContext.sql
eine Abfrage für diese Tabelle. Jetzt ist diese Variable ein Datenframe, den ich rows
rufe. Ich muss die rows
zufällig mischen, also musste ich sie in eine Liste der Reihen rows_list = rows.collect()
umwandeln. Also dann ich shuffle(rows_list)
, die die Listen an Ort und Stelle mischt. Ich nehme die Menge von zufälligen Reihen Ich brauche x
:Speichern einer Liste von Zeilen zu einem Hive-Tisch in pyspark
for r in range(x): allrows2add.append(rows_list[r])
Jetzt möchte ich als hive Tabelle speichern allrows2add oder anhängen eine vorhandene hive Tabelle (je nachdem, was leichter zu tun ist). Das Problem ist, dass ich das nicht tun kann:
all_df = sc.parallelize(allrows2add).toDF()
Cant dies tun, Schema kann nicht ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
ohne im ganzen Schema setzen zu entnehmen. Das Schema von rows
hat 117 Spalten, daher möchte ich sie nicht eingeben. Gibt es eine Möglichkeit, das Schema rows
zu extrahieren, um mir zu helfen, allrows2add einen Datenrahmen zu machen oder irgendwie als eine Hive-Tabelle zu speichern? Ich kann rows.printSchema()
tun, aber nicht sicher, wie es in einem Schema-Format als eine Variable zu bekommen toDF()
passieren, ohne all dieses Textes
Dank
Hinzufügen for-Schleife info
#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
rows = sqlContext.sql(qry)
val1 = Table[i][0]
val2 = Table[i][1]
count = Table[i][2]
x = 100 - count
#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
rows = sqlContext.sql(query)
rows = rows.withColumn("col4", lit(10))
rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
#rows_list = rows.collect()
#shuffle(rows_list)
Vielen Dank das funktioniert. Ich ging mit dem Zugriff auf den "Schema" -Wert. Eine Sache, die ich versuche zu umgehen, ist, warum ist das so langsam (ob es Datenfeld in eine Liste von Zeilen konvertiert wird, oder einfache Dinge wie das Schreiben in eine Parkett-Datei oder versuchen, eine Bienenstock-Tabelle anhängen) - aber das könnte etwas mit meinem sein System gegen die API selbst. – KBA
Ohne auf Ihre Daten/Code zu schauen, kann ich nicht sicher sein. Ist Ihre Eingabedatei in Partitionen aufgeteilt? Wenn es sich um eine einzelne Partition handelt, wird Spark das Laden nicht parallelisieren. –
Ich bearbeite gerade meinen ursprünglichen Beitrag, um mehr Informationen zu zeigen. Ich bin ziemlich neu in Spark, also bin ich mir nicht 100% ig sicher, aber ich habe meine Eingaben von einer Hive-Tabelle (die ich vom Hive-Server in mein hdfs-Verzeichnis kopiert habe) geladen.Wenn du irgendwelche Vorschläge hast oder mich auf Ressourcen verweisen kannst (ich habe das auch in Scala versucht, also wird Scala-Code in Ordnung sein) - das wäre großartig! Vielen Dank – KBA