2017-10-16 3 views
0

ich kritische Frage habe ich Rocket-MQ (V4.1.0-Inkubation) Client verwenden, wie folgend:: invokeAsyncImpl aufrufen zu schnell

2017-10-16 16:18:12:457[ERROR][SimpleProducer$1.onException(SimpleProducer.java:44)] - send message to mq fail: 
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsyncImpl invoke too fast 
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeAsyncImpl(NettyRemotingAbstract.java:422) 
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:488) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:368) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:455) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:156) 
    at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:417) 
    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:51) 
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:275) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Ich weiß nicht, Wie kann ich es reparieren, auch wenn ich viel google, ohne eine richtige Antwort.

Hier ist mein Code von Produzenten in asyn Weise:

public class SimpleProducer { 

static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class); 
static AtomicInteger total = new AtomicInteger(0); 
private final static CountDownLatch mCountDownLatch = new CountDownLatch(1); 
public static void main(String[] args){ 
    logger.info("Bootstrap start..."); 
    DefaultMQProducer producer = new DefaultMQProducer("Producer"); 
    producer.setNamesrvAddr("192.168.137.112:9876"); 
    try { 
     producer.start(); 
     producer.setRetryTimesWhenSendAsyncFailed(3); 
     long start = System.currentTimeMillis(); 
     for (int i = 0; i < 10000000; i++) 
      try { 
       { 
        Message msg = new Message("newTopic", 
          "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        producer.send(msg, new SendCallback(){ 
         public void onSuccess(SendResult sendResult) { 
          if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) { 
           //MsgSendResponseCounter.factory.getInstance().onSuccess(); 
           logger.info("Succeeded send {} message, total {}", sendResult.getMsgId(), total.getAndIncrement()); 
          } 
         } 
         public void onException(Throwable ee) { 
          logger.error("send message to mq fail:", ee); 
         } 
        }); 
       } 

      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     mCountDownLatch.await(); 
     long end = System.currentTimeMillis(); 
     logger.info("Finished 10000000 msgs! elapsed time {} in all", (end - start)/1000); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    }finally{ 
     producer.shutdown(); 
    } 
} 

}

+0

wenn ich den Wert auf 0 für producer.setRetryTimesWhenSendAsyncFailed (0) gesetzt; eine neue Ausnahme wie folgt: org.apache.rocketmq.client.exception.MQClientException: warten Antwort Timeout 3000ms Für weitere Informationen, besuchen Sie bitte die URL, http://rocketmq.apache.org/docs/faq/ \t bei org .apache.rocketmq.client.impl.MQClientAPIImpl $ 1.operationComplete (MQClientAPIImpl.java:416) \t bei org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback (ResponseFuture.java:51) –

Antwort

0

Die Antwort ist eigentlich einfach, Sie zu schnell zu senden, dass die Strömungsregelschwelle erreicht.

Wenn Sie eine aysnc-Nachricht senden, wird der Client versuchen, eine Genehmigung zu akqurieren, nachdem er die Brokerantwort erhalten hat, wird er die Genehmigung freigeben.

Da Sie Nachrichten asynchron senden, wo producer.send() wird sehr schnell zurück, und Sie senden weiterhin Nachrichten in einer for-Schleife ohne Schlaf Milli, die das Problem haben wird.

einfach nur setzen Thread.sleep (100), nachdem Sie producer.send aufrufen oder das Sync senden Methode verwenden, die nicht die SendCallBack nicht benötigt