CafeM0ca

[Go] Go channel 데이터 파이프라인 본문

Programming/Go

[Go] Go channel 데이터 파이프라인

M0ca 2021. 4. 5. 02:06
반응형

자세한건 이 블로그를 참조하자. 이런 분이 교사 하시면 모두가 점수가 높을 것이다.

이 글의 내용 전문은 다 아래 블로그에서 학습한 내용을 상기하기 위함이다.

http://cloudrain21.com/streaming-data-pipeline-with-go-channel

위 블로그의 go 파트는 웬만하면 다 읽어보자.

파이프라인은 물이 흐르는 통로를 말하는데 컴퓨터에서는 물 대신 데이터가 흐르는 통로로 이해하면 된다.

물을 공급해주는 역할과 공급받은 물을 사용하는 역할이 있을 것이다.

이를 공급쪽은 Producer 혹은 Source라고 하며 사용하는 쪽은 Consumer 혹은 Sink라고 한다. (생산자 소비자 모델을 생각해보자)

Fan-out, Fan-in

  • Fan-out
    • 여러 스트림이 있는 경우 다수의 채널로 부터 데이터를 받게 된다.
  • Fan-in
    • 여러 스트림에서 하나의 스트림으로 데이터를 보내준다.

이 때도 채널이 닫힐 때 까지 데이터를 읽게 된다. 채널이 닫힌 후 데이터가 채널에 남아있으면 그 데이터는 소비할 수 있다.

 

예제 코드. 어렵진 않다.

package main

import (
    "fmt"
    "sync"
)

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(c)
    }

    // 아래 코드를 고루틴으로 처리하지 않으면 메인스레드가 out채널에 값을 넣지 못하고 deadlock에 빠짐
    go func() {
        // outbound 채널에 경쟁적으로 데이터를 전송하고 있다.
        // 누군가는 outbound 채널을 닫아줘야한다.
        // sync.WaitGroup으로 고루틴의 전체 수를 파악하고 갯수가 됐을 때 닫아주면 된다.
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    in := generate(2, 3)

    // fan-in
    c1 := square(in)
    c2 := square(in)

    // fan-out
    for n := range merge(c1, c2) {
        fmt.Println(n)
    }
}

위 코드에서 main함수만 살짝 바꿔보자

func main() {
  // ... skip
  out := merge(c1, c2)
  fmt.Println(<-out)
  return
}

위 코드의 문제점은 out 채널의 값 하나만 출력하고 있다.

채널에는 2개의 값이 있는데 1개만 출력하니 메모리 릭이 발생한다.

이럴 때는 buffered 채널을 사용한다

buffered channel

func generate(nums ...int) <-chan int {
    // buffered channel
    // 처음 넣을 갯수만큼 채널의 크기를 고정함
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    // 하나만 쓸꺼라고 지정함.  몇개가 들어오든 걍 닫아버림
    out := make(chan int, 1)
}

위 방식은 버퍼의 크기를 정해야하고 버퍼의 크기가 다 찰 때도 문제라 근본적인 해결책이 아니다.

Cancellation

이 문제를 해결하기 위해 Upstream 쪽에서 데이터를 보내려고 대기하는 고루틴들에게 cancel 신호를 보내면 된다.

cancel signal을 보내기 위한 채널을 하나 만들어서 sender와 receiver 사이에 연결해주자.

receiver가 더 이상 안받겠다고 하면 sender는 데이터를 그만 보낼 것이다.

main 함수 수정해보면

func main() {
  done := make(chan struct{})
  defer close(done) // main함수가 끝나면 브로드캐스트로 더 이상 채널 운영 안한다고 시그널 보냄
  in := generate(2, 3)
  c1 := square(in)
  c2 := square(in)

  out := merge(done, c1, c2)
  fmt.Println(<-out) // 4나 9가 둘 중에 먼저 출력
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
  var wg sync.WaitGroup
  out := make(chan int)

  wg.Add(len(cs))
  for _, c := range cs {
    go func(c <-chan int) {
      defer wg.Done()
      for n := range c {
        select {
          case out <- n:
        case <- done:    
          // main에서 fmt.Println(<-out) 코드에서 out 채널 값 하나가 빠지면 main이 종료되며 done 채널이 닫힘
          // 채널이 닫히면서 out <- n이 아니라 <- done으로 오게되어 더 이상 out에 값을 넣지 아
              return
        }
      }
    }(c)
  }
  // ...
}

func square(done <- chan struct{}, in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for n := range in {
      select {
      case out <- n * n:
      case <-done:
        // square도 merge와 같은 이유
        return
      }
    }
  }()
  return out
}

지금까지 배운 것들을 토대로 파이프라인 코드를 추상화해서 작성해보면 아래와 같다.

func main() {
  pipeline := BuildPipeline(FirstStage(), SecondStage(), ThirdStage())
  go func() {
    for job := range GetJobs() {
      pipeline.Input(*job)
    }
    pipeline.Close()
  }()

  for result := range pipeline.Output() {
    log.Println(result)
  }
}

각 stage(하나의 작업을 하는 단계)를 구분하여 BuildPipeline()을 통해 파이프라인을 구축하고 각 stage를 BuildPipeline()에서 유기적으로 연결한다. 

이 방식의 장점은 stage가 늘어나도 유연하게 확장할 수 있다.

반응형
Comments