2017-06-03 14 views
1

Ich habe gestern eine ähnliche Frage gestellt - Matrix Multiplication between two RDD[Array[Double]] in Spark - aber ich habe mich entschieden, zu pyspark um dies zu tun. Ich habe einige Fortschritte beim Laden und Neuformatieren der Daten gemacht - Pyspark map from RDD of strings to RDD of list of doubles - aber die Matrix-Multiplikation ist schwierig. Lassen Sie mich meine Fortschritte teilen zuerst:Matrix Multiplikation A^T * A in PySpark

matrix1.txt 
1.2 3.4 2.3 
2.3 1.1 1.5 
3.3 1.8 4.5 
5.3 2.2 4.5 
9.3 8.1 0.3 
4.5 4.3 2.1 

es schwierig ist, Dateien zu teilen, aber das, was Datei aussieht meine matrix1.txt ist. Es ist eine durch Leerzeichen getrennte Textdatei, die die Werte einer Matrix enthält. Als nächstes ist der Code:

# do the imports for pyspark and numpy 
from pyspark import SparkConf, SparkContext 
import numpy as np 

# loadmatrix is a helper function used to read matrix1.txt and format 
# from RDD of strings to RDD of list of floats 
def loadmatrix(sc): 
    data = sc.textFile("matrix1.txt").map(lambda line: line.split(' ')).map(lambda line: [float(x) for x in line]) 
    return(data) 

# this is the function I am struggling with, it should take a line of the 
# matrix (formatted as list of floats), compute an outer product with itself 
def AtransposeA(line): 
    # pseudocode for this would be... 
    # outerprod = compute line * line^transpose  
    # return(outerprod) 

# here is the main body of my file  
if __name__ == "__main__": 
    # create the conf, sc objects, then use loadmatrix to read data 
    conf = SparkConf().setAppName('SVD').setMaster('local') 
    sc = SparkContext(conf = conf) 
    mymatrix = loadmatrix(sc) 

    # this is pseudocode for calling AtransposeA 
    ATA = mymatrix.map(lambda line: AtransposeA(line)).reduce(elementwise add all the outerproducts) 

    # the SVD of ATA is computed below 
    U, S, V = np.linalg.svd(ATA) 

    # ... 

Mein Ansatz ist wie folgt - Matrixmultiplikation A^T * A zu tun, erstelle ich eine Funktion, die äußeren Produkte der Reihen von A. Die element Summe aller der outerproducts berechnet ist das Produkt, das ich will. Ich rufe dann AtransposeA() in einer Map-Funktion auf, so wird es in jeder Zeile der Matrix ausgeführt, und schließlich verwende ich eine reduce(), um die resultierenden Matrizen hinzuzufügen.

Ich habe Probleme, darüber nachzudenken, wie die AtransposeA-Funktion aussehen sollte. Wie kann ich ein äußeres Produkt im Pyspark so machen? Vielen Dank im Voraus für Hilfe!

Antwort

0

Zuerst überlegen, warum Sie Spark dafür verwenden möchten. Es klingt, als ob alle Daten in den Speicher passen. In diesem Fall können Sie numpy und pandas sehr einfach verwenden.

Wenn Ihre Daten nicht so strukturiert sind, dass Zeilen unabhängig sind, dann kann sie wahrscheinlich nicht parallelisiert werden, indem Gruppen von Zeilen an verschiedene Knoten gesendet werden, was der einzige Sinn der Verwendung von Spark ist.

Nachdem gesagt, dass ... hier ist einige pyspark (2.1.1) Code, der ich denke, was Sie wollen.

# read the matrix file 
df = spark.read.csv("matrix1.txt",sep=" ",inferSchema=True) 
df.show() 
+---+---+---+ 
|_c0|_c1|_c2| 
+---+---+---+ 
|1.2|3.4|2.3| 
|2.3|1.1|1.5| 
|3.3|1.8|4.5| 
|5.3|2.2|4.5| 
|9.3|8.1|0.3| 
|4.5|4.3|2.1| 
+---+---+---+ 
# do the sum of the multiplication that we want, and get 
# one data frame for each column 
colDFs = [] 
for c2 in df.columns: 
    colDFs.append(df.select([ F.sum(df[c1]*df[c2]).alias("op_{0}".format(i)) for i,c1 in enumerate(df.columns) ])) 
# now union those separate data frames to build the "matrix" 
mtxDF = reduce(lambda a,b: a.select(a.columns).union(b.select(a.columns)), colDFs) 
mtxDF.show() 
+------------------+------------------+------------------+ 
|    op_0|    op_1|    op_2| 
+------------------+------------------+------------------+ 
|   152.45|118.88999999999999|    57.15| 
|118.88999999999999|104.94999999999999|    38.93| 
|    57.15|    38.93|52.540000000000006| 
+------------------+------------------+------------------+ 

Dies scheint das gleiche Ergebnis, die Sie von numpy bekommen zu sein.

a = numpy.genfromtxt("matrix1.txt") 
numpy.dot(a.T, a) 
array([[ 152.45, 118.89, 57.15], 
     [ 118.89, 104.95, 38.93], 
     [ 57.15, 38.93, 52.54]])