2017-11-22 5 views
1

Ich habe Kryo-serialisierte Binärdaten auf S3 gespeichert (Tausende von serialisierten Objekten).Alpakka - gelesen Kryo-serialisierte Objekte aus S3

Alpakka können Sie den Inhalt als data: Source[ByteString, NotUsed] lesen. Aber Kryo-Format verwendet keine Begrenzer, so dass ich nicht jedes serialisierte Objekt in eine separate ByteString mit data.via(Framing.delimiter(...)) teilen kann.

Also, Kryo muss eigentlich die Daten lesen, um zu verstehen, wann ein Objekt endet, und es sieht nicht streaming-freundlich aus.

Ist es möglich, diesen Fall in Streaming-Mode zu implementieren, so dass ich am Ende des Tages Source[MyObject, NotUsed] bekommen?

Antwort

1

Hier ist eine Grafikstufe, die das tut. Es behandelt den Fall, wenn ein serialisiertes Objekt zwei Byte-Strings umfasst. Es muss verbessert werden, wenn Objekte groß sind (nicht mein Anwendungsfall) und kann mehr als zwei Byte-Strings in Source[ByteString, NotUsed] nehmen.

object KryoReadStage { 
    def flow[T](kryoSupport: KryoSupport, 
       `class`: Class[T], 
       serializer: Serializer[_]): Flow[ByteString, immutable.Seq[T], NotUsed] = 
    Flow.fromGraph(new KryoReadStage[T](kryoSupport, `class`, serializer)) 
} 

final class KryoReadStage[T](kryoSupport: KryoSupport, 
          `class`: Class[T], 
          serializer: Serializer[_]) 
    extends GraphStage[FlowShape[ByteString, immutable.Seq[T]]] { 

    override def shape: FlowShape[ByteString, immutable.Seq[T]] = FlowShape.of(in, out) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new GraphStageLogic(shape) { 

     setHandler(in, new InHandler { 

     override def onPush(): Unit = { 
      val bytes = 
      if (previousBytes.length == 0) grab(in) 
      else ByteString.fromArrayUnsafe(previousBytes) ++ grab(in) 

      Managed(new Input(new ByteBufferBackedInputStream(bytes.asByteBuffer))) { input => 
      var position = 0 
      val acc = ListBuffer[T]() 

      kryoSupport.withKryo { kryo => 
       var last = false 

       while (!last && !input.eof()) { 
       tryRead(kryo, input) match { 
        case Some(t) => 
        acc += t 
        position = input.total().toInt 
        previousBytes = EmptyArray 
        case None => 
        val bytesLeft = new Array[Byte](bytes.length - position) 

        val bb = bytes.asByteBuffer 
        bb.position(position) 
        bb.get(bytesLeft) 

        last = true 
        previousBytes = bytesLeft 
       } 
       } 

       push(out, acc.toList) 
      } 
      } 
     } 

     private def tryRead(kryo: Kryo, input: Input): Option[T] = 
      try { 
      Some(kryo.readObject(input, `class`, serializer)) 
      } catch { 
      case _: KryoException => None 
      } 

     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      pull(in) 
     } 
     }) 

     private val EmptyArray: Array[Byte] = Array.empty 

     private var previousBytes: Array[Byte] = EmptyArray 

    } 
    } 

    override def toString: String = "KryoReadStage" 

    private lazy val in: Inlet[ByteString] = Inlet("KryoReadStage.in") 
    private lazy val out: Outlet[immutable.Seq[T]] = Outlet("KryoReadStage.out") 

} 

Beispiel zur Nutzung:

client.download(BucketName, key) 
    .via(KryoReadStage.flow(kryoSupport, `class`, serializer)) 
    .flatMapConcat(Source(_)) 

Es nutzt einige zusätzliche Helfer unter.

ByteBufferBackedInputStream:

class ByteBufferBackedInputStream(buf: ByteBuffer) extends InputStream { 

    override def read: Int = { 
    if (!buf.hasRemaining) -1 
    else buf.get & 0xFF 
    } 

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = { 
    if (!buf.hasRemaining) -1 
    else { 
     val read = Math.min(len, buf.remaining) 
     buf.get(bytes, off, read) 
     read 
    } 
    } 

} 

Managed:

object Managed { 

    type AutoCloseableView[T] = T => AutoCloseable 

    def apply[T: AutoCloseableView, V](resource: T)(op: T => V): V = 
    try { 
     op(resource) 
    } finally { 
     resource.close() 
    } 
} 

KryoSupport:

trait KryoSupport { 
    def withKryo[T](f: Kryo => T): T 
} 

class PooledKryoSupport(serializers: (Class[_], Serializer[_])*) extends KryoSupport { 

    override def withKryo[T](f: Kryo => T): T = { 
    pool.run(new KryoCallback[T] { 
     override def execute(kryo: Kryo): T = f(kryo) 
    }) 
    } 

    private val pool = { 
    val factory = new KryoFactory() { 
     override def create(): Kryo = { 
     val kryo = new Kryo 

     (KryoSupport.ScalaSerializers ++ serializers).foreach { 
      case ((clazz, serializer)) => 
      kryo.register(clazz, serializer) 
     } 

     kryo 
     } 
    } 

    new KryoPool.Builder(factory).softReferences().build() 
    } 

}