Ich habe einen Datensatz mit den Spalten userId
(String), itemId
(int) und rating
(int).So kombinieren Sie Spark RDD und PairRDD in Java
+----------+----------+---------+
| userId | itemId | rating |
+----------+----------+---------+
| abc13 | 23 | 1 |
+----------+----------+---------+
| qwe34 | 56 | 3 |
+----------+----------+---------+
| qwe34 | 35 | 4 |
+----------+----------+---------+
Ich möchte die Zeichenfolge userIds auf einzigartige Long-Werte abzubilden. Ich habe versucht, die userIds mit zipWithUniqueId()
und es gibt eine pairRDD
.
+------------+----------------+
| userId | userIdMapped |
+------------+----------------+
| abc13 | 0 |
+------------+----------------+
| qwe34 | 1 |
+------------+----------------+
Ich möchte die langen Werte auf eine andere Spalte hinzufügen und den Datensatz, wie unten zu erstellen:
+----------+----------+---------+----------------+
| userId | itemId | rating | userIdMapped |
+----------+----------+---------+----------------+
| abc13 | 23 | 1 | 0 |
+----------+----------+---------+----------------+
| qwe34 | 56 | 3 | 1 |
+----------+----------+---------+----------------+
| qwe34 | 35 | 4 | 1 |
+----------+----------+---------+----------------+
Ich habe versucht, die folgenden:
JavaRDD<Feedback> feedbackRDD = spark.read().jdbc(MYSQL_CONNECTION_URL, feedbackQuery, connectionProperties)
.javaRDD().map(Feedback.mapFunc);
JavaPairRDD<String, Long> mappedPairRDD = feedbackRDD.map(new Function<Feedback, String>() {
public String call(Feedback p) throws Exception {
return p.getUserId();
}).distinct().zipWithUniqueId();
Dataset<Row> feedbackDS = spark.createDataFrame(feedbackRDD, Feedback.class);
Dataset<String> stringIds = spark.createDataset(zipped.keys().collect(), Encoders.STRING());
Dataset<Long> valueIds = spark.createDataset(zipped.values().collect(), Encoders.LONG());
Dataset<Row> longIds = valueIds.withColumnRenamed("value", "userIdMapped");
Dataset<Row> userIdMap = intIds.join(stringIds);
Dataset<Row> feedbackDSUserMapped = feedbackDS.join(userIdMap, feedbackDS.col("userId").equalTo(userIdMap.col("value")),
"inner");
//Here 'value' column contains string user ids
Der userIdMap
-Datensatz verbunden ist fälschlicherweise als unten:
+-----------------+----------------+
| userIdMapped | value |
+-----------------+----------------+
| 0 | abc13 |
+-----------------+----------------+
| 0 | qwe34 |
+-----------------+----------------+
| 1 | abc13 |
+-----------------+----------------+
| 1 | qwe34 |
+-----------------+----------------+
Daher ist die resultierende feedbackDSUserMapped
falsch.
Ich bin neu in Spark und ich bin mir sicher, dass es einen besseren Weg dafür geben muss.
Was ist der beste Weg, um den langen Wert von pairRDD
zu erhalten und auf die entsprechende userId im ursprünglichen Datensatz (RDD
) zu setzen?
Jede Hilfe würde sehr geschätzt werden.
Die Daten sind für das ALS-Modell zu verwenden.
Dies funktionierte aber nicht Zum Glück müssen die Daten für das ALS-Modell verwendet werden und monoton_increasing_id() erzeugt Werte außerhalb des int-Bereichs. Trotzdem danke für die ausführliche Antwort :) – Fleur