본문 바로가기

Go

고(golang) 비동기 큐 구현

웹에서 고용량 파일을 다룰때 비동기 시스템은 거의 필수적입니다.
고 언어는 Channel 이라는 기능으로 이를 쉽게 구축할 수 있습니다.
사용한 프레임워크는 Gin 입니다.

go get github.com/gin-gonic/gin@master

쉘에서 Gin 프레임워크를 가져옵니다.

// main.go
r := gin.New()

srv := &http.Server{
    Handler:      r,
    Addr:         fmt.Sprintf(":%d", PORT),
    ReadTimeout:  30 * time.Second,
    WriteTimeout: 30 * time.Second,
}    

r.GET("/file-handler",func(c *gin.Context) {
    queue.AsyncQueue.Enqueue(queue.FileUploadInfo{
        User: *User,
        File: *os.File,
    }
    c.json(http.statusOk, "파일을 넘겨줬습니다.")
}

// Start Server
go func() {
    log.Println("Starting Server")
    if err := srv.ListenAndServe(); err != nil {
        log.Fatal(err)
    }
}()

Gin을 이용해 라우팅을 설정하고 서버를 엽니다.
main 함수에서 눈여겨볼 부분은 file-handler 부분입니다.
queue.AsyncQueue.Enqueue 에 FileUploadInfo 부분을 이용해 정보를 넘겨줍니다.

package queue

var AsyncQueue *Queue
func Init() {
    asyncQueue = NewQueue()
}

type Queue struct {
    FileUploadChannel chan FileUploadInfo

    workingChannel chan bool
}

func NewQueue() *Queue {
    FileUploadChannel := make(chan FileUploadInfo, 1000)

    workingChannel := make(chan bool , 10000)    

    return &Queue{

    FileUploadChannel : FileUploadChannel,

    workingChannel: workingChannel,
}

func (e *Queue) Work() {
    defer func() {
        if r:= recover(); r != nil {
            go e.Work()
        }    
    }()

    for {
        select {
        case uploadInfo := <- e.FileUploadChannel:
            e.workingChannel <- true
            // FileUploadHandler()
            time.Sleep(time.Second*5)
            <-e.workingChannel
        }
    }
}

func(e *Queue) Enqueue(uploadInfo FileUploadInfo) {
    e.FileUPloadChannel <- uploadInfo
}

type FileUploadInfo struct {
    user *User
    file *os.File
}

workignChannel 은 마지막 main.go 에 추가될 Work() 함수에서 한번에 몇개의 비동기큐를 허용할지 정합니다.
사용자가 file-hanlder API를 호출하게 되면 workingChannel 에 true 를 보내주고 대기합니다.
FileUploadHandler 함수를 실행한 후 처리를 다하면 대기열을 풀게 됩니다.

pacakge main

func main() {
    osSignal = make(chan os.Signal, 10000)
    signal.Notify(osSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

    queue.Init()

    r := gin.New()

    srv := &http.Server{
        Handler:      r,
        Addr:         fmt.Sprintf(":%d", PORT),
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
    }    

    // Start Server
    go func() {
        log.Println("Starting Server")
        if err := srv.ListenAndServe(); err != nil {
            log.Fatal(err)
        }
    }()

    <-osSignal

    for i := 0; i < 300; i++ {
        go queue.VimeoQueue.Work()
    }

    // Wait untuk there is no active job in the queue
    for queue.AsyncQueue.Size() > 0 {
        time.Sleep(time.Millisecond * 500)
    }
}

최종 메인 함수

// main.go
osSignal = make(chan os.Signal, 1000)
signal.Notify(osSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

<-osSignal

비동기 큐를 가진 웹서버가 배포가 되어있다고 가정해보면
새로운 배포가 이루어지기 전에 비동기 큐에 있는 모든 작업들을 배출하고 나서 배포가 진행이 되어야 데이터의 유실이 없습니다.
그 기능을 Go 에선 os.Signal 이 담당합니다.