2017-04-02 5 views
4

Jetzt habe ich eine Observable<Observable<Integer>, wie kann ich es in Observable<int[]>, die das n-ary kartesische Produkt enthält übertragen?n-ary Kartesisches Produkt inRxJava

Zum Beispiel:

Observable<Observable<Integer> ob = Observable.just(
    Observable.just(0,1), 
    Observable.just(2,3), 
    Observable.just(4,5) 
); 
ob...... -> (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5) 

Antwort

1

Zu allererst Sie eine feste Anzahl von Eingangs Observable s benötigen. Zweitens gibt es keine Notwendigkeit zum Blockieren, aber es besteht wahrscheinlich die Notwendigkeit zum Zwischenspeichern, da die 2., 3. usw. Observable s mehrmals verbraucht werden müssen.

import java.util.*; 

import io.reactivex.Observable; 

public class Cartesian { 

    static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) { 
     return sources.toList().flatMapObservable(list -> cartesian(list)); 
    } 

    static Observable<int[]> cartesian(List<Observable<Integer>> sources) { 
     if (sources.size() == 0) { 
      return Observable.<int[]>empty(); 
     } 
     Observable<int[]> main = sources.get(0).map(v -> new int[] { v }); 

     for (int i = 1; i < sources.size(); i++) { 
      int j = i; 
      Observable<Integer> o = sources.get(i).cache(); 
      main = main.flatMap(v -> { 
       return o.map(w -> { 
        int[] arr = Arrays.copyOf(v, j + 1); 
        arr[j] = w; 
        return arr; 
       }); 
      }); 
     } 

     return main; 
    } 

    public static void main(String[] args) { 
     cartesian(Observable.just(
      Observable.just(0, 1), 
      Observable.just(2, 3), 
      Observable.just(4, 5) 
     )) 
     .subscribe(v -> System.out.println(Arrays.toString(v))); 
    } 
} 
0

cartesianischen Produkt in einer asynchronen Art und Weise zu schaffen, ist hart oder in einem gewissen Sinne ist unmöglich. Und wenn Blockierung in Ordnung ist, können Sie so etwas wie dieses

public class Main 
{ 

    static class ProductIterator<T> implements Iterator<T[]> 
    { 
     private final List<List<T>> componentsList; 
     private final Class<T> componentClass; 
     private final int[] indices; 
     private boolean hasNext; 

     public ProductIterator(List<List<T>> componentsList, Class<T> componentClass) 
     { 
      this.componentsList = componentsList; 
      this.componentClass = componentClass; 
      this.indices = new int[componentsList.size()]; 
      this.hasNext = this.indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size(); 
     } 

     @Override 
     public boolean hasNext() 
     { 
      return hasNext; 
     } 

     @Override 
     public T[] next() 
     { 
      T[] res = (T[]) Array.newInstance(componentClass, componentsList.size()); 
      for (int i = 0; i < componentsList.size(); i++) 
      { 
       res[i] = componentsList.get(i).get(indices[i]); 
      } 

      // move next 
      indices[0]++; 
      for (int i = 0; i < componentsList.size() - 1; i++) 
      { 
       if (indices[i] == componentsList.get(i).size()) 
       { 
        indices[i] = 0; 
        indices[i + 1]++; 
       } 
      } 
      hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size(); 

      return res; 
     } 
    } 

    public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass) 
    { 
     return Observable.fromIterable(new Iterable<T[]>() 
     { 
      @Override 
      public Iterator<T[]> iterator() 
      { 
       // postpone blocking up until iterator is requested 
       // and by this point we can't postpone anymore 
       Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList(); 
       return new ProductIterator<T>(componentsList.blockingGet(), componentClass); 
      } 
     }); 
    } 

    public static void main(String[] args) throws Exception 
    { 
     Observable<Observable<Integer>> ob = Observable.just(
       Observable.just(0, 1), 
       Observable.just(2, 3), 
       Observable.just(4, 5) 
     ); 

     Observable<Integer[]> product = product(ob, Integer.class); 
     product.forEach(a -> System.out.println(Arrays.toString(a))); 
    } 
} 

tun Es ist möglich, diesen Code zu verbessern, nicht blockiert werden, aber Sie werden immer noch alle Ergebnisse aus allen Observable s-Cache müssen und Code wird viel komplizierter. Wahrscheinlich ist Blockieren für Sie nicht akzeptabel, der Versuch, ein kartesisches Produkt zu bekommen, ist sowieso eine schlechte Idee.

0

Nun, ich kann es selbst lösen. Aber gibt es einen eleganteren Weg?
(Die toArray Methode konvertieren ein Observable<T> zu T[])

Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) { 
     List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last(); 
     return Observable.create(new SyncOnSubscribe<int[], int[]>() { 
      @Override 
      protected int[] generateState() { 
       int[] array = new int[list.size()]; 
       Arrays.fill(array, 0); 
       return array; 
      } 

      @Override 
      protected int[] next(int[] state, Observer<? super int[]> observer) { 
       int[] next = new int[list.size()]; 
       for (int i = 0; i < next.length; i++) { 
        next[i] = list.get(i)[state[i]]; 
       } 
       observer.onNext(next); 
       state[state.length - 1]++; 
       for (int i = state.length - 1; i >= 0; i--) { 
        int delta = list.get(i).length - state[i]; 
        if (delta > 0) { 
         break; 
        } else if (delta == 0) { 
         state[i] = 0; 
         if (i == 0) { 
          observer.onCompleted(); 
          break; 
         } 
         state[i - 1]++; 
        } 
       } 
       return state; 
      } 
     }); 
    } 
Verwandte Themen