2017-01-10 6 views
0

Ich bin völlig neu in Pysparks und RDD. Ich versuche zu verstehen, wie rdd funktioniert und ich habe Probleme beim Zugriff auf einen Teil der Daten in einem RDD. Ich möchte ein paar Spalten aus einer vorhandenen RDD auswählen und eine neue RDD erstellen.Erstellen einer neuen RDD aus einer anderen RDD in Python

Ein Beispiel unter:

user_rdd = [Row(id=u'1', first_name=u'Steve', last_name=u'Kent', email=u'[email protected]'),Row(id=u'2', first_name=u'Margaret', last_name=u'Peace', email=u'[email protected]')] 

display(user_rdd) 

| email     | first_name | id | last_name 
| [email protected]  | Steve  | 1 | Kent 
| [email protected] | Margaret | 2 | Peace 

Wie 2 Spalten auswählen aus user_rdd und eine neue rdd wie unten schaffen?

| id | first_name | last_name | full_name 
| 1 | Steve  | Kent  | Steve Kent 
| 2 | Margaret | Peace  | Margaret Peace 

Antwort

2

Hier ist wohl die einfachste Art und Weise, was Sie zu tun, sind nach (obwohl Ihr RDD wie sieht es aus einem DataFrame abgeleitet wurde)

from pyspark.sql import Row 

user_rdd = sc.parallelize([ 
    Row(id=u'1', 
     first_name=u'Steve', 
     last_name=u'Kent', 
     email=u'[email protected]'), 
    Row(id=u'2', 
     first_name=u'Margaret', 
     last_name=u'Peace', 
     email=u'[email protected]') 
]) 

new_rdd = user_rdd.map(lambda row: Row(
     first_name=row.first_name, 
     last_name=row.last_name, 
     full_name=row.first_name + ' ' + row.last_name)) 

new_rdd.take(2) 
1

Sie können dies versuchen:

sc = SparkContext(conf=conf) 
user_rdd = [Row(id=u'1', first_name=u'Steve', last_name=u'Kent', email=u'[email protected]'), 
      Row(id=u'2', first_name=u'Margaret', last_name=u'Peace', email=u'[email protected]')] 
rdd = sc.parallelize(user_rdd).map(lambda x: Row(fullname=' '.join([x['first_name'], x['last_name']]), **x.asDict())) 
print(rdd.collect()) 
0

nicht sicher, warum Sie es durch eine RDD tun wollen, weil eine Liste von Zeilen im Wesentlichen ein Datenrahmen ist, und es ist einfacher, eine Spalte auswählen und verketten Dort. Es ist auch effizienter als die Verwendung von RDDs.

from pyspark.sql import Row 
from pyspark.sql.functions import concat_ws 

user_rdd = [Row(id=u'1', first_name=u'Steve', last_name=u'Kent', email=u'[email protected]'),Row(id=u'2', first_name=u'Margaret', last_name=u'Peace', email=u'[email protected]')] 

user_df = spark_session.createDataFrame(user_rdd) 

user_df.select(user_df.id, user_df.first_name, user_df.last_name, concat_ws(' ',user_df.first_name, user_df.last_name).alias('full_name')).show() 

Dies wird Ihnen die Ausgabe als Datenrahmen geben:

+---+----------+---------+--------------+ 
| id|first_name|last_name|  full_name| 
+---+----------+---------+--------------+ 
| 1|  Steve|  Kent| Steve Kent| 
| 2| Margaret| Peace|Margaret Peace| 
+---+----------+---------+--------------+ 
Verwandte Themen