2017-05-04 2 views
0

Mein Code folgen grundsätzlich die offiziellen Tutorials und der Hauptzweck ist es, alle Nachrichten von einem Abonnement (Constants.UNFINISHEDSUBID) zu sammeln und sie auf einem anderen veröffentlichen. Aber zur Zeit habe ich ein Problem, das ich nicht lösen kann. In meinem Implementierung Aufruf subscriber.stopAsync() führt zu folgenden Ausnahme:Subscriber.stopAsync() Ergebnisse in RejectedExecutionException

Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener 
SCHWERWIEGEND: RuntimeException while executing runnable [email protected] with executor j[email protected]2f3c6ac4 
java.util.concurrent.RejectedExecutionException: Task java.[email protected]60d40af2 rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320] 
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) 
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) 
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) 
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) 
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668) 
    at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817) 
    at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753) 
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613) 
    at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458) 
    at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437) 
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428) 
    at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76) 
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514) 
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431) 
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546) 
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52) 
    at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Ich habe auch bemerkt, dass die Art von Zufall, manchmal alle Nachrichten und manchmal nur wenige oder keine einziger gesammelt bekommen. Ruft subscriber.stopAsync() nicht den richtigen Weg?

Meine aktuelle Implementierung:

protected void pullUnfinished() throws Exception { 
    List<PubsubMessage> jobsToRepublish = new ArrayList<>(); 
    SubscriptionName subscription = 
      SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID); 

    MessageReceiver receiver = new MessageReceiver() { 
     @Override 
     public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { 
      synchronized(jobsToRepublish){ 
       jobsToRepublish.add(message); 
      } 
      String unfinishedJob = message.getData().toStringUtf8(); 
      LOG.info("got message: {}", unfinishedJob); 
      consumer.ack(); 
     } 
    }; 

    Subscriber subscriber = null; 
    try { 
     ChannelProvider channelProvider = new PlainTextChannelProvider(); 
     subscriber = Subscriber.defaultBuilder(subscription, receiver) 
           .setChannelProvider(channelProvider) 
           .build(); 
     subscriber.addListener(new Subscriber.Listener() { 
      @Override 
      public void failed(Subscriber.State from, Throwable failure) { 
       System.err.println(failure); 
      } 
     }, MoreExecutors.directExecutor()); 
     subscriber.startAsync().awaitRunning(); 
     Thread.sleep(60000); 
    } finally { 
     if (subscriber != null) { 
      subscriber.stopAsync(); //Causes the exception 
     } 
    } 
    publishJobs(jobsToRepublish); 
} 

public class PlainTextChannelProvider implements ChannelProvider { 

    @Override 
    public boolean shouldAutoClose() { 
     // TODO Auto-generated method stub 
     return false; 
    } 

    @Override 
    public boolean needsExecutor() { 
     // TODO Auto-generated method stub 
     return false; 
    } 

     @Override 
     public ManagedChannel getChannel() throws IOException { 
     return NettyChannelBuilder.forAddress("localhost", 8085) 
      .negotiationType(NegotiationType.PLAINTEXT) 
      .build(); 
     } 

     @Override 
     public ManagedChannel getChannel(Executor executor) throws IOException { 
     return getChannel(); 
     } 
} 
+0

Versuchen Sie, ein Signal zum Stoppen hinzuzufügen. Siehe Beispielcode in ['startSync'] (http://googlecloudplatform.github.io/google-cloud-java/0.9.4/apidocs/com/google/cloud/pubsub/spi/v1/Subscriber.html#stopAsync -) um zu sehen, wie es umgesetzt wird. Wie in diesem [SO-Beitrag] (http://stackoverflow.com/a/19006386/5995040) angegeben, wird RejectedExecutionException verursacht, weil die Warteschlange voll ist und Sie keine weiteren Threads hinzufügen können oder der ThreadPool heruntergefahren wurde. Überprüfen Sie die Code-Implementierung. Hoffe das hilft. –

+0

versuchte dies bereits, ändert nichts für mich Ich bekomme immer noch die gleiche rejectedExecutionException. Ich habe versucht, diese [Schnipsel] (https://github.com/GoogleCloudPlatform/google-cloud-java/blob/d476ef7904467233e83168b8d1f5a934a0aae711/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java) und habe immer noch die Ausnahme, aber es scheint, als ob ich die Nachrichten erhalte, auch wenn die Ausnahme mich irritiert, irgendwelche anderen Ideen? –

+0

Ich habe genau das gleiche Problem mit subscriber.stopAsync(); Mit der neuesten Bibliothek, hast du das herausgefunden? – hubpixel

Antwort

0

hatte ich genau die gleiche Problem, wenn ähnlichen Code aus einem JUnit-Test ausgeführt wird, und fand diesen related answer im Allgemeinen auf Multithreading, was darauf hindeutet, dass ein Threadpool geschlossen wird, während Hörer noch beziehst zu ihm. Ich habe auch den Code von Subscriber.java auf GitHub untersucht und ein Beispiel für den Empfang einer Anzahl von Nachrichten im JavaDoc auf startAsync() gefunden, was darauf hindeutet, dass ich auf die Beendigung von stopAsync() warten muss.

Try Ändern

subscriber.stopAsync(); 

zu

subscriber.stopAsync().awaitTerminated(); 

für mich gearbeitet.