Ich möchte benutzerdefinierte Source[ByteSting]
in Akka Stream implementieren. Diese Quelle sollte nur Daten aus der bereitgestellten Datei und innerhalb des angegebenen Byte-Bereichs lesen und sie stromabwärts propagieren.Implementieren benutzerdefinierte Akka Streams Quelle basierend auf ActorPublisher
Zuerst dachte ich, dass dies durch die Implementierung von Actor, der in ActorPublisher mischt, getan werden kann. Diese Implementierung ist analog zu akka.stream.impl.io.FilePublisher
, die die gesamte Datei von gelieferten Pfad liest, statt nur die Daten von bestimmten Bytes reichen:
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}
import akka.actor.{ActorLogging, DeadLetterSuppression, Props}
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.util.ByteString
import scala.annotation.tailrec
import scala.util.control.NonFatal
class FilePublisher(pathToFile: Path, startByte: Long, endByte: Long) extends ActorPublisher[ByteString]
with ActorLogging{
import FilePublisher._
private val chunksToBuffer = 10
private var bytesLeftToRead = endByte - startByte + 1
private var fileChannel: FileChannel = _
private val buffer = ByteBuffer.allocate(8096)
private var bufferedChunks: Vector[ByteString] = _
override def preStart(): Unit = {
try {
log.info("Starting")
fileChannel = FileChannel.open(pathToFile, StandardOpenOption.READ)
bufferedChunks = readAhead(Vector.empty, Some(startByte))
log.info("Chunks {}", bufferedChunks)
} catch {
case NonFatal(ex) => onErrorThenStop(ex)
}
}
override def postStop(): Unit = {
log.info("Stopping")
if (fileChannel ne null)
try fileChannel.close() catch {
case NonFatal(ex) => log.error(ex, "Error during file channel close")
}
}
override def receive: Receive = {
case Request =>
readAndSignalNext()
log.info("Got request")
case Continue =>
log.info("Continuing reading")
readAndSignalNext()
case Cancel =>
log.info("Cancel message got")
context.stop(self)
}
private def readAndSignalNext() = {
log.info("Reading and signaling")
if (isActive) {
bufferedChunks = readAhead(signalOnNext(bufferedChunks), None)
if (isActive && totalDemand > 0) self ! Continue
}
}
@tailrec
private def signalOnNext(chunks: Vector[ByteString]): Vector[ByteString] = {
if (chunks.nonEmpty && totalDemand > 0) {
log.info("Signaling")
onNext(chunks.head)
signalOnNext(chunks.tail)
} else {
if (chunks.isEmpty && bytesLeftToRead > 0) {
onCompleteThenStop()
}
chunks
}
}
@tailrec
private def readAhead(currentlyBufferedChunks: Vector[ByteString], startPosition: Option[Long]): Vector[ByteString] = {
if (currentlyBufferedChunks.size < chunksToBuffer) {
val bytesRead = readDataFromChannel(startPosition)
log.info("Bytes read {}", bytesRead)
bytesRead match {
case Int.MinValue => Vector.empty
case -1 =>
log.info("EOF reached")
currentlyBufferedChunks // EOF reached
case _ =>
buffer.flip()
val chunk = ByteString(buffer)
buffer.clear()
bytesLeftToRead -= bytesRead
val trimmedChunk = if (bytesLeftToRead >= 0) chunk else chunk.dropRight(bytesLeftToRead.toInt)
readAhead(currentlyBufferedChunks :+ trimmedChunk, None)
}
} else {
currentlyBufferedChunks
}
}
private def readDataFromChannel(startPosition: Option[Long]): Int = {
try {
startPosition match {
case Some(position) => fileChannel.read(buffer, position)
case None => fileChannel.read(buffer)
}
} catch {
case NonFatal(ex) =>
log.error(ex, "Got error reading data from file channel")
Int.MinValue
}
}
}
object FilePublisher {
private case object Continue extends DeadLetterSuppression
def props(path: Path, startByte: Long, endByte: Long): Props = Props(classOf[FilePublisher], path, startByte, endByte)
}
aber stellt sich heraus, dass, wenn ich Source
durch meine FilePublisher
wie diese gesichert materialisieren:
val fileSource = Source.actorPublisher(FilePublisher.props(pathToFile, 0, fileLength))
val future = fileSource.runWith(Sink.seq)
nichts passiert und die Quelle weiter Daten nicht weiter verbreitet.
Gibt es einen anderen richtigen Weg, um Source
basierend auf meinem FilePublisher
materialisieren oder sollte ich nicht diese API verwenden und benutzerdefinierte Bearbeitungsstufe wie beschrieben here einfach implementieren?
Das Problem mit dem CustomStage-Ansatz ist, dass seine triviale Implementierung IO in dieser Phase sofort ausführen wird. Ich denke, ich könnte IO von der Bühne zu einem benutzerdefinierten Thread-Pool oder -Aktor bewegen, aber dies würde eine Art von Synchronisation zwischen der Bühne und dem Akteur erfordern. Danke.
Danke, das ist ein gültiger Punkt. Ich habe geplant, die Verwendung von separaten Dispatcher in Zukunft hinzuzufügen. – thereisnospoon