2017-04-18 4 views
0

Ich habe einen Dolch-Singleton-Wrapper, der meine grundlegenden Realm-Anfragen behandelt. Einer davon sieht so aus:Realm - Implementieren der asynchronen Warteschlange

public void insertOrUpdateAsync(final List<RealmMessage> messages, @Nullable final OnInsertListener listener) { 
    Realm instance = getRealmInstance(); 
    instance.executeTransactionAsync(realm -> { 
       List<RealmMessage> newMessages = insertOrUpdateMessages(realm, messages); 
      }, 
      () -> success(listener, instance), 
      error -> error(listener, error, instance)); 
} 

private List<RealmMessage> insertOrUpdateMessages(@NonNull Realm realm, @NonNull final List<RealmMessage> messages) { 
    ... 
    return realm.copyToRealmOrUpdate(unattendedMessages); 
} 

Das funktioniert gut.

Allerdings gibt es eine Ecke Fall, wo - lange Geschichte kurz - ich starte insertOrUpdateAsynch() viele, viele Male. Und nach einigen Anfragen, die ich diese:

Caused by: java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 17, active threads = 17, queued tasks = 100, completed tasks = 81]

Meine Frage ist: Wie soll ich damit umgehen, ohne ganzen Anwendungsfluss wieder aufzubauen. Meine Idee war es, eingehende Anfragen über RxJava in die Warteschlange zu stellen. Habe ich recht? Welche Betreiber sollte ich berücksichtigen und weiterbilden?

Oder nähere ich mich dem völlig falsch? Von den meisten meiner googling habe ich festgestellt, dass das Problem meist in Start-Methode wie meins in einer Schleife ist. Ich benutze keine. In meinem Fall besteht das Problem darin, dass diese Methode von mehreren Antworten gestartet wird und das Ändern aufgrund der aktuellen Backend-Implementierung unmöglich ist.

Antwort

0

Wenn Sie Ihre Anwendung nicht umgestalten möchten, können Sie einen Zähl-Semaphor verwenden. Sie werden sehen, dass zwei Threads sofort die Sperre erhalten. Der andere Thread blockiert, bis ein Anruf eine Sperre aufgibt. Es wird nicht empfohlen, acquire() ohne Timeout zu verwenden.

Um RxJava verwenden zu können, müssten Sie das Design Ihrer Anwendung ändern und die Geschwindigkeitsbegrenzung in RxJava ist nicht so einfach, weil es sich um eine Vollversion handelt.

private final Semaphore semaphore = new Semaphore(2); 

@Test 
public void name() throws Exception { 
    Thread t1 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 
    Thread t2 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 
    Thread t3 = new Thread(() -> { 
     doNetworkStuff(); 
    }); 

    t1.start(); 
    t2.start(); 
    t3.start(); 

    Thread.sleep(1500); 
} 

private void doNetworkStuff() { 
    try { 
     System.out.println("enter doNetworkStuff"); 

     semaphore.acquire(); 

     System.out.println("acquired"); 

     Thread.sleep(1000); 

    } catch (InterruptedException e) { 
     e.printStackTrace(); // Don't do this!! 
    } finally { 
     semaphore.release(); 
    } 
} 
Verwandte Themen