2016-09-22 6 views
0

Ich habe eine einfache Abfrage wie folgt:Siddhi bekomme ich keine Antwort von Aggregatabfrage

define stream myEventStream (userId string, price int); 
define stream outputStream (avgPrice double, userId string); 

@info(name = 'aQuery') 
from myEventStream#window.time(5000) 
select avg(price) as avgPrice, userId group by userId 
insert into outputStream; 

Wenn ich die Abfrage Rückruf füge ich keine Antwort von ihm bekommt.

runtime.addCallback("aQuery", new QueryCallback() { 
     @Override 
     public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { 
      EventPrinter.print(timeStamp, inEvents, removeEvents); 
     } 
    }); 

Ich produzieren Nachrichten in einem anderen Thread:

final AtomicInteger counter = new AtomicInteger(0); 
    final Random rnd = new Random(System.currentTimeMillis()); 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 
    executor.submit(() -> { 
     while (counter.getAndIncrement() < 100) { 
      try { 
       handler.send(new Object[]{"user1", rnd.nextInt(100)}); 
       handler.send(new Object[]{"user2", rnd.nextInt(100)}); 
       handler.send(new Object[]{"user3", rnd.nextInt(100)}); 
       System.out.println("Sent: " + counter.get()); 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       throw new RuntimeException(e); 
      } 
     } 
    }); 

ich je 5 Sekunden zur Folge haben werde erwartet. Was fehlt mir hier? Bitte helfen Sie. Danke.

Antwort

0

Nun, das Problem war, dass ich die Laufzeit direkt nach dem Code heruntergefahren habe, der den Ereignisstrom erzeugt. Nichtsdestotrotz war ich in der Lage, Nachrichten zur Laufzeit zu senden, obwohl es bereits heruntergefahren war. Keine Ausnahmen.

Verwandte Themen