2013-08-09 5 views
9

Angenommen, ich möchte eine veränderbare Map in Scala verwenden, um zu verfolgen, wie oft ich einige Strings gesehen habe. In einem Kontext, single-threaded, dann ist dies einfach:Thread-sichere Umwandlung eines Wertes in eine veränderbare Map

import scala.collection.mutable.{ Map => MMap } 

class Counter { 
    val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def add(s: String): Unit = counts(s) += 1 
} 

Leider ist dies nicht Thread-sicher, da die get und die update atomar nicht passieren.

Concurrent maps hinzufügen a few atomic operations zum wandelbar Karte API, aber nicht die, die ich brauche, die wie folgt aussehen würde:

def replace(k: A, f: B => B): Option[B] 

Ich weiß, ich ScalaSTM ‚s nutzen können TMap:

import scala.concurrent.stm._ 

class Counter { 
    val counts = TMap.empty[String, Int] 

    def add(s: String): Unit = atomic { implicit txn => 
    counts(s) = counts.get(s).getOrElse(0) + 1 
    } 
} 

Aber (für jetzt), das ist immer noch eine extra Abhängigkeit. Andere Optionen würden Akteure (eine weitere Abhängigkeit), Synchronisation (möglicherweise weniger effizient) oder Javas atomic references (less idiomatic) einschließen.

Im Allgemeinen würde ich veränderbare Karten in Scala vermeiden, aber ich habe gelegentlich solche Dinge gebraucht, und zuletzt habe ich den STM-Ansatz benutzt (anstatt nur die Daumen zu drücken und zu hoffen, dass ich nicht komme) von der naiven Lösung gebissen).

Ich weiß, dass es hier eine Reihe von Kompromissen gibt (zusätzliche Abhängigkeiten vs. Leistung vs. Klarheit usw.), aber gibt es in Scala 2.10 so etwas wie eine "richtige" Antwort auf dieses Problem?

+1

was über ein Akka Schauspieler, die auf die wandelbaren Karte schreibt? 'Counter.add' sendet nur eine Feuer-und-Vergessen-Nachricht an sie. Was das Lesen betrifft, können sie je nach Bedarf gleichzeitig stattfinden oder auch durch den Akteur gehen. – gourlaysama

Antwort

3

Die einfachste Lösung ist definitiv die Synchronisation. Wenn es nicht zu viel Konkurrenz gibt, ist die Leistung möglicherweise nicht so schlecht.

Andernfalls könnten Sie versuchen, Ihre eigene STM-ähnliche replace Implementierung zu erstellen. So etwas könnte dazu führen:

object ConcurrentMapOps { 
    private val rng = new util.Random 
    private val MaxReplaceRetryCount = 10 
    private val MinReplaceBackoffTime: Long = 1 
    private val MaxReplaceBackoffTime: Long = 20 
} 
implicit class ConcurrentMapOps[A, B](val m: collection.concurrent.Map[A,B]) { 
    import ConcurrentMapOps._ 
    private def replaceBackoff() { 
    Thread.sleep((MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime)).toLong) // A bit crude, I know 
    } 

    def replace(k: A, f: B => B): Option[B] = { 
    m.get(k) match { 
     case None => return None 
     case Some(old) => 
     var retryCount = 0 
     while (retryCount <= MaxReplaceRetryCount) { 
      val done = m.replace(k, old, f(old)) 
      if (done) { 
      return Some(old) 
      } 
      else {   
      retryCount += 1 
      replaceBackoff() 
      } 
     } 
     sys.error("Could not concurrently modify map") 
    } 
    } 
} 

Beachten Sie, dass Kollisionsprobleme auf einen bestimmten Schlüssel lokalisiert sind. Wenn zwei Threads auf dieselbe Map zugreifen, aber an unterschiedlichen Schlüsseln arbeiten, haben Sie keine Kollisionen und die Ersetzungsoperation wird immer beim ersten Mal erfolgreich ausgeführt. Wenn eine Kollision erkannt wird, warten wir ein wenig (eine zufällige Zeitspanne, um die Wahrscheinlichkeit zu minimieren, dass Threads für immer denselben Schlüssel kämpfen) und versuchen es erneut.

Ich kann nicht garantieren, dass dies produktionsfertig ist (ich habe es gerade geworfen), aber das könnte den Trick machen.

UPDATE: Natürlich (wie Ionuţ G. Stan wies darauf hin), wenn alles, was Sie wollen Inkrement/Wert verringern, java ConcurrentHashMap bereits bietet thoses Operationen in einem Schloss frei. Meine obige Lösung gilt, wenn Sie eine allgemeinere replace Methode benötigen, die die Transformationsfunktion als Parameter verwenden würde.

+0

Ich bemerkte in der Karte Code wechselte er zu ThreadLocalRandom https://github.com/scala/scala/blob/master/src/library/scala/collection/concurrent/TrieMap.scala#L473 –

10

Wie wäre es mit diesem? Angenommen, Sie brauchen im Moment keine allgemeine replace Methode, nur einen Zähler.

Sie erhalten eine bessere Leistung als die Synchronisierung auf der gesamten Karte, und Sie erhalten auch atomare Inkremente.

+0

Danke-ich interessiere mich für die allgemeiner Fall, aber es ist gut zu sehen, dass dies so einfach ist. –

+0

Dies ist die richtige Lösung und nutzt die Java-Bibliotheken mit sehr hoher Performance. –

+1

Ich bin gespannt, ob es einen Grund gibt, nach ConcurrentHashMap statt nach concurrent.TrieMap zu greifen. Ich habe keine Meinung, nur dass das Forum eine Werbung für API ist. –

2

Sie fragen nach Ärger, wenn Ihre Karte nur als val sitzt.Wenn es Ihren Anwendungsfall erfüllt, würde ich etwas wie

für Low-Contention-Verwendung empfehlen. Für eine hohe Konkurrenz sollten Sie eine gleichzeitige Karte verwenden, die diese Verwendung unterstützt (z. B. java.util.concurrent.ConcurrentHashMap) und die Werte in AtomicWhatever umbrechen.

2

Wenn Sie ok sind mit Zukunft basierte Schnittstelle zu arbeiten:

trait SingleThreadedExecutionContext { 
    val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) 
} 

class Counter extends SingleThreadedExecutionContext { 
    private val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def get(s: String): Future[Int] = future(counts(s))(ec) 

    def add(s: String): Future[Unit] = future(counts(s) += 1)(ec) 
} 

-Test wird wie folgt aussehen:

class MutableMapSpec extends Specification { 

    "thread safe" in { 

    import ExecutionContext.Implicits.global 

    val c = new Counter 
    val testData = Seq.fill(16)("1") 
    await(Future.traverse(testData)(c.add)) 
    await(c.get("1")) mustEqual 16 
    } 
} 
+0

Dies ist überhaupt nicht Thread-sicher. Während Sie einen einzelnen Writer zu einem Zeitpunkt garantieren, können Sie immer noch Threads lesen, während die Map geändert wird –

+0

Wie ich verstehe, sind alle Operationen - lesen, schreiben, gemischt -, die ec als Kontext verwenden, threadsicher. Ops außerhalb dieses Kontextes sind nicht Thread-sicher. Wir freuen uns von anderen zu hören, ob dieses Verständnis stimmt. –

+0

Aber die Sache ist, das Lesen wird direkt ausgeführt: Wenn Sie auf 'c.counts' zugreifen, verwenden Sie den' ExecutionContext' überhaupt nicht. –

Verwandte Themen