2016-11-16 3 views
3

Ich habe einen Datensatz mit den Spalten userId (String), itemId (int) und rating (int).So kombinieren Sie Spark RDD und PairRDD in Java

+----------+----------+---------+ 
| userId | itemId | rating | 
+----------+----------+---------+ 
| abc13 | 23 | 1 | 
+----------+----------+---------+ 
| qwe34 | 56 | 3 | 
+----------+----------+---------+ 
| qwe34 | 35 | 4 | 
+----------+----------+---------+ 

Ich möchte die Zeichenfolge userIds auf einzigartige Long-Werte abzubilden. Ich habe versucht, die userIds mit zipWithUniqueId() und es gibt eine pairRDD.

+------------+----------------+ 
| userId | userIdMapped | 
+------------+----------------+ 
| abc13 |  0  | 
+------------+----------------+ 
| qwe34 |  1  | 
+------------+----------------+ 

Ich möchte die langen Werte auf eine andere Spalte hinzufügen und den Datensatz, wie unten zu erstellen:

+----------+----------+---------+----------------+ 
| userId | itemId | rating | userIdMapped | 
+----------+----------+---------+----------------+ 
| abc13 | 23 | 1 |  0  | 
+----------+----------+---------+----------------+ 
| qwe34 | 56 | 3 |  1  | 
+----------+----------+---------+----------------+ 
| qwe34 | 35 | 4 |  1  | 
+----------+----------+---------+----------------+ 

Ich habe versucht, die folgenden:

JavaRDD<Feedback> feedbackRDD = spark.read().jdbc(MYSQL_CONNECTION_URL, feedbackQuery, connectionProperties) 
      .javaRDD().map(Feedback.mapFunc); 
JavaPairRDD<String, Long> mappedPairRDD = feedbackRDD.map(new Function<Feedback, String>() { 
    public String call(Feedback p) throws Exception { 
     return p.getUserId(); 
    }).distinct().zipWithUniqueId(); 
Dataset<Row> feedbackDS = spark.createDataFrame(feedbackRDD, Feedback.class); 
Dataset<String> stringIds = spark.createDataset(zipped.keys().collect(), Encoders.STRING()); 
Dataset<Long> valueIds = spark.createDataset(zipped.values().collect(), Encoders.LONG());  
Dataset<Row> longIds = valueIds.withColumnRenamed("value", "userIdMapped"); 
Dataset<Row> userIdMap = intIds.join(stringIds);  
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, feedbackDS.col("userId").equalTo(userIdMap.col("value")), 
      "inner"); 
//Here 'value' column contains string user ids 

Der userIdMap-Datensatz verbunden ist fälschlicherweise als unten:

+-----------------+----------------+ 
| userIdMapped |  value  | 
+-----------------+----------------+ 
|   0  |  abc13  | 
+-----------------+----------------+ 
|   0  |  qwe34  | 
+-----------------+----------------+ 
|   1  |  abc13  | 
+-----------------+----------------+ 
|   1  |  qwe34  | 
+-----------------+----------------+ 

Daher ist die resultierende feedbackDSUserMapped falsch.

Ich bin neu in Spark und ich bin mir sicher, dass es einen besseren Weg dafür geben muss.

Was ist der beste Weg, um den langen Wert von pairRDD zu erhalten und auf die entsprechende userId im ursprünglichen Datensatz (RDD) zu setzen?

Jede Hilfe würde sehr geschätzt werden.

Die Daten sind für das ALS-Modell zu verwenden.

Antwort

0

Gelöst es StringIndexer mit:

StringIndexer indexer = new StringIndexer() 
       .setInputCol("userId") 
       .setOutputCol("userIdMapped"); 
Dataset<Row> userJoinedDataSet = indexer.fit(feedbackDS).transform(feedbackDS); 
1

Sie können Folgendes versuchen. Weisen Sie eine eindeutige ID eine eingebaute Funktion und die Verknüpfung mit dem Original-Datensatz

/** 
* Created by RGOVIND on 11/16/2016. 
*/ 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.sql.*; 

import java.util.ArrayList; 
import java.util.List; 

public class SparkUserObjectMain { 
    static public void main(String[] args) { 
     SparkConf conf = new SparkConf().setMaster("local").setAppName("Stack Overflow App"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     SQLContext sqlContext = new SQLContext(sc); 
     List<UserObject> users = new ArrayList<UserObject>(); 

     //seed the data 
     UserObject user1 = new UserObject("abc13", "23", "1"); 
     UserObject user2 = new UserObject("qwe34", "56", "3"); 
     UserObject user3 = new UserObject("qwe34", "35", "4"); 
     users.add(user1); 
     users.add(user2); 
     users.add(user3); 

     //how to encode the object ? 
     Encoder<UserObject> userObjectEncoder = Encoders.bean(UserObject.class); 
     //Create the user dataset 
     Dataset<UserObject> usersDataSet = sqlContext.createDataset(users, userObjectEncoder); 
     //assign unique id's 
     Dataset<Row> uniqueUsersWithId = usersDataSet.dropDuplicates("userId").select("userId").withColumn("id", functions.monotonically_increasing_id()); 
     //join with original 
     Dataset<Row> joinedDataSet = usersDataSet.join(uniqueUsersWithId, "userId"); 
     joinedDataSet.show(); 

    } 
} 

Die Bohne:

/** 
* Created by RGOVIND on 11/16/2016. 
*/ 
public class UserObject { 

    private String userId; 
    private String itemId; 
    private String rating; 

public UserObject(){ 

    } 

    public UserObject(String userId, String itemId, String rating) { 
     this.userId = userId; 
     this.itemId = itemId; 
     this.rating = rating; 
    } 

    public String getUserId() { 

     return userId; 
    } 

    public void setUserId(String userId) { 
     this.userId = userId; 
    } 

    public String getItemId() { 
     return itemId; 
    } 

    public void setItemId(String itemId) { 
     this.itemId = itemId; 
    } 

    public String getRating() { 
     return rating; 
    } 

    public void setRating(String rating) { 
     this.rating = rating; 
    } 

} 

Drucke:

+------+------+------+------------+ 
|userId|itemId|rating|   id| 
+------+------+------+------------+ 
| abc13| 23|  1|403726925824| 
| qwe34| 56|  3|901943132160| 
| qwe34| 35|  4|901943132160| 
+------+------+------+------------+ 
+0

Dies funktionierte aber nicht Zum Glück müssen die Daten für das ALS-Modell verwendet werden und monoton_increasing_id() erzeugt Werte außerhalb des int-Bereichs. Trotzdem danke für die ausführliche Antwort :) – Fleur