0

Ich lerne die Reaktiv-Stream-API von Java 9.Wie übertrage ich eine Nachricht an einen anderen Abonnenten?

Wie hat es die Verleger und Abonnenten, und der Abonnent abonniert dem Verleger und der Abonnent implementiert auch die folgenden Meldungen:

public class TestSubscriber<T> implements Subscriber<T> { 

    @Override 
    public void onComplete() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void onError(Throwable arg0) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void onNext(T arg0) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void onSubscribe(Subscription arg0) { 
     // TODO Auto-generated method stub 

    } 

} 

Ich habe Keine Methode im Subscriber gefunden, die die Nachricht an einen anderen Teilnehmer weiterleitet/überträgt. Irgendwelche Vorschläge?

+0

Interessant, aber was soll das für erforderlich? Ich meine, wenn wir über ein [Abonnement] sprechen (https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Subscription.html), ist es zwischen einem Abonnenten und einem Herausgeber im Allgemeinen. – nullpointer

+0

Sagen Sie, wenn ein Abonnent die Nachricht ändern/ändern und an einen anderen Abonnenten übergeben möchte – KayV

+1

Erstens, ich denke, Sie können diese https://stackoverflow.com/questions/47936868/can-a-subscriber-act-as-a zusammenführen -publisher und behalte eins mit der detaillierten Frage. – nullpointer

Antwort

1

Dies kann die Flow.Processor erfolgen Umsetzung wie folgt:

import java.util.concurrent.Flow; 
import java.util.concurrent.Flow.Subscription; 
import java.util.concurrent.SubmissionPublisher; 
import java.util.function.Function; 

public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> { 

    private Function<T, R> function; 
    private Flow.Subscription subscription; 

    public MyTransformer(Function<T, R> function) { 
     super(); 
     this.function = function; 
    } 

    @Override 
    public void onComplete() { 
     System.out.println("Transformer Completed"); 
    } 

    @Override 
    public void onError(Throwable e) { 
     e.printStackTrace(); 
    } 

    @Override 
    public void onNext(T item) { 
     System.out.println("Transformer Got : "+item); 
     submit(function.apply(item)); 
     subscription.request(1); 

    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     this.subscription = subscription; 
     subscription.request(1); 
    } 


} 

Und wenn Sie anrufen sie wie folgt verwendet werden:

public class TestTransformer { 

    public static void main(String... args) { 
     SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
     MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt); 

     TestSubscriber<Integer> subscriber = new TestSubscriber<>(); 
     List<String> items = List.of("1", "2", "3"); 

     List<Integer> expectedResult = List.of(1, 2, 3); 

     publisher.subscribe(transformProcessor); 
     transformProcessor.subscribe(subscriber); 
     items.forEach(publisher::submit); 
     publisher.close(); 

    } 
} 
Verwandte Themen