Ich brauchte in meinem Projekt etwas, das Anrufe zu meinem Server gruppieren würde, da es einige "Batch" -Routen gibt, die eine ID-Liste anstelle einer einfachen ID akzeptieren.Gleichzeitiger Zugriff auf Listen
Meine Idee war, etwas mit einem Pool zu bauen, der hin und wieder geleert wurde.
So erzeuge ich diese AbstractRepository
(nicht sicher, ob es gut genannt wird):
public abstract class AbstractRepository<T>{
protected Context c;
//interval between 2 queue emptying
private final int POOL_DELAY = 200;
protected int downloadingTaskCount = 0;
final protected ArrayMap<String, T> memCache = new ArrayMap<>();
final protected HashSet<String> queue = new HashSet<>();
final protected ArrayMap<String, List<FetchedInterface<T>>> callbackMap = new ArrayMap<>();
final protected List<PoolEmptiedInterface> emptinessWatchers = new ArrayList<>();
protected AbstractRepository(Context c) {
handler.postDelayed(downloadRoutine, POOL_DELAY);
this.c = c;
}
public void cache(String id) {
if (!memCache.containsKey(memCache)) {
synchronized (queue) {
queue.add(id);
}
}
}
public void getCache(String id, FetchedInterface<T> callback) {
if (memCache.containsKey(id)) {
callback.fetched(memCache.get(id));
} else {
synchronized (callbackMap) {
if (!callbackMap.containsKey(id)) {
callbackMap.put(id, new ArrayList<FetchedInterface<T>>());
}
callbackMap.get(id).add(callback);
}
synchronized (queue) {
queue.add(id);
}
}
}
public void getCacheIdObj(List<IdObject> idsObj, final ListFetchedInterface<T> callback) {
ArrayList<String> ids = new ArrayList<>();
for (IdObject idObj : idsObj) {
ids.add(idObj.getId());
}
getCache(ids, callback);
}
public void getCache(List<String> ids, final ListFetchedInterface<T> callback) {
final CountDownLatch countDownLatch = new CountDownLatch(ids.size());
final ArrayList<T> array = new ArrayList<>();
for (String id : ids) {
getCache(id, new FetchedInterface<T>() {
@Override
public void fetched(T item) {
array.add(item);
countDownLatch.countDown();
}
});
}
new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
callback.fetched(array);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* Exists for threads that want to be notified that the user queue has been flushed.
*/
public void getNotifiedWhenQueueIsEmptied(PoolEmptiedInterface<T> callback) {
if (downloadingTaskCount == 0 && queue.isEmpty()) {
callback.poolEmpty();
} else {
synchronized (emptinessWatchers) {
emptinessWatchers.add(callback);
}
}
}
protected void doIt(
final HashSet<String> processingQueue) {
}
/**
* Pool Loop
*/
Handler handler = new Handler();
private Runnable downloadRoutine = new Runnable() {
@Override
public void run() {
if (!queue.isEmpty()) {
final HashSet<String> processingQueue = new HashSet<>();
synchronized (queue) {
processingQueue.addAll(queue);
queue.clear();
}
downloadingTaskCount++;
doIt(processingQueue);
}
handler.postDelayed(downloadRoutine, POOL_DELAY);
}
};
Und eines seiner Kinder UserRepository
public class UserRepository extends AbstractRepository<UserCache> {
private static volatile UserRepository instance;
public static UserRepository getInstance(Context c) {
synchronized (UserRepository.class) {
if (instance == null) {
instance = new UserRepository(c);
}
return instance;
}
}
private UserRepository(Context c) {
super(c);
}
@Override
protected void doIt(final HashSet<String> processingQueue) {
Api.getInstance().backend.getUsersCache(new IdListArguments(new ArrayList<>(processingQueue)))
.enqueue(new Callback<Map<String, UserCache>>() {
@Override
public void onResponse(Call<Map<String, UserCache>> call, Response<Map<String, UserCache>> responseParent) {
Map<String, UserCache> response = responseParent.body();
Iterator<String> it = processingQueue.iterator();
while (it.hasNext()) {
String id = it.next();
if (response.containsKey(id)) {
memCache.put(id, response.get(id));
if (callbackMap.containsKey(id)) {
for (FetchedInterface callback : callbackMap.get(id)) {
callback.fetched(response.get(id));
}
}
it.remove();
}
}
for (PoolEmptiedInterface watcher : emptinessWatchers) {
watcher.poolEmpty();
}
downloadingTaskCount--;
queue.addAll(processingQueue);
}
@Override
public void onFailure(Call<Map<String, UserCache>> call, Throwable t) {
queue.addAll(processingQueue);
}
});
}
}
Meine Ausnahme:
Java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListIterator.next(ArrayList.java:573)
at com.m360.android.domain_layer.interactors.MemberInteractor.constructPagerMemberUsers(MemberInteractor.java:116)
at com.m360.android.domain_layer.interactors.MemberInteractor.access$000(MemberInteractor.java:29)
at com.m360.android.domain_layer.interactors.MemberInteractor$2$1.fetched(MemberInteractor.java:64)
at com.m360.android.datalayer.repositories.AbstractRepository$2.run(AbstractRepository.java:99)
Und MemberInteractor enthält nur statische Methoden, der Absturz erscheint mit dem folgenden Befehl dank 2:
Es tut mir leid, es gibt eine Menge Code, aber ich denke, alles ist relevant für meine Frage.
Was passiert dort? Ich sehe das Problem nicht.
Humm Ihre Korrektur funktioniert zwar, aber ich lieber hätte synchronisiert die Modifizierungsmethode. aber ich sehe nicht, was es sein könnte. –