2016-09-15 5 views
0

Ich brauchte in meinem Projekt etwas, das Anrufe zu meinem Server gruppieren würde, da es einige "Batch" -Routen gibt, die eine ID-Liste anstelle einer einfachen ID akzeptieren.Gleichzeitiger Zugriff auf Listen

Meine Idee war, etwas mit einem Pool zu bauen, der hin und wieder geleert wurde.

So erzeuge ich diese AbstractRepository (nicht sicher, ob es gut genannt wird):

public abstract class AbstractRepository<T>{ 

    protected Context c; 

    //interval between 2 queue emptying 
    private final int POOL_DELAY = 200; 
    protected int downloadingTaskCount = 0; 

    final protected ArrayMap<String, T> memCache = new ArrayMap<>(); 
    final protected HashSet<String> queue = new HashSet<>(); 

    final protected ArrayMap<String, List<FetchedInterface<T>>> callbackMap = new ArrayMap<>(); 

    final protected List<PoolEmptiedInterface> emptinessWatchers = new ArrayList<>(); 


    protected AbstractRepository(Context c) { 
     handler.postDelayed(downloadRoutine, POOL_DELAY); 
     this.c = c; 
    } 

    public void cache(String id) { 
     if (!memCache.containsKey(memCache)) { 
      synchronized (queue) { 
       queue.add(id); 
      } 
     } 
    } 

    public void getCache(String id, FetchedInterface<T> callback) { 

     if (memCache.containsKey(id)) { 
      callback.fetched(memCache.get(id)); 
     } else { 

      synchronized (callbackMap) { 
       if (!callbackMap.containsKey(id)) { 
        callbackMap.put(id, new ArrayList<FetchedInterface<T>>()); 
       } 
       callbackMap.get(id).add(callback); 
      } 

      synchronized (queue) { 
       queue.add(id); 
      } 

     } 
    } 

    public void getCacheIdObj(List<IdObject> idsObj, final ListFetchedInterface<T> callback) { 
     ArrayList<String> ids = new ArrayList<>(); 
     for (IdObject idObj : idsObj) { 
      ids.add(idObj.getId()); 
     } 
     getCache(ids, callback); 
    } 

    public void getCache(List<String> ids, final ListFetchedInterface<T> callback) { 
     final CountDownLatch countDownLatch = new CountDownLatch(ids.size()); 
     final ArrayList<T> array = new ArrayList<>(); 
     for (String id : ids) { 
      getCache(id, new FetchedInterface<T>() { 
       @Override 
       public void fetched(T item) { 
        array.add(item); 
        countDownLatch.countDown(); 
       } 
      }); 

     } 

     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        countDownLatch.await(); 
        callback.fetched(array); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

    } 


    /** 
    * Exists for threads that want to be notified that the user queue has been flushed. 
    */ 
    public void getNotifiedWhenQueueIsEmptied(PoolEmptiedInterface<T> callback) { 
     if (downloadingTaskCount == 0 && queue.isEmpty()) { 
      callback.poolEmpty(); 
     } else { 
      synchronized (emptinessWatchers) { 
       emptinessWatchers.add(callback); 
      } 
     } 
    } 

    protected void doIt(
      final HashSet<String> processingQueue) { 
    } 

    /** 
    * Pool Loop 
    */ 
    Handler handler = new Handler(); 

    private Runnable downloadRoutine = new Runnable() { 
     @Override 
     public void run() { 

      if (!queue.isEmpty()) { 
       final HashSet<String> processingQueue = new HashSet<>(); 

       synchronized (queue) { 
        processingQueue.addAll(queue); 
        queue.clear(); 
       } 
       downloadingTaskCount++; 
       doIt(processingQueue); 
      } 

      handler.postDelayed(downloadRoutine, POOL_DELAY); 
     } 
    }; 

Und eines seiner Kinder UserRepository

public class UserRepository extends AbstractRepository<UserCache> { 

    private static volatile UserRepository instance; 

    public static UserRepository getInstance(Context c) { 
     synchronized (UserRepository.class) { 
      if (instance == null) { 
       instance = new UserRepository(c); 
      } 
      return instance; 
     } 
    } 

    private UserRepository(Context c) { 
     super(c); 
    } 

    @Override 
    protected void doIt(final HashSet<String> processingQueue) { 
     Api.getInstance().backend.getUsersCache(new IdListArguments(new ArrayList<>(processingQueue))) 
       .enqueue(new Callback<Map<String, UserCache>>() { 
        @Override 
        public void onResponse(Call<Map<String, UserCache>> call, Response<Map<String, UserCache>> responseParent) { 
         Map<String, UserCache> response = responseParent.body(); 
         Iterator<String> it = processingQueue.iterator(); 
         while (it.hasNext()) { 

          String id = it.next(); 
          if (response.containsKey(id)) { 
           memCache.put(id, response.get(id)); 
           if (callbackMap.containsKey(id)) { 
            for (FetchedInterface callback : callbackMap.get(id)) { 
             callback.fetched(response.get(id)); 
            } 
           } 
           it.remove(); 
          } 
         } 

         for (PoolEmptiedInterface watcher : emptinessWatchers) { 
          watcher.poolEmpty(); 
         } 
         downloadingTaskCount--; 
         queue.addAll(processingQueue); 
        } 

        @Override 
        public void onFailure(Call<Map<String, UserCache>> call, Throwable t) { 
         queue.addAll(processingQueue); 
        } 
       }); 
    } 
} 

Meine Ausnahme:

Java.util.ConcurrentModificationException 
    at java.util.ArrayList$ArrayListIterator.next(ArrayList.java:573) 
    at com.m360.android.domain_layer.interactors.MemberInteractor.constructPagerMemberUsers(MemberInteractor.java:116) 
    at com.m360.android.domain_layer.interactors.MemberInteractor.access$000(MemberInteractor.java:29) 
    at com.m360.android.domain_layer.interactors.MemberInteractor$2$1.fetched(MemberInteractor.java:64) 
    at com.m360.android.datalayer.repositories.AbstractRepository$2.run(AbstractRepository.java:99) 

Und MemberInteractor enthält nur statische Methoden, der Absturz erscheint mit dem folgenden Befehl dank 2:

Es tut mir leid, es gibt eine Menge Code, aber ich denke, alles ist relevant für meine Frage.

Was passiert dort? Ich sehe das Problem nicht.

Antwort

2

Sie erhalten eine ConcurrentModificationException zum Ändern einer Liste, die Sie durchlaufen (über die für jede Schleife).

So ist die einfache Lösung wäre in diesem Fall die foreach zu ändern, um eine Kopie der Liste zu machen, bevor Sie über sie iterieren:

for (UserCache item : new LinkedList<UserCache>(items)) { 
    users.add(new PagerMemberUser(item)); 
} 
+0

Humm Ihre Korrektur funktioniert zwar, aber ich lieber hätte synchronisiert die Modifizierungsmethode. aber ich sehe nicht, was es sein könnte. –

Verwandte Themen