
How to read from a local directory, kmeans streaming with PySpark

I need help reading from a local directory when running streaming k-means with PySpark. There is no good answer on this topic on Stack Overflow.

Here is my code:

if __name__ == "__main__": 
    ssc = StreamingContext(sc, 1) 

    training_data_raw, training_data_df = prepare_data(TRAINING_DATA_SET) 
    trainingData = parse2(training_data_raw) 

    testing_data_raw, testing_data_df = prepare_data(TEST_DATA_SET) 
    testingData = testing_data_raw.map(parse1) 

    #print(testingData) 
    trainingQueue = [trainingData] 
    testingQueue = [testingData] 

    trainingStream = ssc.queueStream(trainingQueue) 
    testingStream = ssc.queueStream(testingQueue) 

    # We create a model with random clusters and specify the number of clusters to find 
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0) 

    # Now register the streams for training and testing and start the job, 
    # printing the predicted cluster assignments on new data points as they arrive. 
    model.trainOn(trainingStream) 

    result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features))) 
    result.pprint() 
    ssc.textFileStream('file:///Users/userrname/PycharmProjects/MLtest/training/data/') 
    ssc.start() 
    ssc.awaitTermination() 

Thanks!!

Answer

from pyspark.mllib.linalg import Vectors

# read new text files from the training directory and parse each line into a Vector
trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
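
Note that textFileStream only picks up files that appear in the monitored directory after the streaming context has started (files should be moved or renamed into the directory atomically), and a local directory can be given as a file:/// URI as in the question.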

For the test examples:

from pyspark.mllib.regression import LabeledPoint

def parse(lp):
    # each line looks like "(label,[x1,x2,...])": extract the label and the feature vector
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

testData = ssc.textFileStream("/testing/data/dir").map(parse)
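
Putting the two streams together, here is a minimal, self-contained sketch (not the original poster's exact script) of how the file-based streams can replace the queue streams. The directory paths are placeholders, and the k=2 model with 3-dimensional random centers mirrors the question's setup:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

def parse(lp):
    # parse lines of the form "(label,[x1,x2,x3])" into LabeledPoint objects
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

if __name__ == "__main__":
    sc = SparkContext(appName="StreamingKMeansFromFiles")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval, as in the question

    # training files contain plain vectors, e.g. "[1.0,2.0,3.0]"
    trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
    # test files contain labeled points, e.g. "(1.0,[1.0,2.0,3.0])"
    testData = ssc.textFileStream("/testing/data/dir").map(parse)

    # same model configuration as in the question: 2 clusters, 3-dimensional random centers
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

    model.trainOn(trainingData)
    result = model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))
    result.pprint()

    ssc.start()
    ssc.awaitTermination()

With this setup, every new file dropped into the training directory updates the cluster centers, and new files in the testing directory produce (label, predicted cluster) pairs that are printed for each batch.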