2016-07-24 13 views
0

Ich versuche, einen Radio-Server in ElixirElixir Strom an alle Abonnenten

Ein Prozess wird immer zu implementieren arbeiten und eine Datei (mp3) und veröffentlichen zu Thema lesen „: radio“, die derzeit für Testzwecke, wenn es beendet es beginnt über

Jede Verbindung zum Thema abonniert „: radio“

ich verstehe nicht, wie die Stücke an alle abonnierten Verbindungen zu senden, geschlossen die Verbindung nach 2 oder 3 Stücke

defmodule Plugtest do 
    import Plug.Conn 

    def init(opts), do: opts 

    def start() do 
    Plug.Adapters.Cowboy.http(Plugtest, []) 
    {:ok, _pid} = PubSub.start_link() 
    spawn(fn -> stream_from_file("./song.mp3", 128) end) 
    end 

    def call(conn, _opts) do 
    conn = conn 
    |> send_chunked(200) 
    |> put_resp_content_type("audio/mpeg") 

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio) 
# File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only 
    end 

    defp send_chunk_to_connection(conn) do 
    receive do 
     {:radio_data, data} -> 
     IO.inspect "* #{inspect self()} * [ #{inspect conn.owner} ] [ #{inspect data} ]" 
#  Enum.into(data, conn) # not working TODO send chunk to connection 
     {:ok, conn} = chunk(conn, data) 
     send_chunk_to_connection(conn) 
    end 
    end 

    defp stream_from_file(fpath, bytes) do 
    File.stream!(fpath, [], bytes) 
    |> Enum.each(fn chunk -> 
     PubSub.publish(:radio, {:radio_data, chunk}) 
    end) 
    stream_from_file(fpath, bytes) 
    end 

end 

Stacktrace:

[error] Process #PID<0.274.0> raised an exception 
     ** (MatchError) no match of right hand side value: {:error, :closed}  
      (plugtest) lib/plugtest.ex:26: Plugtest.send_chunk_to_connection/1 

Abhängigkeiten:

defp deps do 
    [{:plug, "~> 1.0"}, {:cowboy, "~> 1.0"}, {:pubsub, "~> 0.0.2"}] 
    end 

bearbeiten nach @maxdec Kommentar

defmodule Plugtest do 
    import Plug.Conn 

    @file_path "./song.mp3" 
    @port 4000 
    @chunk_size 128 

    def init(opts), do: opts 

    def start() do 
    Plug.Adapters.Cowboy.http Plugtest, [], port: @port 
    {:ok, _pid} = PubSub.start_link() 
    spawn fn -> 
     stream_from_file(@file_path, @chunk_size) 
    end 
    end 

    def call(conn, _opts) do 
    conn = conn 
    |> send_chunked(200) 
    |> put_resp_content_type("audio/mpeg") 

    :ok = PubSub.subscribe(spawn(fn -> send_chunk_to_connection(conn) end), :radio) 
# File.stream!("./song.mp3", [], 128) |> Enum.into(conn) # test purpose only 
    conn 
    end 
    defp send_chunk_to_connection(conn) do 
    receive do 
     {:radio_data, data} -> 
     case chunk(conn, data) do 
      {:ok, conn} -> send_chunk_to_connection(conn) 
      {:error, err} -> IO.puts err # do nothing, as something went wrong (client disconnection or something else...) 
     end 
    end 
    end 

    defp stream_from_file(fpath, bytes) do 
    File.stream!(fpath, [], bytes) 
    |> Enum.each(fn chunk -> 
     PubSub.publish(:radio, {:radio_data, chunk}) 
    end) 
    stream_from_file(fpath, bytes) 
    end 

end 
+0

benötige ich einzigartigen Header zu senden, um aufgeteilte Codierung macht die Arbeit für Audio über http? – IddoE

Antwort

1

Nach einem kurzen Blick, den ich denke, es gibt 2 Dinge, die Sie sollten beheben:

  1. PlugTest ist ein Plug also call/2 sollte conn zurückgeben (das ist jedoch nicht Ihr Problem). Es sollte auch blockiert, während Warten auf Ereignisse (receive):

    def call(conn, _opts) do 
        conn = conn 
        |> send_chunked(200) 
        |> put_resp_content_type("audio/mpeg") 
    
        :ok = PubSub.subscribe(self(), :radio) 
        send_chunk_to_connection(conn) 
    end 
    
  2. In send_chunk_to_connection sollten Sie tun:

    defp send_chunk_to_connection(conn) do 
        receive do 
        {:radio_data, data} -> 
         case chunk(conn, data) do 
         {:ok, conn} -> send_chunk_to_connection(conn) 
         {:error, err} -> IO.puts err; conn # do nothing, as something went wrong (client disconnection or something else...) 
         end 
        end 
    end 
    
+0

erstens, danke, es funktioniert nicht, ich änderte Call/2, habe nicht verstanden, was zu tun ist, send_chunk_to_connection/1, fügte den Code über – IddoE

+0

@ JimWest: Ich habe meine Antwort bearbeitet, um es klarer – maxdec

+0

versucht, habe "geschlossen", bekomme ich nicht 2 # send_chunk_to_connection # {: ok, conn}, wo ist die Enum.into, die Daten an Verbindung senden? (auch, editierten Code oben) – IddoE