2017-03-10 2 views
1

Ich möchte benutzerdefinierte Source[ByteSting] in Akka Stream implementieren. Diese Quelle sollte nur Daten aus der bereitgestellten Datei und innerhalb des angegebenen Byte-Bereichs lesen und sie stromabwärts propagieren.Implementieren benutzerdefinierte Akka Streams Quelle basierend auf ActorPublisher

Zuerst dachte ich, dass dies durch die Implementierung von Actor, der in ActorPublisher mischt, getan werden kann. Diese Implementierung ist analog zu akka.stream.impl.io.FilePublisher, die die gesamte Datei von gelieferten Pfad liest, statt nur die Daten von bestimmten Bytes reichen:

import java.nio.ByteBuffer 
import java.nio.channels.FileChannel 
import java.nio.file.{Path, StandardOpenOption} 

import akka.actor.{ActorLogging, DeadLetterSuppression, Props} 
import akka.stream.actor.ActorPublisher 
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request} 
import akka.util.ByteString 

import scala.annotation.tailrec 
import scala.util.control.NonFatal 

class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString] 
    with ActorLogging{ 

    import FilePublisher._ 

    private val chunksToBuffer = 10 
    private var bytesLeftToRead = endByte - startByte + 1 
    private var fileChannel: FileChannel = _ 
    private val buffer = ByteBuffer.allocate(8096) 

    private var bufferedChunks: Vector[ByteString] = _ 

    override def preStart(): Unit = { 
    try { 
     log.info("Starting") 
     fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ) 
     bufferedChunks = readAhead(Vector.empty, Some(startByte)) 
     log.info("Chunks {}", bufferedChunks) 
    } catch { 
     case NonFatal(ex) => onErrorThenStop(ex) 
    } 
    } 

    override def postStop(): Unit = { 

    log.info("Stopping") 
    if (fileChannel ne null) 
     try fileChannel.close() catch { 
     case NonFatal(ex) => log.error(ex, "Error during file channel close") 
    } 
    } 

    override def receive: Receive = { 
    case Request => 
     readAndSignalNext() 
     log.info("Got request") 
    case Continue => 
     log.info("Continuing reading") 
     readAndSignalNext() 
    case Cancel => 
     log.info("Cancel message got") 
     context.stop(self) 
    } 

    private def readAndSignalNext() = { 

    log.info("Reading and signaling") 
    if (isActive) { 
     bufferedChunks = readAhead(signalOnNext(bufferedChunks), None) 
     if (isActive && totalDemand > 0) self ! Continue 
    } 
    } 

    @tailrec 
    private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = { 

    if (chunks.nonEmpty && totalDemand > 0) { 
     log.info("Signaling") 
     onNext(chunks.head) 
     signalOnNext(chunks.tail) 
    } else { 
     if (chunks.isEmpty && bytesLeftToRead > 0) { 
     onCompleteThenStop() 
     } 
     chunks 
    } 
    } 

    @tailrec 
    private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = { 

    if (currentlyBufferedChunks.size < chunksToBuffer) { 

     val bytesRead = readDataFromChannel(startPosition) 
     log.info("Bytes read {}", bytesRead) 
     bytesRead match { 
     case Int.MinValue => Vector.empty 
     case -1 => 
      log.info("EOF reached") 
      currentlyBufferedChunks // EOF reached 
     case _ => 
      buffer.flip() 
      val chunk = ByteString(buffer) 
      buffer.clear() 

      bytesLeftToRead -= bytesRead 
      val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt) 
      readAhead(currentlyBufferedChunks :+ trimmedChunk, None) 
     } 

    } else { 
     currentlyBufferedChunks 
    } 
    } 

    private def readDataFromChannel(startPosition: Option[Long]): Int = { 
    try { 
     startPosition match { 
     case Some(position) => fileChannel.read(buffer, position) 
     case None => fileChannel.read(buffer) 
     } 
    } catch { 
     case NonFatal(ex) => 
     log.error(ex, "Got error reading data from file channel") 
     Int.MinValue 
    } 
    } 
} 

object FilePublisher { 

    private case object Continue extends DeadLetterSuppression 

    def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte) 
} 

aber stellt sich heraus, dass, wenn ich Source durch meine FilePublisher wie diese gesichert materialisieren:

val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength)) 
val future = fileSource.runWith(Sink.seq) 

nichts passiert und die Quelle weiter Daten nicht weiter verbreitet.

Gibt es einen anderen richtigen Weg, um Source basierend auf meinem FilePublisher materialisieren oder sollte ich nicht diese API verwenden und benutzerdefinierte Bearbeitungsstufe wie beschrieben here einfach implementieren?

Das Problem mit dem CustomStage-Ansatz ist, dass seine triviale Implementierung IO in dieser Phase sofort ausführen wird. Ich denke, ich könnte IO von der Bühne zu einem benutzerdefinierten Thread-Pool oder -Aktor bewegen, aber dies würde eine Art von Synchronisation zwischen der Bühne und dem Akteur erfordern. Danke.

Antwort

0

Das Problem wurde durch einen Fehler in Mustervergleich von receive Methode verursacht: Diese Linie case Request => sollte stattdessen case Request(_) weil Request tatsächlich Fall-Klasse mit einzelnen Parametern (final case class Request(n: Long)) und nicht Fall Gegenstand, wie ich dachte.

0

Ich habe festgestellt, dass Sie derzeit keinen separaten Dispatcher für IO-Vorgänge verwenden. Here's In der Dokumentation erfahren Sie, warum das nicht zu Blockaden in Ihrer Anwendung führen kann.

Akka Streams umschließt die FilePublisher in einer FileSource mit einem bestimmten Thread-Pool-basierten Dispatcher. Sie können ihren Code für Inspiration here überprüfen.

+0

Danke, das ist ein gültiger Punkt. Ich habe geplant, die Verwendung von separaten Dispatcher in Zukunft hinzuzufügen. – thereisnospoon

Verwandte Themen