Ich habe gestern eine ähnliche Frage gestellt - Matrix Multiplication between two RDD[Array[Double]] in Spark - aber ich habe mich entschieden, zu pyspark um dies zu tun. Ich habe einige Fortschritte beim Laden und Neuformatieren der Daten gemacht - Pyspark map from RDD of strings to RDD of list of doubles - aber die Matrix-Multiplikation ist schwierig. Lassen Sie mich meine Fortschritte teilen zuerst:Matrix Multiplikation A^T * A in PySpark
matrix1.txt
1.2 3.4 2.3
2.3 1.1 1.5
3.3 1.8 4.5
5.3 2.2 4.5
9.3 8.1 0.3
4.5 4.3 2.1
es schwierig ist, Dateien zu teilen, aber das, was Datei aussieht meine matrix1.txt ist. Es ist eine durch Leerzeichen getrennte Textdatei, die die Werte einer Matrix enthält. Als nächstes ist der Code:
# do the imports for pyspark and numpy
from pyspark import SparkConf, SparkContext
import numpy as np
# loadmatrix is a helper function used to read matrix1.txt and format
# from RDD of strings to RDD of list of floats
def loadmatrix(sc):
data = sc.textFile("matrix1.txt").map(lambda line: line.split(' ')).map(lambda line: [float(x) for x in line])
return(data)
# this is the function I am struggling with, it should take a line of the
# matrix (formatted as list of floats), compute an outer product with itself
def AtransposeA(line):
# pseudocode for this would be...
# outerprod = compute line * line^transpose
# return(outerprod)
# here is the main body of my file
if __name__ == "__main__":
# create the conf, sc objects, then use loadmatrix to read data
conf = SparkConf().setAppName('SVD').setMaster('local')
sc = SparkContext(conf = conf)
mymatrix = loadmatrix(sc)
# this is pseudocode for calling AtransposeA
ATA = mymatrix.map(lambda line: AtransposeA(line)).reduce(elementwise add all the outerproducts)
# the SVD of ATA is computed below
U, S, V = np.linalg.svd(ATA)
# ...
Mein Ansatz ist wie folgt - Matrixmultiplikation A^T * A zu tun, erstelle ich eine Funktion, die äußeren Produkte der Reihen von A. Die element Summe aller der outerproducts berechnet ist das Produkt, das ich will. Ich rufe dann AtransposeA() in einer Map-Funktion auf, so wird es in jeder Zeile der Matrix ausgeführt, und schließlich verwende ich eine reduce(), um die resultierenden Matrizen hinzuzufügen.
Ich habe Probleme, darüber nachzudenken, wie die AtransposeA-Funktion aussehen sollte. Wie kann ich ein äußeres Produkt im Pyspark so machen? Vielen Dank im Voraus für Hilfe!