2016-04-06 12 views
0

Ich konvertiere einen einfachen R-Code in SparkR, um Spark effizient zu nutzen.Wie finde ich die Länge einer Spalte in SparkR

Ich habe die folgende Spalte CloseDate.

CloseDate 
2011-01-08 
2011-02-07 
2012-04-07 
2013-04-18 
2011-02-07 
2010-11-10 
2010-12-09 
2013-02-18 
2010-12-09 
2011-03-11 
2011-04-10 
2013-06-19 
2011-04-10 
2011-01-06 
2011-02-06 
2013-04-16 
2011-02-06 
2015-09-25 
2015-09-25 
2010-11-10 

Ich möchte die Anzahl der Zeit zählen, die dieses Datum erhöht | verringert wurde. Ich habe den untenstehenden R-Code, um das zu tun.

dateChange <- function(closeDate, dir){ 
    close_dt <- as.Date(closeDate) 
    num_closedt_out = 0 
    num_closedt_in = 0 

    for(j in 1:length(close_dt)) 
    { 
    curr <- close_dt[j] 
    if (j > 1) 
     prev <- close_dt[j-1] 
    else 
     prev <- curr 
    if (curr > prev){ 
     num_closedt_out = num_closedt_out + 1 
    } 
    else if (curr < prev){ 
     num_closedt_in = num_closedt_in + 1 
    } 
    } 
    if (dir=="inc") 
    ret <- num_closedt_out 
    else if (dir=="dec") 
    ret <- num_closedt_in 
    ret 
} 

Ich habe versucht, SparkR df $ col hier zu verwenden. Da Spark den Code langsam ausführt, habe ich während dieser Ausführung den Wert der Länge nicht erhalten und einen NaN-Fehler erhalten.

Hier ist der modifizierte Code, den ich ausprobiert habe.

DateDirChanges <- function(closeDate, dir){ 
    close_dt <- to_date(closeDate) 
    num_closedt_out = 0 
    num_closedt_in = 0 

    col_len <- SparkR::count(close_dt) 
    for(j in 1:col_len) 
    { 
    curr <- close_dt[j] 
    if (j > 1) 
     prev <- close_dt[j-1] 
    else 
     prev <- curr 
    if (curr > prev){ 
     num_closedt_out = num_closedt_out + 1 
    } 
    else if (curr < prev){ 
     num_closedt_in = num_closedt_in + 1 
    } 
    } 
    if (dir=="inc") 
    ret <- num_closedt_out 
    else if (dir=="dec") 
    ret <- num_closedt_in 
    ret 
} 

Wie kann ich die Länge einer Spalte während der Ausführung dieses Codes erhalten? Oder gibt es sonst noch etwas Besseres?

Antwort

2

Sie können nicht, weil Column einfach keine Länge hat. Im Gegensatz zu dem, was Sie in R erwarten dürfen, repräsentieren Spalten keine Daten, sondern SQL-Ausdrücke und spezifische Datentransformationen. Außerdem ist die Reihenfolge der Werte in Spark DataFrame beliebig, so dass Sie sich nicht einfach umsehen können.

Wenn Daten wie in Ihrer vorherigen Frage partitioniert werden können, können Sie Fensterfunktionen im selben Beispiel verwenden, wie ich es gezeigt habe in the answer to your previous question. Ansonsten gibt es keine effiziente Möglichkeit, dies allein mit SparkR zu handhaben.

Angenommen, es ist ein Weg, um (erforderlich) zu bestimmen und Sie können Ihre Daten partitionieren (gewünschte angemessene Leistung zu bekommen) alles, was Sie brauchen so etwas wie das ist:

SELECT 
    CAST(LAG(CloseDate, 1) OVER w > CloseDate AS INT) gt, 
    CAST(LAG(CloseDate, 1) OVER w < CloseDate AS INT) lt, 
    CAST(LAG(CloseDate, 1) OVER w = CloseDate AS INT) eq 
FROM DF 
WINDOW w AS (
    PARTITION BY partition_col ORDER BY order_col 
) 
+0

Ich glaube, ich die Daten partitionieren können als Gut. Aber da benutzen wir datediff, um die gewünschte Ausgabe zu erhalten. Aber hier muss ich eine benutzerdefinierte Funktion schreiben. Wie sollte es überprüfen, ob der Wert von seiner LAG erhöht oder verringert und es sollte nur die Anzahl der Male, die der Wert erhöht oder verringert. Diese benutzerdefinierte Funktion muss also die Daten lesen, um eine neue Spalte zu erstellen. Gibt es eine Möglichkeit, das zu tun? – sag

+0

Solange Sie eine Möglichkeit haben, die Reihenfolge (erforderlich) und Partitionierung (für die Leistung) zu bestimmen, ist es ziemlich einfach. – zero323

+0

Dies ist genau der vorherigen Frage ähnlich. Ich habe gerade in TempTable bestehen und den Lag bekommen. Dort benutzen wir datediff, um den Unterschied zu bekommen. Hier müssen wir etwas wie getIncrementCount (df $ closeDate, df $ lagCloseDate) schreiben. In dieser Funktion muss ich eine Zählung durchlaufen und aufrechterhalten. Diese Anzahl muss jedes Mal um eins erhöht werden, wenn das closeDate mehr als lagCloseDate ist. Ich habe einige der Standardfunktionen von SparkR erwähnt, aber alle rufen ein Java auf, um das zu tun. Ist es möglich, R? Entschuldigung, wenn die Frage zu dumm ist. Ich bin sehr neu in R und SparkR – sag

Verwandte Themen