2017-02-21 3 views
0

Hallo Ich versuche herauszufinden, ob es eine Entsprechung zum RxJs-Operator gibt, zip in Xstream, oder zumindest eine Möglichkeit, das gleiche Verhalten zu erhalten. Für den Fall, dass jemand Klarheit über den Unterschied benötigt, werden die folgenden Marmor-Diagramme angezeigt.RxJs Zip-Operator entspricht in xstream?

zip in rxjs 
    |---1---2---3-----------5-> 
    |-a------b------c---d-----> 
      "zip" 
    |-1a----2b------3c-----5d-> 


whereas 'combineLatest' aka 'combine' in xstream does 

    |---1---2----------4---5-> 
    |----a---b---c---d-------> 
      "combine" 
    |-1a----2a-2b-2c-2d-4d-5d> 

Jede Hilfe wird geschätzt, da ich sehr neu in der Programmierung mit Streams bin. Vielen Dank im Voraus!

Antwort

1

Ich brauchte auch einen Zip-Operator für Xstream. Also habe ich meine eigenen aus bestehenden Betreibern erstellt. Es benötigt eine beliebige Anzahl von Streams zum Zippen.

function zip(...streams) { 
    // Wrap the events on each stream with a label 
    // so that we can seperate them into buckets later. 
    const streamsLabeled = streams 
    .map((stream$, idx) => stream$.map(event => ({label: idx + 1, event: event}))); 
    return (event$) => { 
    // Wrap the events on each stream with a label 
    // so that we can seperate them into buckets later. 
    const eventLabeled$ = event$.map(event => ({label: 0, event: event})); 
    const labeledStreams = [eventLabeled$, ...streamsLabeled]; 

    // Create the buckets used to store stream events 
    const buckets = labeledStreams.map((stream, idx) => idx) 
     .reduce((buckets, label) => ({...buckets, [label]: []}), {}); 

    // Initial value for the fold operation 
    const accumulator = {buckets, tuple: []}; 

    // Merge all the streams together and accumulate them 
    return xs.merge(...labeledStreams).fold((acc, event) => { 
     // Buffer the events into seperate buckets 
     acc.buckets[event.label].push(event); 

     // Does the first value of all the buckets have something in it? 
     // If so, then there is a complete tuple. 
     const tupleComplete = Object.keys(acc.buckets) 
     .map(key => acc.buckets[key][0]) 
     .reduce((hadValue, value) => value !== undefined 
      ? true && hadValue 
      : false && hadValue, 
     true); 

     // Save completed tuple and remove it from the buckets 
     if (tupleComplete) { 
     acc.tuple = [...Object.keys(acc.buckets).map(key => acc.buckets[key][0].event)]; 
     Object.keys(acc.buckets).map(key => acc.buckets[key].shift()); 
     } else { 
     // Clear tuple since all columns weren't filled 
     acc.tuple = []; 
     } 

     return {...acc}; 
    }, accumulator) 

    // Only emit when we have a complete tuple 
    .filter(buffer => buffer.tuple.length !== 0) 

    // Just return the complete tuple 
    .map(buffer => buffer.tuple); 
    }; 
} 

Dies kann mit komponieren verwendet werden.

foo$.compose(zip(bar$)).map(([foo, bar]) => doSomething(foo, bar))