Ich habe Daten, die ich über http erhalten, diese Daten müssen von zwei verschiedenen Funktionen verarbeitet werden. Es ist wichtig, dass sie von jeder Funktion nacheinander verarbeitet werden. In der Datei zum Beispiel: 1,2,3,4,5. Und die Datenbank verzeichnete auch 1,2,3,4,5. Als ein Fifo-Modell. Jetzt habe ich so ein Problem ... Die Daten, die ich habe, läuft kontinuierlich und manchmal kann die Datenbank meine Anfrage erfüllen, die Daten ziemlich lange zu aktualisieren, deshalb kann ich die Datei nicht rechtzeitig aktualisieren. Es ist wichtig für mich, dass die Daten der Datei oder Datenbank hinzugefügt wurden, wenn es möglich ist. Ich könnte gepufferte Kanäle verwenden, aber ich kann nicht wissen, wie viele Daten auf die Verarbeitung in der Warteschlange warten können, möchte ich nicht angeben, die Größe des Puffers ist sicherlich groß. Ich habe versucht, der NewData-Funktion mehr Goroutine hinzuzufügen, aber in diesem Fall werden meine Daten nicht sequentiell geschrieben.Datenverarbeitung durch mehrere Funktionen ist asynchron
Dieser Code zeigt das Problem.
package main
import (
"fmt"
"time"
)
type procHandler interface {
Start()
NewData(newdata []byte)
}
type fileWriter struct {
Data chan []byte
}
func (proc *fileWriter) Start() {
proc.Data = make(chan []byte)
go func() {
for {
obj := <-proc.Data
fmt.Printf("proc %T ", proc)
fmt.Println(obj)
}
}()
}
func (proc *fileWriter) NewData(newdata []byte) {
proc.Data <- newdata
}
type sqlWriter struct {
Data chan []byte
}
func (proc *sqlWriter) Start() {
proc.Data = make(chan []byte)
go func() {
for {
obj := <-proc.Data
time.Sleep(5 * time.Second)
fmt.Printf("proc %T ", proc)
fmt.Println(obj)
}
}()
}
func (proc *sqlWriter) NewData(newdata []byte) {
proc.Data <- newdata
}
var processors = []procHandler{}
func receiver() {
newDataImitateByteRange := 30
for i := 0; i < newDataImitateByteRange; i++ {
pseudoData := []byte{byte(i)}
for _, handler := range processors {
handler.NewData(pseudoData)
}
}
}
func main() {
// file writer
fileUpdate := &fileWriter{}
processors = append(processors, fileUpdate)
// sql writer
sqlUpdate := &sqlWriter{}
processors = append(processors, sqlUpdate)
sqlUpdate.Start()
fileUpdate.Start()
go receiver()
fmt.Scanln()
}
-Code funktioniert: https://play.golang.org/p/rSshsJYZ4h
Ausgang:
proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.sqlWriter [0] (sleep)
proc *main.fileWriter [2] (Display after 5 seconds when the previous channel is processed)
proc *main.sqlWriter [1] (sleep)
proc *main.fileWriter [3] (Display after 5 seconds when the previous channel is processed)
proc *main.sqlWriter [2]
proc *main.fileWriter [4]
proc *main.sqlWriter [3]
proc *main.fileWriter [5]
proc *main.sqlWriter [4]
proc *main.fileWriter [6]
Ich möchte:
proc *main.fileWriter [0]
proc *main.fileWriter [1]
proc *main.fileWriter [2]
proc *main.fileWriter [3]
proc *main.fileWriter [4]
proc *main.fileWriter [5]
proc *main.fileWriter [6]
proc *main.sqlWriter [0] (after 5 seconds passed the handler started execution.)
proc *main.sqlWriter [1] (sleep)
proc *main.sqlWriter [2] (sleep)
proc *main.sqlWriter [3] (sleep)
proc *main.sqlWriter [4] (sleep)
proc *main.sqlWriter [5] (sleep)
proc *main.sqlWriter [6] (sleep)
ich Hilfe hoffen, danke!
Also, warum verwenden Sie goroutines, wenn Sie wollen, dass Dinge sequenziell passieren? Rufen Sie einfach einen Prozessor für alle Ihre Daten und dann für den nächsten auf. – Peter
@Peter Danke für die Antwort. Konsistente Speicherung in jeder Struktur, und nicht im Allgemeinen, muss ich sicherstellen, dass die Funktion zum Speichern jeder Struktur asynchron arbeitet. Mit anderen Worten, ich muss diese Funktionen geben und erwarte keine Antwort von ihr. (newdata) – Feanon
Ich folge nicht. Sie können es sequentiell oder asynchron haben; nicht beides zugleich. – Peter