프로그래밍/golang

ultimate in go-(10) data race

나도한강뷰 2023. 1. 29. 01:35

Data race

race detection

  • 프로그램에 고루틴을 추가하면 복잡도가 올라간다. 프로그램의 모든 과정이 stateless할 수 없기때문에, 서로간에 순서나 정보가 중요하게된다.

  • 3가지 선택지가 있다.

    • waitGroup같은 메모리 공유 패키지를 이용하는 것
    • 서로간에 합리적으로 작동하도록 만드는 것(atomic하게)
    • 채널을 이용하는것
  • atomic한게 가장 빠르고, 메모리를 공유하는게 다음으로, 제일 느린것은 채널이다.

  • 물론 성능에 큰이슈가 없으면 채널을 쓰는게 베스트라고 생각한다(개인적 견해)

    package main
    import (
      "fmt"
      "runtime"
      "sync"
    )
    var counter int
    func main() {
    
      const grs = 2 
    
      var wg sync.WaitGroup // 메모리 동기화 패키지
      wg.Add(grs) //메모리 동기화 2추가
      for i := 0; i < grs; i++ { //고루틴 2개실행
          go func() {
              for count := 0; count < 2; count++ {
                  value := counter
                  runtime.Gosched() //prd환경에서는 절대 쓰지않는 코드, 다른 고루틴에게 스레드를 양보하는 행위
                  value++
                  counter = value
              }
              wg.Done()   //메모리 동기화 1감소
          }()
      }
      wg.Wait() //메모리동기화 0까지 대기
      fmt.Println("Final Counter:", counter)
    }

To identify race condition : go run -race .

==================
WARNING: DATA RACE
Read at 0x000001228340 by goroutine 8:
main.main.func1()
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/data_race_1.go :65 +0x47
Previous write at 0x000001228340 by goroutine 7: main.main.func1()
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/data_race_1.go
:75 +0x68
Goroutine 8 (running) created at: main.main()
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/data_race_1.go :62 +0xab
Goroutine 7 (finished) created at: main.main()
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/data_race_1.go :62 +0xab
==================
Final Counter: 4    //다른 고루틴에게 스레드를 양보했기때문에 value라는 값, counter라는 값이 공유되어서 같은 메모리에 값이 쌓이게 된것
Found 1 data race(s)
exit status 66

Gosched: https://stackoverflow.com/questions/13107958/what-exactly-does-runtime-gosched-do

해결방식 1. atomic

package main
import (
    "fmt"
    "runtime"
    "sync"
    "sync/atomic"
)
var counter int64
func main() {

    const grs = 2

    var wg sync.WaitGroup
    wg.Add(grs)

    for i := 0; i < grs; i++ {
        go func() {
            for count := 0; count < 2; count++ {
                atomic.AddInt64(&counter, 1) //atomic하게 계산하도록 패키지 사용+ 정확한 타입명 기입필수
                runtime.Gosched()
            }
        wg.Done()
        }()
    }
    wg.Wait()
    fmt.Println("Final Counter:", counter)
}
Final Counter: 4
  • 이때는 runtime.Gosched이 있어도, 이미 atomic에서 처리를 해버렸기때문에 영향이 없어진다.

해결방식 2. mutex

package main
import (
    "fmt"
    "sync"
)
var (
    counter int
    mutex sync.Mutex //mutex선언
)
func main() {
    const grs = 2

    var wg sync.WaitGroup
    wg.Add(grs)

    for i := 0; i < grs; i++ {
        go func() {
            for count := 0; count < 2; count++ {
                mutex.Lock() //memory 잠금
                {
                    value := counter    //couter라는 공유메모리를 점유하게됨
                    value++
                    counter = value
                }
                mutex.Unlock() //memory 잠금해제
            }
            wg.Done()
        }()
    }
    wg.Wait()
    fmt.Printf("Final Counter: %d\n", counter)
}
Final Counter: 4
  • goroutine들이 들어오는 곳에 mutex를 걸고, 먼저 들어온놈이 그곳을 점유한다.(lock)
  • 그리고 그놈이 나가면(unlock) 다른놈이 들어오는 식으로 critical area에 대해서 관리한다.

해결방식 3. RWmutex

package main
import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

var (
    data []string
    rwMutex sync.RWMutex  //read write에 대해서 구분해서 mutex를 걸수있는 패키지, 단순 mutex보다는 느리다.
    readCount int64
)
func init() {
    rand.Seed(time.Now().UnixNano())
}
func main() {
    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) //read먼저 실행될 수 있도록 조작
            writer(i) //쓴다
        }
        wg.Done()
    }()

    for i := 0; i < 8; i ++ {
        go func(i int) {
            for {
                reader(i) //계속 읽는다
            }   
        }(i)
    }

    wg.Wait()
    fmt.Println("Program Complete")
}

func writer(i int) {
    rwMutex.Lock()
    {
        rc := atomic.LoadInt64(&readCount)
        fmt.Printf("****> : Performing Write : RCount[%d]\n", rc)
        data = append(data, fmt.Sprintf("String: %d", i))
    }
    rwMutex.Unlock()
}
func reader(id int) {
    rwMutex.RLock() //쓰기작업이 이루어지지 않을때 읽을 수 있다.
    {
        rc := atomic.AddInt64(&readCount, 1)
        time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
        fmt.Printf("%d : Performing Read : Length[%d] RCount[%d]\n", id, len(data), rc)
        atomic.AddInt64(&readCount, -1)
    }
    rwMutex.RUnlock()
}

출력은 이와 유사하게 잠긴다.

0 : Performing Read : Length[0] RCount[1]
4 : Performing Read : Length[0] RCount[5]
5 : Performing Read : Length[0] RCount[6]
7 : Performing Read : Length[0] RCount[7]
3 : Performing Read : Length[0] RCount[4]
6 : Performing Read : Length[0] RCount[8]
4 : Performing Read : Length[0] RCount[8]
1 : Performing Read : Length[0] RCount[2]
2 : Performing Read : Length[0] RCount[3]
5 : Performing Read : Length[0] RCount[8]
0 : Performing Read : Length[0] RCount[8]
7 : Performing Read : Length[0] RCount[8]
7 : Performing Read : Length[0] RCount[8]
2 : Performing Read : Length[0] RCount[8]
...
1 : Performing Read : Length[10] RCount[8]
5 : Performing Read : Length[10] RCount[8]
3 : Performing Read : Length[10] RCount[8]
4 : Performing Read : Length[10] RCount[8]
6 : Performing Read : Length[10] RCount[8]
7 : Performing Read : Length[10] RCount[8]
2 : Performing Read : Length[10] RCount[8]
2 : Performing Read : Length[10] RCount[8]
  • rwmutex는 쓰는동안에 읽는것을 하지못하도록 막는다.(Rlock)
  • 그래서 처음에는 쓰기로 돌입하기전 한번씩 다 읽게 되어서 lengthrk 0인걸 읽는다.
  • 하지만 쓰기에 들어가면 쓰기들의 대기가 없어질때까지 읽지못하게되고
  • 결국 10번쓰기가 끝난뒤에 읽기가 다시 작동하게된다.
  • mutex보다는 조금더 무겁지만, 더욱 정교한 컨트롤이 가능해진다.