2017-10-02 4 views
0

Ich habe Daten, die ich über http erhalten, diese Daten müssen von zwei verschiedenen Funktionen verarbeitet werden. Es ist wichtig, dass sie von jeder Funktion nacheinander verarbeitet werden. In der Datei zum Beispiel: 1,2,3,4,5. Und die Datenbank verzeichnete auch 1,2,3,4,5. Als ein Fifo-Modell. Jetzt habe ich so ein Problem ... Die Daten, die ich habe, läuft kontinuierlich und manchmal kann die Datenbank meine Anfrage erfüllen, die Daten ziemlich lange zu aktualisieren, deshalb kann ich die Datei nicht rechtzeitig aktualisieren. Es ist wichtig für mich, dass die Daten der Datei oder Datenbank hinzugefügt wurden, wenn es möglich ist. Ich könnte gepufferte Kanäle verwenden, aber ich kann nicht wissen, wie viele Daten auf die Verarbeitung in der Warteschlange warten können, möchte ich nicht angeben, die Größe des Puffers ist sicherlich groß. Ich habe versucht, der NewData-Funktion mehr Goroutine hinzuzufügen, aber in diesem Fall werden meine Daten nicht sequentiell geschrieben.Datenverarbeitung durch mehrere Funktionen ist asynchron

Dieser Code zeigt das Problem.

package main 

import (
    "fmt" 
    "time" 
) 

type procHandler interface { 
    Start() 
    NewData(newdata []byte) 
} 

type fileWriter struct { 
    Data chan []byte 
} 

func (proc *fileWriter) Start() { 
    proc.Data = make(chan []byte) 
    go func() { 
     for { 
      obj := <-proc.Data 

      fmt.Printf("proc %T ", proc) 
      fmt.Println(obj) 
     } 
    }() 
} 

func (proc *fileWriter) NewData(newdata []byte) { 
    proc.Data <- newdata 
} 

type sqlWriter struct { 
    Data chan []byte 
} 

func (proc *sqlWriter) Start() { 
    proc.Data = make(chan []byte) 
    go func() { 
     for { 
      obj := <-proc.Data 
      time.Sleep(5 * time.Second) 
      fmt.Printf("proc %T ", proc) 
      fmt.Println(obj) 
     } 
    }() 
} 

func (proc *sqlWriter) NewData(newdata []byte) { 
    proc.Data <- newdata 
} 

var processors = []procHandler{} 

func receiver() { 
    newDataImitateByteRange := 30 
    for i := 0; i < newDataImitateByteRange; i++ { 
     pseudoData := []byte{byte(i)} 

     for _, handler := range processors { 
      handler.NewData(pseudoData) 
     } 
    } 
} 

func main() { 
    // file writer 
    fileUpdate := &fileWriter{} 
    processors = append(processors, fileUpdate) 

    // sql writer 
    sqlUpdate := &sqlWriter{} 
    processors = append(processors, sqlUpdate) 

    sqlUpdate.Start() 
    fileUpdate.Start() 

    go receiver() 

    fmt.Scanln() 
} 

-Code funktioniert: https://play.golang.org/p/rSshsJYZ4h

Ausgang:

proc *main.fileWriter [0] 
proc *main.fileWriter [1] 
proc *main.sqlWriter [0] (sleep) 
proc *main.fileWriter [2] (Display after 5 seconds when the previous channel is processed) 
proc *main.sqlWriter [1] (sleep) 
proc *main.fileWriter [3] (Display after 5 seconds when the previous channel is processed) 
proc *main.sqlWriter [2] 
proc *main.fileWriter [4] 
proc *main.sqlWriter [3] 
proc *main.fileWriter [5] 
proc *main.sqlWriter [4] 
proc *main.fileWriter [6] 

Ich möchte:

proc *main.fileWriter [0] 
proc *main.fileWriter [1] 
proc *main.fileWriter [2] 
proc *main.fileWriter [3] 
proc *main.fileWriter [4] 
proc *main.fileWriter [5] 
proc *main.fileWriter [6] 
proc *main.sqlWriter [0] (after 5 seconds passed the handler started execution.) 
proc *main.sqlWriter [1] (sleep) 
proc *main.sqlWriter [2] (sleep) 
proc *main.sqlWriter [3] (sleep) 
proc *main.sqlWriter [4] (sleep) 
proc *main.sqlWriter [5] (sleep) 
proc *main.sqlWriter [6] (sleep) 

ich Hilfe hoffen, danke!

+1

Also, warum verwenden Sie goroutines, wenn Sie wollen, dass Dinge sequenziell passieren? Rufen Sie einfach einen Prozessor für alle Ihre Daten und dann für den nächsten auf. – Peter

+0

@Peter Danke für die Antwort. Konsistente Speicherung in jeder Struktur, und nicht im Allgemeinen, muss ich sicherstellen, dass die Funktion zum Speichern jeder Struktur asynchron arbeitet. Mit anderen Worten, ich muss diese Funktionen geben und erwarte keine Antwort von ihr. (newdata) – Feanon

+0

Ich folge nicht. Sie können es sequentiell oder asynchron haben; nicht beides zugleich. – Peter

Antwort

0

Es klingt wie das, was Sie suchen, ist etwas, das wie ein Kanal funktioniert, der die Größe (wächst oder schrumpft) mit den Daten ändert, die auf ihm eingereiht sind. Dies könnte implementiert werden, indem eine Warteschlange zwischen einem Eingangs- und Ausgangskanal mit einer Routine zur Versorgung dieser Kanäle vorgesehen ist. Hier ist eine solche Lösung: https://github.com/gammazero/bigchan#bigchan

ich einen BigChan als Datenkanal verwendet haben, in Ihrem fileWriter und sqlWriter und es scheint, um die Ergebnisse zu haben Sie suchen. Folgendes ist Ihr überarbeiteter Code:

package main 

import (
    "fmt" 
    "time" 

    "github.com/gammazero/bigchan" 
) 

// Maximum number of items to buffer. set to -1 for unlimited. 
const limit = 65536 

type procHandler interface { 
    Start() 
    NewData(newdata []byte) 
} 

type fileWriter struct { 
    Data *bigchan.BigChan 
} 

func (proc *fileWriter) Start() { 
    proc.Data = bigchan.New(limit) 
    go func() { 
     for { 
      _obj := <-proc.Data.Out() 
      obj := _obj.([]byte) 

      fmt.Printf("proc %T ", proc) 
      fmt.Println(obj) 
     } 
    }() 
} 

func (proc *fileWriter) NewData(newdata []byte) { 
    proc.Data.In() <- newdata 
} 

type sqlWriter struct { 
    Data *bigchan.BigChan 
} 

func (proc *sqlWriter) Start() { 
    proc.Data = bigchan.New(limit) 

    go func() { 
     for { 
      _obj := <-proc.Data.Out() 
      obj := _obj.([]byte) 
      time.Sleep(5 * time.Second) 
      fmt.Printf("proc %T ", proc) 
      fmt.Println(obj) 
     } 
    }() 
} 
func (proc *sqlWriter) NewData(newdata []byte) { 
    proc.Data.In() <- newdata 
} 

var processors = []procHandler{} 

func receiver() { 
    newDataImitateByteRange := 30 
    for i := 0; i < newDataImitateByteRange; i++ { 
     pseudoData := []byte{byte(i)} 

     for _, handler := range processors { 
      handler.NewData(pseudoData) 
     } 
    } 
} 

func main() { 
    // file writer 
    fileUpdate := &fileWriter{} 
    processors = append(processors, fileUpdate) 

    // sql writer 
    sqlUpdate := &sqlWriter{} 
    processors = append(processors, sqlUpdate) 

    sqlUpdate.Start() 
    fileUpdate.Start() 

    go receiver() 

    fmt.Scanln() 
}