2017-10-12 4 views
3

In jedem Intervall lade ich Tweets mit einer bestimmten Abfrage ab. Diese Tweets müssen an Dienste übergeben werden, die diese Tweets berechnen und manipulieren. Diese Dienste sind also für meinen Verlag abonniert. So publisher.hasSubscribers() gibt true zurück. Die Funktion submit oder offer ruft jedoch nicht den onNext meines Abonnenten auf. Also als ein "Fix", radiere ich durch meine Abonnenten und rufe es selbst auf. Aber das sollte nicht der Fall sein.SubmissionPublisher bei Submit wird nicht aufgerufen OnNext des Subskribenten

Dies ist der Konstruktor meines Herausgebers.

public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){ 
    super(executor, maxBufferCapacity); 
    this.searchQuery = searchQuery; 
    scheduler = new ScheduledThreadPoolExecutor(1); 
    this.tweetGetter = scheduler.scheduleAtFixedRate(
      () -> { 
       List<String> tweets = getTweets(searchQuery); 
       /* this.lastCall = LocalDateTime.now(); 
       for(Flow.Subscriber sub : this.getSubscribers()){ 
        sub.onNext(tweets); 
       }*/ 
       this.submit(tweets); 
       if(tweets.size() >= 20) this.close(); 
      }, 0, period, unit); 
} 

Das ist mein Teilnehmer

package myFlowAPI; 

import Interfaces.IProcess; 
import Services.LogToFileService; 

import java.util.List; 
import java.util.concurrent.Flow; 
import java.util.concurrent.atomic.AtomicInteger; 

public class MySubscriber implements Flow.Subscriber<List<String>> { 
private Flow.Subscription subscription; 
private AtomicInteger count; 

private IProcess processor; 

private String name; 
private int DEMAND = 0; 

public MySubscriber(String name, IProcess processor){ 
    this.name = name; 
    this.processor = processor; 
} 

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
} 


@Override 
public void onNext(List<String> item) { 
    Object result = this.processor.process(item); 
    this.readResult(result); 

    switch (this.processor.getClass().getSimpleName()){ 
     case "CalculateTweetStatsService": 
      if((Integer) result >= 20){ 
       this.subscription.cancel(); 
      } 
      break; 
    } 
} 

@Override 
public void onError(Throwable throwable) { 
    System.out.println("Error is thrown " + throwable.getMessage()); 
} 

@Override 
public void onComplete() { 
    if(this.processor instanceof LogToFileService){ 
     ((LogToFileService) processor).closeResource(); 
    } 
    System.out.println("complete"); 
} 

private void readResult(Object result){ 
    System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString()); 
} 
} 

Dies ist die wichtigste ist, wo ich an den Verlag

public static void main(String[] args) { 
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); 

    String searchQuery; 
    try{ 
     searchQuery = args[0] != null ? args[0] : "#capgemini50"; 
    }catch (ArrayIndexOutOfBoundsException ex){ 
     searchQuery = "#capgemini50"; 
    } 

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery); 

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt")); 
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService()); 
    streamer.subscribe(subscriber1); 
    streamer.subscribe(subscriber2); 

} 

Antwort

4

Sie müssen die Teilnehmer abonnieren Sie explizit anfordern Daten z.B. bei der Zeichnung (siehe http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):

@Override 
public void onSubscribe(Flow.Subscription subscription) { 
    this.subscription = subscription; 
    this.subscription.request(1); 
} 

Samt bei der Verarbeitung in OnNext(), um den nächsten Artikel.

+2

Mein Held. Danke vielmals!! '@Override public void onNext (Liste Element) { Objekt result = this.processor.process (item); this.readResult (Ergebnis); Schalter (this.processor.getClass() getSimpleName().) { case "CalculateTweetStatsService": if ((Integer) Ergebnis> = 20) { this.subscription.cancel(); } Pause; } this.script.request (1); } ' – user1008531

Verwandte Themen