1

Lassen Q eine verteilte Reihe Matrix in Funken sein, ich will Q' das Kreuzprodukt von Q mit seiner transponieren berechnen.Wie berechnet man das Punktprodukt zweier verteilter RowMatrix in Apache Spark?

Obwohl eine Row Matrix jedoch eine multiply() Methode hat, kann sie nur lokale Matrizen als Argument akzeptieren.

-Code Illustration (Scala):

val phi = new RowMatrix(phiRDD)   // phiRDD is an instance of RDD[Vector] 
val phiTranspose = transposeRowMatrix(phi) // transposeRowMatrix() 
              // returns the transpose of a RowMatrix 
val crossMat = ?       // phi * phiTranspose 

Bitte beachte, dass ich das Skalarprodukt Distributed RowMatrix kein verteilt man mit einem lokalen ein ausführen möchten.

Eine Lösung ist eine IndexedRowMatrix wie folgt zu verwenden:

val phi = new IndexedRowMatrix(phiRDD) // phiRDD is an instance of RDD[IndexedRow] 
val phiTranspose = transposeMatrix(phi) // transposeMatrix() 
             // returns the transpose of a Matrix 
val crossMat = phi.toBlockMatrix().multiply(phiTranspose.toBlockMatrix() 
              ).toIndexedRowMatrix() 

Allerdings möchte ich die Reihe Matrix-Methoden verwenden, wie tallSkinnyQR() und das bedeutet, dass ich crossMat zu einem RowMatrix verwandeln sholud, mit .toRowMatrix() Methode:

val crossRowMat = crossMat.toRowMatrix() 

und schließlich kann ich mich bewerben

crossRowMat.tallSkinnyQR() 

aber dieser Prozess viele Transformationen zwischen den Typen der Distributed Matrices umfasst und nach dem, was ich von MLlib Programming Guide verstand dies ist teuer:

Es ist sehr wichtig, das richtige Format zu speichern groß und verteilt zu wählen Matrizen. Das Konvertieren einer verteilten Matrix in ein anderes Format erfordert möglicherweise ein globales Shuffle, was ziemlich teuer ist.

Würde jemand bitte ausarbeiten.

Antwort

2

Nur verteilte Matrizen, die Matrix - Matrix - Multiplikation unterstützen, sind . Sie müssen Ihre Daten konvertieren entsprechend - künstliche Indizes sind gut genug:

new IndexedRowMatrix(
    rowMatrix.rows.zipWithIndex.map(x => IndexedRow(x._2, x._1)) 
).toBlockMatrix match { case m => m.multiply(m.transpose) } 
+0

Würden Sie bitte meine aktualisierte Version überprüfen. – user8547317

1

ich den Algorithmus auf dieser page genannten Zwecken verwendet werden, die auf verteilte Skalarprodukt Problem der Multiplikation Problem von Punktprodukt bewegt sich durch Vektoren äußere Produkt mit:

das äußere Produkt zwischen zwei Vektoren ist das Skalarprodukt des zweiten Vektors mit allen Elementen im ersten Vektor, wodurch eine Matrix

Meine eigene erstellte Multiplikationsfunktion (kann besser optimiert werden) für Zeilenmatrizen endete so.

def multiplyRowMatrices(m1: RowMatrix, m2: RowMatrix)(implicit ctx: SparkSession): RowMatrix = { 

// Zip m1 columns with m2 rows 
val m1Cm2R = transposeRowMatrix(m1).rows.zip(m2.rows) 

// Apply scalar product between each entry in m1 vector with m2 row 
val scalar = m1Cm2R.map{ 
case(column:DenseVector,row:DenseVector) => column.toArray.map{ 
    columnValue => row.toArray.map{ 
    rowValue => columnValue*rowValue 
    } 
} 
} 

// Add all the resulting matrices point wisely 
val sum = scalar.reduce{ 
case(matrix1,matrix2) => matrix1.zip(matrix2).map{ 
    case(array1,array2)=> array1.zip(array2).map{ 
    case(value1,value2)=> value1+value2 
    } 
} 
} 

new RowMatrix(ctx.sparkContext.parallelize(sum.map(array=> Vectors.dense(array)))) 
} 

Danach habe ich getestet sowohl meine eigene Funktion und Verwendung von Blockmatrix approaches- - mit einer 300 * 10 Matrix auf einer Maschine

meine eigene Funktion verwenden:

val PhiMat = new RowMatrix(phi) 
val TphiMat = transposeRowMatrix(PhiMat) 
val product = multiplyRowMatrices(PhiMat,TphiMat) 

Matrix Transformation:

val MatRow = new RowMatrix(phi) 
val MatBlock = new IndexedRowMatrix(MatRow.rows.zipWithIndex.map(x => IndexedRow(x._2, x._1))).toBlockMatrix() 
val TMatBlock = MatBlock.transpose 
val productMatBlock = MatBlock.multiply(TMatBlock) 
val productMatRow = productMatBlock.toIndexedRowMatrix().toRowMatrix() 

Der erste Ansatz 1 jo spannte b mit 5 Stufen und nahm 2s, um insgesamt abzuschließen. Während der zweite Ansatz spannte 4 Jobs, drei mit einer Stufe und eines mit zwei Stufen und nahm 0.323s insgesamt. Auch der zweite Ansatz übertraf den ersten bezüglich der Shuffle Read/Write-Größe.

doch bin ich immer noch von der MLlib Programming Führer Aussage verwirrt:

Es ist sehr wichtig, das richtige Format zu wählen, große und verteilt Matrizen zu speichern. Das Konvertieren einer verteilten Matrix in ein anderes -Format erfordert möglicherweise einen globalen Shuffle, der ziemlich teuer ist.

Verwandte Themen