2017-10-18 5 views
-1

Ich versuche ein System, Worker-Pool/Job-Warteschlange zu erstellen, um so viele http requests wie möglich auf jedem API-Endpunkt zu behandeln. Ich schaute in diese example und bekam es funktioniert gut, außer dass ich stolperte über das Problem, dass ich nicht verstehe, wie die pool/jobqueue auf verschiedene Endpunkte zu erweitern.Golang HTTP-Anfrage-Worker-Pool

Lassen Sie uns skizzieren ein Golang http-Server, der eine Million Anfrage/min über verschiedene Endpunkte und Anforderungstypen hat GET & POST ETC.

Wie kann ich dieses Konzept erweitern? Soll ich für jeden Endpunkt verschiedene Worker-Pools und Jobs erstellen? Oder kann ich verschiedene Jobs erstellen und sie in derselben Warteschlange eingeben und denselben Pool behandeln?

Ich möchte die Einfachheit beibehalten, wenn ich einen neuen API-Endpunkt erstellen muss, muss ich keine neuen Worker-Pools erstellen, damit ich mich nur auf die API konzentrieren kann. Aber auch die Leistung ist sehr wichtig.

Der Code, auf dem ich versuche zu bauen, stammt aus dem zuvor verlinkten Beispiel, here ist ein GitHub 'Gist' von jemand anderem mit diesem Code.

+2

Go's http-Paket startet eine Go-Routine für jede eingehende Verbindung. Wenn Sie nicht über die Verarbeitung von Hintergrundjobs sprechen, scheint dies eine verschwendete Anstrengung zu sein. – squiguy

+0

Ja, das ist korrekt für die Hintergrundverarbeitung. Einige, die eine Weile dauern könnten, um zu beenden, und ich lasse eher eine unkontrollierte Menge von Goroutines los –

+0

Was ist das Problem mit Göroutinen? Sie sind im Grunde die eingebaute Implementierung von Jobwarteschlangen mit asynchroner Unterstützung. –

Antwort

0

Es ist nicht klar, warum Sie Arbeiterpool überhaupt benötigen? Wäre es nicht genug, dass die Goroutines ausreichen?

Wenn Sie von Ressourcen eingeschränkt sind, können Sie die Implementierung von rates limiting in Betracht ziehen. Wenn nicht warum nicht einfach Routinen durchlaufen wie benötigt?

Der beste Weg zu lernen ist zu lernen, wie andere gute Sachen machen.

Werfen Sie einen Blick auf https://github.com/valyala/fasthttp

Schnelle HTTP-Paket für Go. Für hohe Leistung abgestimmt. Null Speicherzuweisungen in heißen Pfaden. Bis zu 10x schneller als net/http.

Sie behaupten:

zu 200K rps von mehr als 1,5 Mio. gleichzeitige Keep-Alive-Verbindungen pro physischen Server

Das sehr beeindruckend serviert, und ich bezweifle, dass Sie tun können, besser mit pool/jobqueue.

1

Eine Sache im Voraus: Wenn Sie einen HTTP-Server betreiben (Go's Standardserver sowieso), können Sie die Anzahl der Goroutines nicht kontrollieren, ohne den Server anzuhalten und neu zu starten. Jede Anfrage startet mindestens eine Goroutine, und Sie können nichts dagegen tun. Die gute Nachricht ist, dass dies normalerweise kein Problem ist, da die Goroutines so leicht sind. Es ist jedoch vollkommen vernünftig, dass Sie die Anzahl der Gorousines, die harte Arbeit leisten, unter Kontrolle halten wollen.

Sie können einen beliebigen Wert einschließlich Funktionen in einen Kanal eingeben. Wenn also das Ziel darin besteht, Code nur in HTTP-Handlern schreiben zu müssen, lassen Sie die Jobs schließen - die Worker wissen nicht (oder interessieren sich nicht), woran sie arbeiten.

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan func() 
var smallPool chan func() 

func main() { 
    // Start two different sized worker pools (e.g., for different workloads). 
    // Cancelation and graceful shutdown omited for brevity. 

    largePool = make(chan func(), 100) 
    smallPool = make(chan func(), 10) 

    for i := 0; i < 100; i++ { 
      go func() { 
        for f := range largePool { 
          f() 
        } 
      }() 
    } 

    for i := 0; i < 10; i++ { 
      go func() { 
        for f := range smallPool { 
          f() 
        } 
      }() 
    } 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay? 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    // Imagine a JSON body containing a URL that we are expected to fetch. 
    // Light work that doesn't consume many of *our* resources and can be done 
    // in bulk, so we put in in the large pool. 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      largePool <- func() { 
        http.Get(job.URL) 
        // Do something with the response 
      } 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    // The request body is an image that we want to do some fancy processing 
    // on. That's hard work; we don't want to do too many of them at once, so 
    // so we put those jobs in the small pool. 

    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      smallPool <- func() { 
        processImage(b) 
      } 
    }() 
    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 

Dies ist ein sehr einfaches Beispiel, um den Punkt zu vermitteln. Es spielt keine Rolle, wie Sie Ihre Worker-Pools einrichten. Sie brauchen nur eine clevere Jobdefinition.Im obigen Beispiel handelt es sich um eine Schließung, aber Sie könnten beispielsweise auch eine Job-Schnittstelle definieren.

type Job interface { 
    Do() 
} 

var largePool chan Job 
var smallPool chan Job 

Nun, ich würde nicht den ganzen Arbeiter Pool Ansatz "einfache" nennen. Du hast gesagt, dein Ziel ist es, die Anzahl der Goroutines (die arbeiten) zu begrenzen. Das erfordert keine Arbeiter überhaupt; es braucht nur einen Begrenzer. Hier ist das gleiche Beispiel wie oben, aber die Verwendung von Kanälen als Semaphore zur Begrenzung der Nebenläufigkeit.

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan struct{} 
var smallPool chan struct{} 

func main() { 
    largePool = make(chan struct{}, 100) 
    smallPool = make(chan struct{}, 10) 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(largePool) light-work 
      // goroutines running. 
      largePool <- struct{}{} 
      defer func() { <-largePool }() // Let everyone that we are done 

      http.Get(job.URL) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(smallPool) hard-work 
      // goroutines running. 
      smallPool <- struct{}{} 
      defer func() { <-smallPool }() // Let everyone that we are done 

      processImage(b) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 
0

Wie zuvor auf Ihrem Server beantwortet, wird jeder Anforderungshandler in mindestens einer Goroutine ausgeführt.

Sie können jedoch weiterhin einen Worker-Pool für parallele Back-End-Aufgaben verwenden, falls erforderlich. Angenommen, einige Ihrer Http-Handler-Funktionen lösen Aufrufe an andere externe APIs aus und "aggregieren" ihre Ergebnisse, sodass die Reihenfolge der Aufrufe in diesem Fall keine Rolle spielt. Dies ist ein Szenario, in dem Sie einen Worker-Pool nutzen und Ihre verteilen können

Beispielcode-Schnipsel:

// build empty response 
    capacity := config.GetIntProperty("defaultListCapacity") 
    list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0) 

    // search providers 
    providers := getProvidersByCountry(country) 

    // create a slice of jobResult outputs 
    jobOutputs := make([]<-chan job.JobResult, 0) 

    // distribute work 
    for i := 0; i < len(providers); i++ { 
     job := search(providers[i], m) 
     if job != nil { 
      jobOutputs = append(jobOutputs, job.ReturnChannel) 
      // Push each job onto the queue. 
      GetInstance().JobQueue <- *job 
     } 
    } 

    // Consume the merged output from all jobs 
    out := job.Merge(jobOutputs...) 
    for r := range out { 
     if r.Error == nil { 
      mergeSearchResponse(list, r.Value.(*model.ResponseList)) 
     } 
    } 
    return list 

arbeiten, um parallel Dispatching jede Aufgabe einen Arbeiter goroutine haben sie laufen. vollständiges Beispiel eines Workerpools, der "generische" Tasks asynchron ausführt: https://github.com/guilhebl/go-offer/blob/master/offer/repo.go

. worker pool lib verwendet: https://github.com/guilhebl/go-worker-pool