2017-01-05 2 views
2

Kann ein DStreamtype parameter s haben?Typ-parametriere einen DStream

Wenn ja, wie?

Wenn ich versuche, lazy val qwe = mStream.mapWithState(stateSpec) auf myDStream: DStream[(A, B)] (Klasse-Parametern), erhalte ich:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)] 
    lazy val qwe = mStream.mapWithState(stateSpec) 

Antwort

2

wesentliche Teilmenge des Spark-API ClassTags (siehe Scala: What is a TypeTag and how do I use it?) implizite erfordert und PairDStreamFunctions.mapWithState ist nicht anders. Prüfen class definition:

class PairDStreamFunctions[K, V](self: DStream[(K, V)]) 
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]) 

and:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType] 
): MapWithStateDStream[K, V, StateType, MappedType] = { 
    ... 
} 

Wenn eine Funktion erstellen möchten, die auf einem generischen Paar Strömen arbeitet und verwendet mapWithState Sie mindestens ClassTags für KeyType und ValueType Typen bieten sollte:

def foo[T : ClassTag, U : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f) 

Wenn StateType und MappedType sind auch parametrisiert Sie benötigen ClassTags für diese auch:

def bar[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f) 
Verwandte Themen