Mein Code folgen grundsätzlich die offiziellen Tutorials und der Hauptzweck ist es, alle Nachrichten von einem Abonnement (Constants.UNFINISHEDSUBID) zu sammeln und sie auf einem anderen veröffentlichen. Aber zur Zeit habe ich ein Problem, das ich nicht lösen kann. In meinem Implementierung Aufruf subscriber.stopAsync() führt zu folgenden Ausnahme:Subscriber.stopAsync() Ergebnisse in RejectedExecutionException
Mai 04, 2017 4:59:25 PM com.google.common.util.concurrent.AbstractFuture executeListener
SCHWERWIEGEND: RuntimeException while executing runnable [email protected] with executor j[email protected]2f3c6ac4
java.util.concurrent.RejectedExecutionException: Task java.[email protected]60d40af2 rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 320]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:817)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:753)
at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:613)
at io.grpc.stub.ClientCalls$GrpcFuture.set(ClientCalls.java:458)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:437)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:428)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:76)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:514)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:431)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:546)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:52)
at io.grpc.internal.SerializingExecutor$TaskRunner.run(SerializingExecutor.java:152)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Ich habe auch bemerkt, dass die Art von Zufall, manchmal alle Nachrichten und manchmal nur wenige oder keine einziger gesammelt bekommen. Ruft subscriber.stopAsync() nicht den richtigen Weg?
Meine aktuelle Implementierung:
protected void pullUnfinished() throws Exception {
List<PubsubMessage> jobsToRepublish = new ArrayList<>();
SubscriptionName subscription =
SubscriptionName.create(Constants.PROJECTID, Constants.UNFINISHEDSUBID);
MessageReceiver receiver = new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
synchronized(jobsToRepublish){
jobsToRepublish.add(message);
}
String unfinishedJob = message.getData().toStringUtf8();
LOG.info("got message: {}", unfinishedJob);
consumer.ack();
}
};
Subscriber subscriber = null;
try {
ChannelProvider channelProvider = new PlainTextChannelProvider();
subscriber = Subscriber.defaultBuilder(subscription, receiver)
.setChannelProvider(channelProvider)
.build();
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
System.err.println(failure);
}
}, MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
Thread.sleep(60000);
} finally {
if (subscriber != null) {
subscriber.stopAsync(); //Causes the exception
}
}
publishJobs(jobsToRepublish);
}
public class PlainTextChannelProvider implements ChannelProvider {
@Override
public boolean shouldAutoClose() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean needsExecutor() {
// TODO Auto-generated method stub
return false;
}
@Override
public ManagedChannel getChannel() throws IOException {
return NettyChannelBuilder.forAddress("localhost", 8085)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
}
@Override
public ManagedChannel getChannel(Executor executor) throws IOException {
return getChannel();
}
}
Versuchen Sie, ein Signal zum Stoppen hinzuzufügen. Siehe Beispielcode in ['startSync'] (http://googlecloudplatform.github.io/google-cloud-java/0.9.4/apidocs/com/google/cloud/pubsub/spi/v1/Subscriber.html#stopAsync -) um zu sehen, wie es umgesetzt wird. Wie in diesem [SO-Beitrag] (http://stackoverflow.com/a/19006386/5995040) angegeben, wird RejectedExecutionException verursacht, weil die Warteschlange voll ist und Sie keine weiteren Threads hinzufügen können oder der ThreadPool heruntergefahren wurde. Überprüfen Sie die Code-Implementierung. Hoffe das hilft. –
versuchte dies bereits, ändert nichts für mich Ich bekomme immer noch die gleiche rejectedExecutionException. Ich habe versucht, diese [Schnipsel] (https://github.com/GoogleCloudPlatform/google-cloud-java/blob/d476ef7904467233e83168b8d1f5a934a0aae711/google-cloud-examples/src/test/java/com/google/cloud/examples/pubsub/snippets/ITPubSubSnippets.java) und habe immer noch die Ausnahme, aber es scheint, als ob ich die Nachrichten erhalte, auch wenn die Ausnahme mich irritiert, irgendwelche anderen Ideen? –
Ich habe genau das gleiche Problem mit subscriber.stopAsync(); Mit der neuesten Bibliothek, hast du das herausgefunden? – hubpixel