2016-03-24 6 views
1

Ich versuche, eine Anzahl von Objekten zu einem Scala ListBuffer in einer Schleife hinzuzufügen, aber jedes Mal, wenn ich eins hinzufüge, verschwindet es bei der nächsten Iteration der Schleife.Scala - ListBuffer leert sich nach jedem Hinzufügen in Schleife

Wenn ich den Inhalt des ListBuffer drucken, bevor und nachdem er einen neuen Eintrag hinzufügen, erhalte ich die folgende Ausgabe:

Vor add: ListBuffer()

Nach Add: ListBuffer (com.me .FeatureV2 @ 20d953ba)

Vor add: ListBuffer()

Nach Add: ListBuffer ([email protected])

Vor add: ListBuffer()

Nach Add: ListBuffer ([email protected])

Code:

def generateStatistics(df: DataFrame): List[FeatureV2] = { 
    var features = ListBuffer[FeatureV2]() 
    val dataColumn = "data" 
    for (field <- df.schema.fieldNames){ 
     val columnType: String = df.select(field).dtypes(0)._2 

     if (columnType == StringType.toString){ 
     val statsDf: DataFrame = getStats(df, field, dataColumn) 
     for (row <- statsDf){ 
      println("Before add: " + features) 
      val feature = new FeatureV2() 
      feature.element = row.getString(0) 
      feature.count = row.getLong(1) 
      feature.sum = row.getDouble(2) 
      feature.max = row.getDouble(3) 
      feature.min = row.getDouble(4) 
      feature.feature = field 
      features += feature 
      println("After add: " + features) 
     } 
     } 
    } 
    features.toList 
    } 

Gelegentlich aber ich folgendes:

Vor add: ListBuffer()

Nach Add: ListBuffer ([email protected])

Vor add: ListBuffer ([email protected])

Nach Add: ListBuffer ([email protected], com .me.FeatureV2 @ 4b0df9e5)

Vor add: ListBuffer()

Nach Add: ListBuffer ([email protected])

Das sieht aus wie es tatsächlich popula ist den ListBuffer, aber es wird gelöscht. Etwas mit Müllabfuhr zu tun?

+0

Sind Sie sicher, dass Sie '' generateStatistics'' nur einmal aufrufen? –

+0

@JeanLogeart Ja, definitiv. – karoma

+1

Bei einer Randnotiz, verwenden Sie bitte ListBuffer als val und fügen Sie es mit + = ([siehe hier] (http://docs.scala-lang.org/overviews/collections/concrete-mutable-collection-classes.html) an)). Das ist nur seltsam und schwer zu verstehen, weil Sie die Objektreferenz ändern. – slouc

Antwort

3

Versuchen Sie, for (row <- statsDf) zu for (row <- statsDf.collect()) zu ändern.

Wenn Ihr Problem dadurch behoben wird, können Ihre Probleme durch die Tatsache verursacht werden, dass foreach in einem oder mehreren Threads ausgeführt wird.

for (row <- stadsDf) ist die tatsächlich Aufruf DataFrame.foreach(f: Row => Unit), die ein verteiltes ist foreach wo f auf einer beliebigen Anzahl von Threads oder Maschinen ausgeführt werden können auf Ihrem Spark-Master abhängig.

2

Eine Spark-Anwendung besteht aus einem Treiber und einem Executor. Sie steuern und erstellen Dinge aus dem Driver - die Executoren erhalten Kopien von Variablen, die im Bereich waren. So erhalten die Executors Kopien der ListBuffer. Sie hängen an ihre Kopien an, die beim Abschluss der Aufgabe verloren gehen.

Sie könnten collect() verwenden, um die Daten in den Treiber zu ziehen, um dort an die ListBuffer anzuhängen, oder Broadcast-Variablen zu verwenden.

Siehe the documentation für eine Diskussion.

0

ist die Sammlung veränderbar?

auch, wenn Sie Scala verwenden, sollte man sich bemühen FP zu machen.

df.schema.fieldNames.map {...} 

würde wahrscheinlich die Arbeit tun, die Sie brauchen. Und da Sie eine if haben, wäre vielleicht eine collect besser geeignet

Verwandte Themen