ich diesen Funken Programm haben, und ich werde versuchen, es nur die relevanten Teile funktioniert sehr gut auf meinem lokalen RechnerSpark-Programm gibt ungerade Ergebnisse, wenn auf Standalone-Cluster lief
# Split by delimiter ,
# If the file is in unicode, we need to convert each value to a float in order to be able to
# treat it as a number
points = sc.textFile(filename).map(lambda line: [float(x) for x in line.split(",")]).persist()
# start with K randomly selected points from the dataset
# A centroid cannot be an actual data point or else the distance measure between a point and
# that centroid will be zero. This leads to an undefined membership value into that centroid.
centroids = points.takeSample(False, K, 34)
#print centroids
# Initialize our new centroids
newCentroids = [[] for k in range(K)]
tempCentroids = []
for centroid in centroids:
tempCentroids.append([centroid[N] + 0.5])
#centroids = sc.broadcast(tempCentroids)
convergence = False
ncm = NCM()
while(not convergence):
memberships = points.map(lambda p : (p, getMemberships([p[N]], centroids.value, m)))
cmax = memberships.map(lambda (p, mus) : (p, getCMax(mus, centroids.value)))
# Memberships
T = cmax.map(lambda (p, c) : (p, getMemberships2([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)))
I = cmax.map(lambda (p, c) : (p, getIndeterminateMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
F = cmax.map(lambda (p, c) : (p, getFalseMemberships([p[N]], centroids.value, m, delta, weight1, weight2, weight3, c)[0]))
# Components of new centroids
wTm = T.map(lambda (x, t) : ('onekey', scalarPow(m, scalarMult(weight1, t))))
#print "wTm = " + str(wTm.collect())
print "at first reduce"
sumwTm = wTm.reduceByKey(lambda p1, p2 : addPoints(p1, p2))
#print "sumwTm = " + str(sumwTm.collect())
wTmx = T.map(lambda (x, t) : pointMult([x[N]], scalarPow(m, scalarMult(weight1, t))))
print "adding to cnumerator list"
#print wTmx.collect()
cnumerator = wTmx.flatMap(lambda p: getListComponents(p)).reduceByKey(lambda p1, p2 : p1 + p2).values()
print "collected cnumerator, now printing"
#print "cnumerator = " + str(cnumerator.collect())
#print str(sumwTm.collect())
# Calculate the new centroids
sumwTmCollection = sumwTm.collect()[0][1]
cnumeratorCollection = cnumerator.collect()
#print "sumwTmCollection = " + str(sumwTmCollection)
#cnumeratorCollection =cnumerator.collectAsMap().get(0).items
print "cnumeratorCollection = " + str(cnumeratorCollection)
for i in range(len(newCentroids)):
newCentroids[i] = scalarMult(1/sumwTmCollection[i], [cnumeratorCollection[i]])
centroids = newCentroids
# Test for convergence
convergence = ncm.test([centroids[N]], [newCentroids[N]], epsilon)
#convergence = True
# Replace our old centroids with the newly found centroids and repeat if convergence not met
# Clear out space for a new set of centroids
newCentroids = [[] for k in range(K)]
Dieses Programm zu begrenzen, jedoch Es verhält sich nicht wie erwartet, wenn es auf einem eigenständigen Cluster ausgeführt wird. Es wirft nicht notwendigerweise einen Fehler auf, aber was es tut, ergibt eine andere Ausgabe als die, die ich beim Ausführen auf meinem lokalen Rechner erhalte. Der Cluster und die 3 Knoten scheinen gut zu funktionieren. Ich habe das Gefühl, das Problem ist, dass ich aktualisieren centroids
, die eine Python-Liste ist, und es ändert sich jedes Mal durch die while-loop
. Ist es möglich, dass nicht jeder Knoten die neueste Kopie dieser Liste hat? Ich denke so, also habe ich versucht, eine broadcast variable
zu verwenden, aber diese kann nicht aktualisiert werden (nur lesen). Ich habe auch versucht, eine accumulator
, aber das sind nur für Ansammlungen. Ich habe auch versucht, die Python-Listen als eine Datei auf hdfs für jeden Knoten zu speichern, um Zugriff zu haben, aber das hat nicht gut funktioniert. Glaubst du, ich verstehe das Problem richtig? Ist hier noch etwas los? Wie kann ich Code erhalten, der auf meinem lokalen Computer funktioniert, aber nicht auf einem Cluster?
Entschuldigung, ich kann nicht herausfinden, wo Sie Ihre Centroids in Ihrem Code aktualisieren .. können Sie dies bitte zu mir markieren? Danke – mgaido
Danke für das Betrachten. Es ist in Richtung der Unterseite. 'centroids = newCentroids'. –
Ich würde mehr motiviert sein, Ihre Frage zu beantworten, wenn Sie Ihren Code bereinigt/verkleinert haben, und, noch wichtiger, ein Beispiel der Daten bereitgestellt haben, mit denen Ihr Skript unterschiedliche Ausgaben im Cluster liefert. –