2016-05-29 6 views
0

Ich muss die Methode handle() von SomeObjectHandler implementieren, die SomeObject an ein äußeres System delegiert (siehe unten). SomeObject mit korrekten hashCode- und equals-Methoden. Method-Handle (SomeObject someObject) kann von vielen Threads (z. B. 10) aufgerufen werden. Das äußere System kann nicht gleich mit someObject zur gleichen Zeit funktionieren, aber es bricht ab, wenn das System versucht, gleichzeitig mit someObject zu arbeiten. Ich muss diese Klasse implementieren, um gleichzeitige Verarbeitung von gleichem aheadObject zu verhindern. Und selbst wenn einige Objekte gleich sind, sollten sie alle verarbeitet werden.Senden von Nachrichten aus vielen Threads in Java

Jetzt denke ich, dass ich etwas wie Warteschlange aus der gleichzeitigen Bibliothek verwenden muss, aber ich weiß nicht welche.

UPD: Ich brauche nur Standard-Java-Bibliotheken. Und wenn es möglich ist, maximalen Durchsatz zu erreichen.

Antwort

0

Ich bin mir nicht 100% sicher, ob ich Ihre Frage vollständig beantwortet habe, aber ich denke, es gibt mehrere Möglichkeiten, dieses Problem anzugehen.

1) Wie Sie bereits erwähnt haben, können Sie eine Warteschlange zum Einfügen von Objekten verwenden und sicherstellen, dass das äußere System die Objekte synchron verarbeitet, da Ihr äußeres System, wie gesagt, keine gleichen Objekte gleichzeitig verarbeiten kann.

2) Behandeln Sie dies im Absendercode selbst. Ich habe das mehrmals versucht. Hier ist ein Codeausschnitt. Der Vorteil dieses Ansatzes besteht darin, dass nur gleiche Objekte synchronisiert werden. Stellen Sie sicher, dass Sie den Eliminierungsteil auch im Endblock behandeln. Entschuldigung, wenn der Code nicht sauber ist. Ich bin neu bei diesem :)

Map<SomeObject,Integer> objMap=new ConcurrentHashMap<SomeObject,Integer>(); 

public void handle(SomeObject someObject) { 
synchronized(this.class) 
{ 
Integer count=objMap.get(someObject); 
if(count==null) 
{ 
    count=0; 
} 
objMap.put(someObject,++count); 
} 

synchronized(objectMap.get(someObject) 
{ 
    outerSystem.process(someObject); 

    Integer count=objMap.get(someObject); 
    if(count>1) 
{ 
    objMap.put(someObject,--count); 
} 
else 
{ 
    objectMap.remove(someObject); 
} 
} 

} 
+0

Danke für deinen Versuch, so etwas könnte Arbeit sein. Aber ich denke, dass es eine bessere Lösung gibt. Es wäre schön, maximalen Durchsatz zu erreichen. Wenn wir synchronisieren/sperren für jede Aktion, kann es stark reduziert werden – AskProgram

0

RxJava kann hier helfen. Es ist sehr gut im Umgang mit Datenströmen, insbesondere bei asynchronen Transformationen, und spricht bei Bedarf mit der Warteschlange, unter der Sie gerade arbeiten (ohne über synchronisierte Modifikatoren zu blockieren!). Um Ihr Problem zu lösen würde ich so etwas tun:

public class SomeHandler{ 

    private final OuterSystem outerSystem; 

    private final PublishSubject<SomeObject> subject; 

    public SomeHandler() { 
     subject 
      // handle calls from multiple threads (non-blocking) 
      .serialized() 
      // buffer in memory if not keeping up  
      .onBackpressureBuffer() 
      // use equals/hashCode to order messages (the queues you referred to) 
      .groupBy(x -> x) 
      .flatMap(g -> 
       g.doOnNext(x -> outerSystem.process(x)) 
       // process groups in parallel 
       .subscribeOn(Schedulers.computation())) 
      // do something if an error occurs 
      .doOnError(e -> e.printStackTrace()) 
      // start consuming data when arrives 
      .subscribe(); 
    } 

    public void handle(SomeObject someObject) { 
     subject.doOnNext(someObject); 
    } 
} 
+0

Vielen Dank für Ihre Antwort, es könnte wirklich funktionieren. Aber ich brauche nur Standart-Java-Bibliotheken (Es ist mein Fehler, ich musste darüber schreiben) – AskProgram

0

Wenn ich Ihr Problem richtig verstehen müssen Sie serielle Ausführung von „equal-Objekte“ gewährleisten, während „ungleiche Objekte“ können in parallell verarbeitet werden. Eine Möglichkeit, dies zu erreichen, besteht darin, N Prozessoren anzuordnen und die Arbeitslast entsprechend einer bestimmten deterministischen Eigenschaft der Objekte zu verteilen.

In Ihrem Fall, wenn zwei Objekte gleich sind, muß ihr Hash-Codes gleich sein, so dass die hasCode() modulo N verwendet werden könnte, um die Last über N Testamentsvollstrecker, wobei jeder Executor enthält einen einzigen Thread nur zu verbreiten:

public class SomeHandler { 
    static int N = ...; 
    // Each executor is an Executors.newSingleThreadScheduledExecutor() 
    Executor[N] executors = ....; 
    OuterSystem system; 

    public void handle(SomeObject so) { 
     executors[so.hashCode() % N].execute(() -> system.process(so)); 
    } 
} 
+0

Netter Ansatz.) sollte übrigens sein und wie das OP sagte, dass "handle" von mehreren Threads aufgerufen wird, würde ich all diese Felder in 'SomeHandler' final machen. –

Verwandte Themen