2016-06-03 5 views
3

Mein Szenario wird möglicherweise anhand eines Beispiels näher erläutert. Sagen wir, ich folgende Daten hatte:Hinzufügen einer zusätzlichen Spalte, die den Unterschied zwischen dem nächsten Unterschied einer vorherigen Spalte darstellt

Type Time A 1 B 3 A 5 B 9

ich eine zusätzliche Spalte jeder Zeile hinzufügen möchten, die den minimalen absoluten Wertdifferenz zwischen allen Spalten des gleichen Typs darstellt. Für die erste Zeile ist die minimale Differenz zwischen allen Zeiten vom Typ A 4, also wäre der Wert 4 für die Spalten 1 und 3 und ebenfalls 6 für die Spalten 2 und 4.

Ich mache das in Spark und Spark SQL, so wäre die Anleitung dort nützlicher, aber wenn es durch einfaches SQL erklärt werden müsste, wäre das auch eine große Hilfe.

Antwort

1

Ein möglicher Ansatz ist die Verwendung von Fensterfunktionen.

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.{lag, min, abs} 

val df = Seq(
    ("A", -10), ("A", 1), ("A", 5), ("B", 3), ("B", 9) 
).toDF("type", "time") 

Zuerst durch die Zeit zwischen aufeinanderfolgenden Reihen sortiert bestimmen Differenz lässt:

// Partition by type and sort by time 
val w1 = Window.partitionBy($"Type").orderBy($"Time") 

// Difference between this and previous 
val diff = $"time" - lag($"time", 1).over(w1) 

Dann Minimum über alle Differentiale für eine bestimmte Art finden:

// Partition by time unordered and take unbounded window 
val w2 = Window.partitionBy($"Type").rowsBetween(Long.MinValue, Long.MaxValue) 

// Minimum difference over type 
val minDiff = min(diff).over(w2) 

df.withColumn("min_diff", minDiff).show 


// +----+----+--------+ 
// |type|time|min_diff| 
// +----+----+--------+ 
// | A| -10|  4| 
// | A| 1|  4| 
// | A| 5|  4| 
// | B| 3|  6| 
// | B| 9|  6| 
// +----+----+--------+ 

Wenn Ihr Ziel ist ein zu finden ist Mindestabstand zwischen der aktuellen Zeile und jeder anderen Zeile in einer Gruppe können Sie einen ähnlichen Ansatz verwenden

import org.apache.spark.sql.functions.{lead, when} 

// Diff to previous 
val diff_lag = $"time" - lag($"time", 1).over(w1) 

// Diff to next 
val diff_lead = lead($"time", 1).over(w1) - $"time" 

val diffToClosest = when(
    diff_lag < diff_lead || diff_lead.isNull, 
    diff_lag 
).otherwise(diff_lead) 

df.withColumn("diff_to_closest", diffToClosest) 

// +----+----+---------------+ 
// |type|time|diff_to_closest| 
// +----+----+---------------+ 
// | A| -10|    11| 
// | A| 1|    4| 
// | A| 5|    4| 
// | B| 3|    6| 
// | B| 9|    6| 
// +----+----+---------------+ 
0

Sie sollten so etwas wie dies versuchen:

val sc: SparkContext = ... 
val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._ 

val input = sc.parallelize(Seq(
    ("A", 1), 
    ("B", 3), 
    ("A", 5), 
    ("B", 9) 
)) 

val df = input.groupByKey().flatMap { case (key, values) => 
    val smallestDiff = values.toList.sorted match { 
    case firstMin :: secondMin :: _ => secondMin - firstMin 
    case singleVal :: Nil => singleVal // Only one record for some `Type` 
    } 

    values.map { value => 
    (key, value, smallestDiff) 
    } 
}.toDF("Type", "Time", "SmallestDiff") 

df.show() 

Ausgang:

+----+----+------------+ 
|Type|Time|SmallestDiff| 
+----+----+------------+ 
| A| 1|   4| 
| A| 5|   4| 
| B| 3|   6| 
| B| 9|   6| 
+----+----+------------+ 
1

in SQL Server getestet 2008

Tabelle d ( Typ varchar (25) erstellen, Zeit int )

insert into d 
values ('A',1), 
('B',3), 
('A',5), 
('B',9) 

--solution one, calculation in query, might not be smart if dataset is large. 
select * 
, (select max(time) m from d as i where i.type = o.type) - (select MIN(time) m from d as i where i.type = o.type) dif 
from d as o 

--or this 
select d.*, diftable.dif from d inner join 
(select type, MAX(time) - MIN(time) dif 
from d group by type) as diftable on d.type = diftable.type 
Verwandte Themen