2016-06-30 4 views
4

Ich versuche, zwei parallele Anfragen mit Volley zu führen, um eine Datenbank zu aktualisieren (mit DBFlow). Eine Tabelle in der DB kann nur einmal ausgefüllt werden, wenn beide Anfragen gemacht und ihre Daten gespeichert wurden (FKs).Zip scheint nicht auf Observable zu warten, um Daten auszugeben

Angesichts der Beispiel, ich möchte beide Sektoren/Mitarbeiter holen/einfügen, um parallel zu laufen und dann, sobald die Einsätze abgeschlossen sind, um die Verträge zu speichern.

/** 
* Update the sectors, employees and their contracts 
* @return An Observable to watch for the process to complete 
* 
* sectorsFetch______sectorsInsert________contractsInsert 
* employeesFetch____employeesInsert___/ 
*/ 
public static Observable<Void> updateEverything() { 
    try { 
     Log.d(TAG, "Starting update..."); 
     Observable<JSONArray> employeesFetch = Observable.from(ForumAPI.getInstance().getEmployees()); 
     Observable<List<Contract>> employeesInsert = employeesFetch.flatMap(new Func1<JSONArray, Observable<List<Contract>>>() { 
      @Override 
      public Observable<List<Contract>> call(JSONArray employees) { 
       Log.d(TAG, "Employee list fetched"); 
       return saveEmployees(employees); 
      } 
     }); 

     Observable<JSONArray> sectorsFetch = Observable.from(ForumAPI.getInstance().getSectors()); 
     Observable<Void> sectorsInsert = sectorsFetch.flatMap(new Func1<JSONArray, Observable<Void>>() { 
      @Override 
      public Observable<Void> call(JSONArray sectors) { 
       Log.d(TAG, "Sector list fetched"); 
       return saveSectors(sectors); 
      } 
     }); 

     return Observable.zip(sectorsInsert, employeesInsert, new Func2<Void, List<Contract>, Void>() { 
      @Override 
      public Void call(Void aVoid, List<Contract> contracts) { 
       Log.d(TAG, "Sectors and employees saved. Saving contracts"); 
       return saveContracts(contracts); 
      } 
     }); 

    } catch (InterruptedException | ExecutionException e) { 
     Log.e(TAG, e.getMessage()); 
     return Observable.error(e); 
    } 
} 

Hinweis: ForumAPI getEmployees/Sektoren gibt Zukunft sowohl zurück.

Bellow ist die sichere Methoden.

/** 
* Parse and save an array of sectors 
* @param jsonSectors The array of sector to save 
* @return An Observable to watch for the process to complete. 
*/ 
private static Observable<Void> saveSectors(JSONArray jsonSectors) { 
    Log.d(TAG, "Transforming JSON sectors to object"); 
    List<Sector> sectList = new ArrayList<>(); 
    try { 
     for (int i = 0; i < jsonSectors.length(); i++) { 
      JSONObject jsonSect = jsonSectors.getJSONObject(i); 
      Sector sect = Sector.build(jsonSect); 
      sectList.add(sect); 
     } 
     Log.d(TAG, sectList.size() + " sectors fetched. Saving..."); 
     ForumDB.getDB().executeTransaction(
       FastStoreModelTransaction.insertBuilder(
         FlowManager.getModelAdapter(Sector.class) 
       ).addAll(sectList).build()); 

     Log.d(TAG, "Sector list saved"); 
    } catch (JSONException e) { 
     Log.e(TAG, "Unable to parse sector list. " + e.getMessage()); 
     return Observable.error(e); 
    } 
    return Observable.empty(); 
} 
/** 
* Parse and save an array of sectors 
* @param jsonSectors The array of sector to save 
* @return An Observable to watch for the process to complete. 
*/ 
private static Observable<Void> saveSectors(JSONArray jsonSectors) { 
    Log.d(TAG, "Transforming JSON sectors to object"); 
    List<Sector> sectList = new ArrayList<>(); 
    try { 
     for (int i = 0; i < jsonSectors.length(); i++) { 
      JSONObject jsonSect = jsonSectors.getJSONObject(i); 
      Sector sect = Sector.build(jsonSect); 
      sectList.add(sect); 
     } 
     Log.d(TAG, sectList.size() + " sectors fetched. Saving..."); 
     ForumDB.getDB().executeTransaction(
       FastStoreModelTransaction.insertBuilder(
         FlowManager.getModelAdapter(Sector.class) 
       ).addAll(sectList).build()); 

     Log.d(TAG, "Sector list saved"); 
    } catch (JSONException e) { 
     Log.e(TAG, "Unable to parse sector list. " + e.getMessage()); 
     return Observable.error(e); 
    } 
    return Observable.empty(); 
} 

/** 
* Parse and save an array of employees 
* @param jsonEmployees The array of employee to save 
* @return An Observable to watch for the process to complete. 
*/ 
private static Observable<List<Contract>> saveEmployees(JSONArray jsonEmployees) { 
    Log.d(TAG, "Transforming JSON employees to object"); 
    List<Person> empList = new ArrayList<>(); 
    List<Contract> contractList = new ArrayList<>(); 
    try { 
     for (int i = 0; i < jsonEmployees.length(); i++) { 
      JSONObject jsonEmp = jsonEmployees.getJSONObject(i); 
      Person emp = Person.build(jsonEmp); 
      empList.add(emp); 
      JSONArray jsonContracts = jsonEmp.getJSONArray("sectors"); 
      for (int j = 0; j < jsonContracts.length(); j++) { 
       Contract contract = new Contract(); 
       contract.setSectorId(jsonContracts.getJSONObject(j).getInt("id")); 
       contract.setPersonForumId(emp.getForumId()); 
       contractList.add(contract); 
      } 
     } 
     Log.d(TAG, empList.size() + " employees fetched. Saving..."); 
     ForumDB.getDB().executeTransaction(
       FastStoreModelTransaction.insertBuilder(
         FlowManager.getModelAdapter(Person.class) 
       ).addAll(empList).build()); 
     Log.d(TAG, "Employee list saved"); 
    } catch (JSONException e) { 
     Log.e(TAG, "Unable to parse employee list. " + e.getMessage()); 
     return Observable.error(e); 
    } 
    return Observable.just(contractList); 
} 

/** 
* Save a list of contract 
* @param contracts The list of contract to save 
* @return An Observable to watch for the process to complete. 
*/ 
private static Void saveContracts(List contracts) { 
    ForumDB.getDB().executeTransaction(
      FastStoreModelTransaction.insertBuilder(
        FlowManager.getModelAdapter(Contract.class) 
      ).addAll(contracts).build()); 
    Log.d(TAG, "Contract list saved"); 

    return null; 
} 

Das Problem ist, wenn auf dem abonnieren möchte, dass die globalen beobachtbaren von einer Android-Aktivität, OnCompleted meiner Beobachter direkt aufgerufen werden, nachdem sectorFetch Daten emittieren (weder sectorInsert noch meine zip genannt werden).

Bellow ist das Protokoll

D/com.xx.observable.DataUpdater: Starting update... 
D/com.xx.helper.ForumAPI: Requesting employee list 
D/com.xx.helper.ForumAPI: Request added to queue... 
D/com.xx.helper.ForumAPI: Requesting sector list 
D/com.xx.helper.ForumAPI: Request added to queue 
D/com.xx.observable.DataUpdater: Sector list fetched 
D/com.xx.observable.DataUpdater: Transforming JSON sectors to object 
D/com.xx.observable.DataUpdater: 8 sectors fetched. Saving... 
D/com.xx.observable.DataUpdater: Sector list saved 
D/com.xx.activity.Startup: onCompleted reached 

Ich kann nicht finden, was falsch ist. Ist eines meiner Observable etwas emittiert, so dass meine Zip aufgerufen wird, bevor es sollte?

Antwort

2

Die zip Dokumentation in 1.1.6 wurde aktualisiert, um diesen Fall zu beschreiben:

Der Betreiber abonniert hat seine Quellen, um vervollständigt sie spezifiziert und mit Spannung, wenn eine der Quellen kürzer als der Rest ist, während die anderen Quellen abbestellen. Daher ist es möglich, dass diese anderen Quellen niemals vollständig ausgeführt werden können (und somit nicht doOnCompleted() aufrufen. Dies kann auch passieren, wenn die Quellen genau gleich lang sind; wenn Quelle A vervollständigt und B verbraucht wurde und kurz vor dem Abschluss steht A, erkennt der Bediener nicht weiter Werte werden das Senden und es wird B sofort abmelden. Zum Beispiel:. zip(Arrays.asList(range(1, 5).doOnCompleted(action1), range(6, 5).doOnCompleted(action2)), (a) -> a)action1 wird aber action2 wird nicht genannt werden

mit anderen Worten, zip nicht mit empty(). Sie können mit Observable.<Void>just(null) zippen und diese Schiene ignorieren.

+0

jeee .... kämpfte mit diesem stundenlang. Danke. Ich werde mehr auf den Javado konzentrieren c als nächstes das reactiveX doc. –

+0

Die ReactiveX-Site liegt außerhalb meiner direkten Kontrolle und ist ansonsten etwas langsamer, um Updates zu erhalten. – akarnokd

Verwandte Themen