채널
채널의 동작 원리
- 채널은 오케스트레이션을 위해서 존재한다.
- 채널을 통해서 고루틴간에 워크플로우를 정해줄 수 있다.
- 큐처럼 fifo로 처리되지만, 큐가 아닌 이벤트 발생을 알려주는 신호라고 생각하는게 맞다.
- 물론 데이터 없이도 신호를 줄 수 있다(close를 통해서)
- 채널은 2종류가 있다.
- buffer가 없는 채널
- buffer가 있는 채널
- buffer가 없는 채널
- 데이터가 전달되었다는 것을 보장 받을 수 있다.
- 대신 좀더 느리다.
- 채널에는 신호를 주는 고루틴과 데이터를 보내는 고루틴이 있고
- 데이터를 보내고 신호를 보내는데, 신호가 응답이 없으면 데이터는 lock상태가 되고, 신호가 올때까지 대기한다.
- buffer가 있는 채널
- 데이터가 전달 되었다는 것을 보장 받을 수 없다.
- 대신 빠르다.
- 데이터를 넣어버리고 바로 가버린다.
- 물론 버퍼가 1개인 채널은 1개 이상 들어오면 기다리게된다.
- 우리는 잘 작동하는 상태가 아닌, 에러가 생길때를 대비해서 프로그래밍해야된다.
- 버퍼가 있는 채널은 버퍼로 인해서 에러를 찾기 어려워 질 수 있기때문에, 그것에 대비해서 프로그래밍을 잘짜야된다.
버퍼없는 채널: 데이터를 담아 신호 주기
package main
import (
"fmt"
"time"
)
func main() {
fmt.Printf("\n=> Basics of a send and receive\n")
basicSendRecv()
fmt.Printf("\n=> Close a channel to signal an event\n")
signalClose()
}
func basicSendRecv() {
ch := make(chan string) // make를 통해서 채널을 만들고, 내부로 전달될 데이터의 타입은 string이다.
// ch에는 channel을 가르키는 포인터 값이 저장된다.
go func() {
ch <- "hello" // ch에 "hello"라는 string값을 넣어준다.
}()
fmt.Println(<-ch) // ch는 채널에서 다른 값이 들어올때까지 대기하고있다가, 값이 들어오면 출력한다.
}
func signalClose() {
ch := make(chan struct{})
go func() {
time.Sleep(100 * time.Millisecond)
fmt.Println("signal event")
close(ch) //채널을 닫는다. 채널을 닫게되면, 무조건 출력이 false로 나오게된다. 이제 역할이 끝났다라는것을 알리기위해서
//(이때 채널은 2가지 값을 리턴하며 false,와 전달되는 데이터의 zero value가 리턴된다.)
// 채널은 두번 닫을 수 없다.
}()
<-ch //계속 nil값과 false값이 나오므로 대기에서 넘어가게된다.
fmt.Println("event received")
}
=> Basics of a send and receive
hello
=> Close a channel to signal an event
signal event
event received
버퍼없는 채널: 이중신호
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
fmt.Printf("\n=> Double signal\n")
signalAck() // 어떻게 이벤트를 신호로 줄 수 있고, 승인을 기다리다가 처리하는지를 보여준다.
// + 신호가 왔다는 것뿐만아니라, 처리가 완료되었다는 것도 알 수 있다 = 이중신호
fmt.Printf("\n=> Select and receive\n")
selectRecv()
fmt.Printf("\n=> Select and send\n")
selectSend()
fmt.Printf("\n=> Select and drop\n")
selectDrop()
}
func signalAck() {
ch := make(chan string)
go func() {
fmt.Println(<-ch) // 1. 이벤트 발생전까지 대기한다 //3.1 이벤트를 받는다
ch <- "ok done" // 4. 작업 완료 이벤트를 보낸다
}()
ch <- "do this" // 2. 이벤트를 발생시킨다
fmt.Println(<-ch) // 3.2 작업완료 이벤트를 기다린다.
}
=> Double signal
do this
ok done
버퍼없는 채널: select와 받기
func selectRecv() {
ch := make(chan string)
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "work" //work를 보낸다
}()
select { //select를 통해서 어느 채널을 통해서 값이 들어올지 고를 수 있다. 케이스에 값이 들어오게되면 그걸 선택하고
//들어오기전까지는 대기하고 있는다.
case v := <-ch:
fmt.Println(v)
case <-time.After(100 * time.Millisecond):
fmt.Println("timed out")
} // 0.1초안에 값이 안들어오면 기다리지 않고 select문을 종료한다.
}
=> Select and receive
work
- 이벤트 루프를 만들때 select를 쓰는것은 좋은 선택이다.
- 이 코드는 매우 흔한 버그가 있다.
- 0.1초가 지나면 받는 채널은 종료하게 되어있다. 그에따라서 work는 보낼 채널이 없어지고 무한 대기하게 된다.
- 고루틴 누수(leak)가 생긴 것이다.
- 이럴때 해결방안으로는 1개짜리 버퍼가 있는 채널을 쓰는 것이다.
버퍼없는 채널: select와 보내기
func selectSend() {
ch := make(chan string)
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
fmt.Println(<-ch)
}()
select {
case ch <- "work":
fmt.Println("send work")
case <-time.After(100 * time.Millisecond):
fmt.Println("timed out")
}
}
=> Select and send
work
send work
- 이때도 0.1초가 지나면 동일한 무한 대기가 발생할 것이다.
- 1개짜리 버퍼로 해결 할 수 있다.
버퍼있는 채널: select와 버리기
func selectDrop() {
ch := make(chan int, 5)
go func() {
for v := range ch { // 3.v는 ch값을 받아서 range를 돌리도록 한다.(값이 들어오는 동안 for문을 못돌리나???) check
// 값이 들어오는 동안 lock이 걸리나?? check
fmt.Println("recv", v) // 3.1 for문이 돌아
}
}()
for i := 0; i < 20; i++ {
select {
case ch <- i: //1. for문에의해서 i를 ch로 넣어준다 2. 5개를 넣어주면 버퍼가 가득차게된다
fmt.Println("send work", i)
default:
fmt.Println("drop", i) // 3.1이 동작하는 사이 나머지 값들은 ch에 들어갈 수 없으므로 만족하는게 없게되고 default에 들어가게된다.
}
}
close(ch)
}
=> Select and drop
send work 0
send work 1
send work 2
send work 3
send work 4
send work 5
drop 6
drop 7
drop 8
drop 9
drop 10
drop 11
drop 12
drop 13
drop 14
drop 15
recv 0
recv 1
recv 2
recv 3
recv 4
recv 5
drop 16
send work 17
send work 18
send work 19
버퍼가 있는 채널: fan-out
- 일이 20개가 있다. 그걸 데이터베이스에 작업해서 처리하려는데 1개씩 처리하면 오래걸린다.
- 그래서 고루틴 10개를 만들어서 2개씩 처리하게 만든다.
- 이러한 패턴을 fan-out패턴이라고 한다.
- 10개의 고루틴을 만들고, 그 고루틴이 모두 끝날때 까지 기다리는 놈을 만든다.
- 그리고 모든 고루틴이 끝났을때, 그 이후로 프로그램을 진행시킨다.
package main
import (
"fmt"
"log"
"math/rand"
"time"
)
type result struct {
id int
op string
err error
}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
const routines = 10
const inserts = routines * 2
ch := make(chan result, inserts) //20개 버퍼를 갖는 채널을 만든다
waitInserts := inserts
for i := 0; i < routines; i++ { //10개의 고루틴을 실행시키기
go func(id int) {
ch <- insertUser(id)
ch <- insertTrans(id)
}(i)
}
for waitInserts > 0 { //20번 돌리겠다.
r := <-ch // r값을 받을때까지 기다린다 + 10개의 고루틴이 모두 끝날때까지 기다린다
log.Printf("N: %d ID: %d OP: %s ERR: %v", waitInserts, r.id, r.op, r.err)
waitInserts--
}
log.Println("Inserts Complete")
}
func insertUser(id int) result {
r := result{
id: id,
op: fmt.Sprintf("insert USERS value (%d)", id),
}
if rand.Intn(10) == 0 {
r.err = fmt.Errorf("Unable to insert %d into USER table", id)
}
return r
}
func insertTrans(id int) result {
r := result{
id: id,
op: fmt.Sprintf("insert TRANS value (%d)", id),
}
if rand.Intn(10) == 0 {
r.err = fmt.Errorf("Unable to insert %d into USER table", id)
}
return r
}
2020/08/24 18:18:19 N: 20 ID: 0 OP: insert USERS value (0) ERR: <nil>
2020/08/24 18:18:19 N: 19 ID: 0 OP: insert TRANS value (0) ERR: <nil>
2020/08/24 18:18:19 N: 18 ID: 1 OP: insert USERS value (1) ERR: <nil>
2020/08/24 18:18:19 N: 17 ID: 1 OP: insert TRANS value (1) ERR: <nil>
2020/08/24 18:18:19 N: 16 ID: 2 OP: insert USERS value (2) ERR: <nil>
2020/08/24 18:18:19 N: 15 ID: 2 OP: insert TRANS value (2) ERR: Unable to insert 2 into USER table
2020/08/24 18:18:19 N: 14 ID: 3 OP: insert USERS value (3) ERR: Unable to insert 3 into USER table
2020/08/24 18:18:19 N: 13 ID: 3 OP: insert TRANS value (3) ERR: <nil>
2020/08/24 18:18:19 N: 12 ID: 4 OP: insert USERS value (4) ERR: <nil>
2020/08/24 18:18:19 N: 11 ID: 4 OP: insert TRANS value (4) ERR: <nil>
2020/08/24 18:18:19 N: 10 ID: 5 OP: insert USERS value (5) ERR: <nil>
2020/08/24 18:18:19 N: 9 ID: 5 OP: insert TRANS value (5) ERR: <nil>
2020/08/24 18:18:19 N: 8 ID: 6 OP: insert USERS value (6) ERR: <nil>
2020/08/24 18:18:19 N: 7 ID: 6 OP: insert TRANS value (6) ERR: <nil>
2020/08/24 18:18:19 N: 6 ID: 7 OP: insert USERS value (7) ERR: <nil>
2020/08/24 18:18:19 N: 5 ID: 7 OP: insert TRANS value (7) ERR: Unable to insert 7 into USER table
2020/08/24 18:18:19 N: 4 ID: 8 OP: insert USERS value (8) ERR: <nil>
2020/08/24 18:18:19 N: 3 ID: 8 OP: insert TRANS value (8) ERR: <nil>
2020/08/24 18:18:19 N: 2 ID: 9 OP: insert USERS value (9) ERR: <nil>
2020/08/24 18:18:19 N: 1 ID: 9 OP: insert TRANS value (9) ERR: <nil>
2020/08/24 18:18:19 Inserts Complete
select
- select를 사용해서 간단한 프로그램을 만들어보자
package main
import (
"errors"
"log"
"os"
"os/signal"
"time"
)
const timeoutSeconds = 3 * time.Second //프로그램 종료전 3초 시간주기
var (
sigChan = make(chan os.Signal, 1) //운영체제의 신호를 받는다 ctrl c를 받으면 종료하기위해서 만든다.
timeout = time.After(timeoutSeconds) //프로그램이 타임아웃될때 쓰려고 만든다.
complete = make(chan error) //처리가 완료되었다를 알리기위해서 만든다.
shutdown = make(chan struct{}) //시스템 전체에 전파하기위해서 만든다.
)
func main() {
log.Println("Starting Process")
signal.Notify(sigChan, os.Interrupt) // 인터럽트관련 신호를 전부 받는다. + notify함수를 쓰면 sigChan으로 신호를 줄것이다.
// 신호받을 준비가 될때까지 기다리지 않는다. 받지못한다면 그냥 버려질 것이다.
// 그래서 1개짜리 버퍼가 있는 채널을 사용하였다.
log.Println("Launching Processors")
go processor(complete) //프로세서를 고루틴으로 돌리고 변수는 complete채널을 넣는다
- 여기 있는 메인 고루틴은 이벤트 루프 안에 있고 프로그램이 종료될 때까지 무한히 루프를 돌 것이다. 고르기(select)에는 세가지 케이스가 있는데 이는 우리가 신호를 받으려고 하는 채널이 동시에 3개 있다는 뜻이다. sigChan과 timeout, complete가 있다.
ControlLoop: // for문으로 무한로프를 만들고 3가지 케이스를 선택적으로 받는다. sigChan timeout complete
for {
select {
case <-sigChan:
log.Println("OS INTERRUPT")
close(shutdown) // shutdown채널을 닫음으로 시스템 전체를 종료시킬수 있도록 한다.
sigChan = nil //여러번 shutdown이 close되는걸 막기위해서 sigChan의 주솟값을 nil값으로 변경시킨다.
//그러면 여러번 ctrl c를 눌러도 정상작동할 것이다. 그렇지않으면 닫은 채널을 또 닫으려하기때문에 panic이 발생할 것이다.
case <-timeout:
log.Println("Timeout - Killing Program")
//os.Exit will terminate the program immediately.
os.Exit(1) //시간이 경과하면 종료시켜버린다.
case err := <-complete: //시간안에 성공한 것이다.
log.Printf("Task Completed: Error[%s]", err)
break ControlLoop //for문의 처음으로 돌아가기위해서 쓴다. 그래야지 process ended라는 로그가 안찍힌다.
}
}
log.Println("Process Ended")
}
func processor(complete chan<- error) { // 데이터 보내기용 전용 채널이란 의미다
log.Println("Processor - Starting")
var err error
defer func() { //우리는 프로그램이 어떻게 끝나도 에러를 전달하고 끝내고 싶다. 그렇기에 defer를 써서 무조건 마지막에 실행하도록 보장해준다.
if r := recover(); r != nil { //패닉을 회복시키고 종료하고싶다.// 패닉이 들어오면 recover시킨다
log.Println("Processor - Panic", r)
}
complete <- err
}()
err = doWork()
log.Println("Processor - Completed")
}
func doWork() error {
log.Println("Processor - Task 1")
time.Sleep(2 * time.Second)
if checkShutdown() { // 모든 과정에서 종료하라는 말을 들었는지 확인한다-> 채널이 닫혀있는지 여부를 확인해서
return errors.New("Early Shutdown")
}
log.Println("Processor - Task 2")
time.Sleep(1 * time.Second)
if checkShutdown() { // 모든 과정에서 종료하라는 말을 들었는지 확인한다-> 채널이 닫혀있는지 여부를 확인해서
return errors.New("Early Shutdown")
}
log.Println("Processor - Task 3")
time.Sleep(1 * time.Second)
return nil
}
func checkShutdown() bool {
select {
case <-shutdown: //종료하라는 메세지를 받은적이 있는지 체크
log.Println("checkShutdown - Shutdown Early")
return true
default:
return false //종료하라는 메세지가 없는 경우 그냥 패스
}
}
2020/08/24 18:31:27 Starting Process
2020/08/24 18:31:27 Launching Processors
2020/08/24 18:31:27 Processor - Starting
2020/08/24 18:31:27 Processor - Task 1
2020/08/24 18:31:29 Processor - Task 2
2020/08/24 18:31:30 Timeout - Killing Program
exit status 1
- 프로그램이 돌고 있을 때 Ctrl C을 누르면 OS INTERRUPT라고 뜨고 프로그램은 일찍 종료한다.
2020/08/24 18:21:02 Starting Process
2020/08/24 18:21:02 Launching Processors
2020/08/24 18:21:02 Processor - Starting
2020/08/24 18:21:02 Processor - Task 1
^C2020/08/24 18:21:03 OS INTERRUPT
2020/08/24 18:21:04 checkShutdown - Shutdown Early
2020/08/24 18:21:04 Processor - Completed
2020/08/24 18:21:04 Task Completed: Error[Early Shutdown]
2020/08/24 18:21:04 Process Ended
- Ctrt \을 눌러서 종료 신호를 보내면 모든 고루틴의 전체 스택 트레이스(stack trace)를 받을 수 있다.
2020/08/24 18:31:44 Starting Process
2020/08/24 18:31:44 Launching Processors
2020/08/24 18:31:44 Processor - Starting
2020/08/24 18:31:44 Processor - Task 1
2020/08/24 18:31:46 Processor - Task 2
^\SIGQUIT: quit
PC=0x7fff70c3e882 m=0 sigcode=0
goroutine 0 [idle]:
runtime.pthread_cond_wait(0x12201e8, 0x12201a8, 0x7ffe00000000)
/usr/local/go/src/runtime/sys_darwin.go:378 +0x39
runtime.semasleep(0xffffffffffffffff, 0x7ffeefbff678)
/usr/local/go/src/runtime/os_darwin.go:63 +0x85
runtime.notesleep(0x121ffa8)
/usr/local/go/src/runtime/lock_sema.go:173 +0xe0
runtime.stoplockedm()
/usr/local/go/src/runtime/proc.go:2068 +0x88
runtime.schedule()
/usr/local/go/src/runtime/proc.go:2469 +0x485
runtime.park_m(0xc00007cd80)
/usr/local/go/src/runtime/proc.go:2610 +0x9d
runtime.mcall(0x108ca06)
/usr/local/go/src/runtime/asm_amd64.s:318 +0x5b
goroutine 1 [select]:
main.main()
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:6
7 +0x278
goroutine 19 [syscall]:
os/signal.signal_recv(0x108ebb1)
/usr/local/go/src/runtime/sigqueue.go:144 +0x96
os/signal.loop()
/usr/local/go/src/os/signal/signal_unix.go:23 +0x30
created by os/signal.init.0
/usr/local/go/src/os/signal/signal_unix.go:29 +0x4f
goroutine 5 [sleep]:
runtime.goparkunlock(...)
/usr/local/go/src/runtime/proc.go:310
time.Sleep(0x3b9aca00)
/usr/local/go/src/runtime/time.go:105 +0x157
main.doWork(0xc000054768, 0x1)
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:157 +0x14a
main.processor(0xc000096060)
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:138 +0xbc
created by main.main
/Users/hoanhan/work/hoanhan101/ultimate-go/go/concurrency/channel_6.go:58 +0x160
rax 0x104
rbx 0x2
rcx 0x7ffeefbff498
rdx 0x200
rdi 0x12201e8
rsi 0x20100000300
rbp 0x7ffeefbff530
rsp 0x7ffeefbff498
r8 0x0
r9 0xa0
r10 0x0
r11 0x202
r12 0x12201e8
r13 0x16
r14 0x20100000300
r15 0x10863dc0
rip 0x7fff70c3e882
rflags 0x203
cs 0x7
fs 0x0
gs 0x0
exit status 2