2017-03-02 6 views
1

Ich versuche, Daten von Mongodb mit Reaktivmongo-akkastream 0.12.1 zu streamen und das Ergebnis in eine CSV-Stream auf einer der Routen (mit Akka-http) zurückgeben. Ich habe implementieren, dass nach dem exemple hier:Streaming-CSV-Quelle mit AKKA-HTTP

http://doc.akka.io/docs/akka-http/10.0.0/scala/http/routing-dsl/source-streaming-support.html#simple-csv-streaming-example

und es sieht gut funktioniert.

Das einzige Problem, dem ich jetzt gegenüberstehe, ist, wie die Header der Ausgabe CSV-Datei hinzugefügt werden. Irgendwelche Ideen?

Dank

Antwort

2

Abgesehen von der Tatsache, dass das Beispiel der Erzeugung von CSV nicht wirklich eine robusten Methode ist (bietet keinen richtigen escaping) Sie es ein bisschen zu überarbeiten müssen Header hinzuzufügen. Hier ist, was ich tun würde:

  1. ein Flow machen ein Source[Tweet] mit einer Quelle von CSV-Zeilen zu konvertieren, z.B. ein Source[List[String]]
  2. verketten es mit einer Quelle Ihre Header als einzelne List[String]
  3. passen die Einweiser eine Quelle der Zeilen statt Tweets

Hier einige Beispiel zu machen, enthält Code:

case class Tweet(uid: String, txt: String) 

def getTweets: Source[Tweet, NotUsed] = ??? 

val tweetToRow: Flow[Tweet, List[String], NotUsed] = 
    Flow[Tweet].map { t => 
    List(
     t.uid, 
     t.txt.replaceAll(",", ".")) 
    } 

// provide a marshaller from a row (List[String]) to a ByteString 
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row => 
    Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`,() => 
    ByteString(row.mkString(",")) 
) 
} 

// enable csv streaming 
implicit val csvStreaming = EntityStreamingSupport.csv() 

val route = path("tweets") { 
    val headers = Source.single(List("uid", "text")) 
    val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow) 
    complete(headers.concat(tweets)) 
} 

Update: Wenn Ihre getTweets Methode eine Future zurückgibt, können Sie einfach ihren Quellwert abbilden und die Header auf diese Weise voranstellen, zB:

val route = path("tweets") { 
    val headers = Source.single(List("uid", "text")) 
    val rows: Future[Source[List[String], NotUsed]] = getTweets 
     .map(tweets => headers.concat(tweets.via(tweetToRow))) 
    complete(rows) 
} 
+0

Danke Mikesname !! letzte frage, ich habe tatsächlich einen akka schauspieler, der die daten als quelle von mongodb auf diese weise zurückliefert: (MyView? GetStuff()). mapTo [Source [MyObject, NotUsed]] - Wie würden Sie dies auf die tweetToRow-Methode umleiten dass es eine Zukunft zurückbringt. Danke –

+0

@HasseneBenAmara hat die Antwort aktualisiert – Mikesname

+0

Danke noch einmal für Ihre Hilfe. Tatsächlich sieht die via() - Funktion auf diese Weise nicht korrekt aus ... Das ist, was ich bekomme, wenn ich mein Projekt erstelle ... Fehler: (155, 87) type mismatch; gefunden: akka.stream.scaladsl.Flow [MyObject, List [String], akka.NotUsed] erforderlich: akka.stream.Graph [akka.stream.FlowShape [Produkt mit java.io.Serializable ,?] ,?] Ich habe die Via-Signatur doppelt überprüft, bin mir aber nicht sicher, wie ich meinen Code umgestalten soll, damit er funktioniert. Ich bin immer noch neu in akka Streams und scala, also schätze deine Hilfe wirklich! –