2016-09-14 7 views
0

Verwendung Spark, habe ich eine Datenstruktur vom Typ val rdd = RDD[(x: Int, y:Int), cov:Double] in Scala, wobei jedes Element des RDD stellt ein Element aus einer Matrix mit x repräsentiert die Zeile, y, die die Säule und cov repräsentierte der Wert des Elements:eine sparsevector aus den Elementen der RDD

Ich muss SparseVectors aus Zeilen dieser Matrix erstellen. Also habe ich beschlossen, zunächst die rdd zu RDD[x: Int, (y:Int, cov:Double)] zu konvertieren und dann groupByKey verwenden, um alle Elemente einer bestimmten Zeile zu setzen zusammen wie folgt aus:

val rdd2 = rdd.map{case ((x,y),cov) => (x, (y, cov))}.groupByKey()

Jetzt muss ich die SparseVectors erstellen:

val N = 7  //Vector Size 
val spvec = {(x: Int,y: Iterable[(Int, Double)]) => new SparseVector(N.toLong, Array(y.map(el => el._1.toInt)), Array(y.map(el => el._2.toDouble)))} 
val vecs = rdd2.map(spvec) 

Dies ist jedoch der Fehler, der auftritt.

type mismatch; found :Iterable[Int] required:Int 
type mismatch; found :Iterable[Double] required:Double 

Ich vermute, dass y.map(el => el._1.toInt) eine iterable zurückkehrt, das Array kann nicht weiter angewendet werden. Ich würde mich freuen, wenn jemand dabei helfen könnte.

Antwort

0

Die einfachste Lösung ist zu RowMatrix konvertieren:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} 

val rdd: RDD[((Int, Int), Double)] = ??? 

val vs: RDD[org.apache.spark.mllib.linalg.SparseVector]= new CoordinateMatrix(
    rdd.map{ 
    case ((x, y), cov) => MatrixEntry(x, y, cov) 
    } 
).toRowMatrix.rows.map(_.toSparse) 

Wenn Sie Zeilenindizes erhalten möchten, können Sie toIndexedRowMatrix statt:

import org.apache.spark.mllib.linalg.distributed.IndexedRow 

new CoordinateMatrix(
    rdd.map{ 
    case ((x, y), cov) => MatrixEntry(x, y, cov) 
    } 
).toIndexedRowMatrix.rows.map { case IndexedRow(i, vs) => (i, vs.toSparse) } 
+0

Danke. Es funktioniert für 'toRowMatrix', aber nicht für' toIndexedRowMatrix' und besagt, dass "value toSparse kein Mitglied von org.apache.spark.mllib.linalg.distributed.IndexedRow" ist. Ich möchte die Zeilenindizes beibehalten. – EdgeRover

+0

Da es 'IndexedRows' nicht' Vektoren' enthält. – zero323

Verwandte Themen