Abgesehen von der Tatsache, dass das Beispiel der Erzeugung von CSV nicht wirklich eine robusten Methode ist (bietet keinen richtigen escaping) Sie es ein bisschen zu überarbeiten müssen Header hinzuzufügen. Hier ist, was ich tun würde:
- ein
Flow
machen ein Source[Tweet]
mit einer Quelle von CSV-Zeilen zu konvertieren, z.B. ein Source[List[String]]
- verketten es mit einer Quelle Ihre Header als einzelne
List[String]
- passen die Einweiser eine Quelle der Zeilen statt Tweets
Hier einige Beispiel zu machen, enthält Code:
case class Tweet(uid: String, txt: String)
def getTweets: Source[Tweet, NotUsed] = ???
val tweetToRow: Flow[Tweet, List[String], NotUsed] =
Flow[Tweet].map { t =>
List(
t.uid,
t.txt.replaceAll(",", "."))
}
// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`,() =>
ByteString(row.mkString(","))
)
}
// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
complete(headers.concat(tweets))
}
Update: Wenn Ihre getTweets
Methode eine Future
zurückgibt, können Sie einfach ihren Quellwert abbilden und die Header auf diese Weise voranstellen, zB:
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val rows: Future[Source[List[String], NotUsed]] = getTweets
.map(tweets => headers.concat(tweets.via(tweetToRow)))
complete(rows)
}
Danke Mikesname !! letzte frage, ich habe tatsächlich einen akka schauspieler, der die daten als quelle von mongodb auf diese weise zurückliefert: (MyView? GetStuff()). mapTo [Source [MyObject, NotUsed]] - Wie würden Sie dies auf die tweetToRow-Methode umleiten dass es eine Zukunft zurückbringt. Danke –
@HasseneBenAmara hat die Antwort aktualisiert – Mikesname
Danke noch einmal für Ihre Hilfe. Tatsächlich sieht die via() - Funktion auf diese Weise nicht korrekt aus ... Das ist, was ich bekomme, wenn ich mein Projekt erstelle ... Fehler: (155, 87) type mismatch; gefunden: akka.stream.scaladsl.Flow [MyObject, List [String], akka.NotUsed] erforderlich: akka.stream.Graph [akka.stream.FlowShape [Produkt mit java.io.Serializable ,?] ,?] Ich habe die Via-Signatur doppelt überprüft, bin mir aber nicht sicher, wie ich meinen Code umgestalten soll, damit er funktioniert. Ich bin immer noch neu in akka Streams und scala, also schätze deine Hilfe wirklich! –