2016-09-17 3 views
5

von einem pyspark SQL Datenrahmen wiepyspark, Funken:., Wie letzte Zeile auszuwählen und auch, wie pyspark Datenrahmen durch den Index

name age city 
abc 20 A 
def 30 B 

Wie für den Zugriff auf die letzte Reihe bekommen (wie von df.limit (1) Ich kann die erste Zeile des Datenrahmens in einen neuen Datenrahmen bringen).

Und wie kann ich auf die Datenframezeilen per Index zugreifen.wie Zeilennr. 12 oder 200.

In Pandas kann ich

df.tail(1) # for last row 
df.ix[rowno or index] # by index 
df.loc[] or by df.iloc[] 

ich nur neugierig bin tun, wie pyspark Datenrahmen in einer solchen Art und Weise oder alternative Möglichkeiten zuzugreifen.

Dank

Antwort

3

Wie die letzte Reihe zu bekommen.

Lang und hässliche Art und Weise, die davon ausgeht, dass alle Spalten sind oderable:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id 
) 

last_row = (df 
    .withColumn("_id", monotonically_increasing_id()) 
    .select(max(struct("_id", *df.columns)) 
    .alias("tmp")).select(col("tmp.*")) 
    .drop("_id")) 

Wenn nicht alle Spalten sein, um Sie können versuchen:

with_id = df.withColumn("_id", monotonically_increasing_id()) 
i = with_id.select(max_("_id")).first()[0] 

with_id.where(col("_id") == i).drop("_id") 

Hinweis. Es gibt last Funktion in pyspark.sql.functions/`o.a.s.sql.Funktionen, aber unter Berücksichtigung description of the corresponding expressions ist es keine gute Wahl hier.

wie kann ich Zugriff auf die Datenrahmen Reihen von index.like

Sie können nicht. Spark DataFrame und zugänglich nach Index. You can add indices using zipWithIndex und filter später. Denken Sie nur daran, diese O (N) Operation.

+0

Hallo mit erstellen, bin zur Handhabung i die letzte row by autoincrement ID Spalte hinzufügen Weg oder für kleine df, ich war mit toPandas(). Tail (1). Wie auch immer, danke für die Antwort. Und dieser Indexzugriff von Datenframes, die ich gefragt habe, ist wegen, manchmal muss ich einen Spaltenwert ersetzen (durch irgendeinen Col-Wert Gleichheitsbedingung und dazu tue ich Hilfe von einem udf). Aber wenn ich nur eine Instanz (bestimmte Index Nr. Zeile) ersetzen möchte, dann hatte ich keine Möglichkeiten, das zu tun. Jetzt kann ich "zipWithIndex" wie vorgeschlagen verwenden. Vielen Dank. – Satya

0

Verwenden Sie die folgenden eine Indexspalte zu erhalten, die monoton steigende enthält, einzigartig, und aufeinanderfolgenden ganzen Zahlen, die nicht wie monotonically_increasing_id() Arbeit. Die Indizes werden in der gleichen Reihenfolge wie colName Ihres DataFrame aufsteigend.

import pyspark.sql.functions as F 
from pyspark.sql.window import Window as W 

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow) 

df = df\ 
.withColumn('int', F.lit(1))\ 
.withColumn('index', F.sum('int').over(window))\ 
.drop('int')\ 

den folgenden Code verwenden am Schwanz zu sehen, oder die letzten rownums des Datenrahmen.

rownums = 10 
df.where(F.col('index')>df.count()-rownums).show() 

den folgenden Code verwenden in den Reihen von start_row zu end_row die Datenrahmen zu suchen.

start_row = 20 
end_row = start_row + 10 
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show() 

zipWithIndex() ist eine RDD-Methode, die monoton steigende, einzigartig, und aufeinanderfolgenden ganze Zahlen zurückkehrt, scheint aber viel langsamer zu sein in einer Art und Weise zu implementieren, wo Sie zurück zu Ihrem ursprünglichen Datenrahmen geändert mit einer id-Spalte zu bekommen.

2

So erhalten Sie die letzte Zeile.

Wenn Sie eine Spalte, die Sie Datenrahmen, zum Beispiel „Index“ zu bestellen, dann eine einfache Möglichkeit, um die letzte Aufzeichnung SQL verwenden verwenden können: 1) bestellen Sie Ihre Tabelle im Auftrag und absteigend 2 nehmen) 1. Wert aus dieser Bestellung

df.createOrReplaceTempView("table_df") 
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1""" 
latest_rec = self.sqlContext.sql(query_latest_rec) 
latest_rec.show() 

Und wie kann ich die Datenrahmen Zeilen zugreifen, indem index.like Zeile nicht. 12 oder 200.

ähnlicher Weise können Sie Datensatz in einer beliebigen Zeile

row_number = 12 
df.createOrReplaceTempView("table_df") 
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1""" 
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number)) 
latest_rec.show() 

Wenn Sie nicht bekommen, „Index“ Spalte können Sie es

from pyspark.sql.functions import monotonically_increasing_id 

df = df.withColumn("index", monotonically_increasing_id()) 
+0

Vielen Dank für die gut erklärte Antwort. groß, um jetzt einen neuen Ansatz. – Satya

+0

'monoton_increasing_id()' [Dokumentation] (http://spark.apache.org/docs/2.2.0/api/python/pypark.sql.html#pypark.sql.functions.monotonically_increasing_id) *** "Die aktuelle Implementation setzt die Partitions-ID in die oberen 31 Bits und die Datensatznummer innerhalb jeder Partition in die unteren 33 Bits. "*** Dies funktioniert daher nicht so, wie Sie es für große Datenrahmen, die über verschiedene Partitionen hinweg gespeichert sind, halten. Sie können nicht auf die letzte Zeile Ihres Dataframes verweisen, es sei denn, es befindet sich alles in einer Partition. – Clay

+0

@Clay Der letzte Teil war mehr ergänzend. Aber wenn ein großer DataFrame wirklich RIESIG ist, dh nicht "monoton_increasing_id()' s Annahme ** "Datenrahmen hat weniger als 1 Milliarde Partitionen, und jede Partition hat weniger als 8 Milliarden Datensätze" **, dann könnte man verwenden sql 'ROW_NUMBER() OVER (PARTITION VON xxx ORDER BY yyy)' als Alternative. –