我们除了可以通过共享内存进行goroutine之间的通信, Go还提供一个特殊的数据类型也可以用于多个goroutine之间通信: chan
虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。
Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据
Channel遵循了先进先出的设计(FIFO),具体规则如下:
因此我们可以把channel当作队列来使用, 这也是Goroutine的通信方式,理解比较直观。
channel是指针类型的数据类型,使用 chan表示, 通过make来分配内存。例如
ch := make(chan int)
这表示创建一个channel,这个channel中只能保存int类型的数据。也就是说一端只能向此channel中放进int类型的值,另一端只能从此channel中读出int类型的值。
需要注意,chan TYPE才表示channel的类型。所以其作为参数或返回值时,需指定为xxx chan int类似的格式
1.往channel中发送消息
ch <- VALUE
2.从channel中获取消息
<-ch // 取出消息,直接扔掉
value := <-ch // 从ch中读取一个值并保存到value变量中
value,ok = <-ch // 从ch读取一个值,判断是否读取成功,如果成功则保存到value变量中
for v := range ch // 通过for range语法来取值, 知道ch关闭时退出循环
简单总结为:
比如: alice -> bob 发送一个消息(hello, this is alice)
package main
import (
"fmt"
"time"
)
// sender 不停的想chann里面发送数据
func sender(ch chan string) {
ch <- "hello"
ch <- "this"
ch <- "is"
ch <- "alice"
}
// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string) {
for v := range ch {
fmt.Println(v)
}
}
func main() {
ch := make(chan string)
go sender(ch) // sender goroutine
go recver(ch) // recver goroutine
// 这个可以使用同步组 waitGroup来进行同步等待
time.Sleep(1 * time.Second)
}
上面是最基础的一套逻辑, 那我们如何来处理 通话结束喃?(sender退出)
我们可以在消费发送完了过后,发送一条特别的消息用于说明此次通信结束,比如EOF
// sender 不停的想chann里面发送数据
func sender(ch chan string) {
ch <- "hello"
ch <- "this"
ch <- "is"
ch <- "alice"
// 发送通话结束
ch <- "EOF"
close(ch)
}
// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string) {
for v := range ch {
// 处理通话结束
if v == "EOF" {
return
}
fmt.Println(v)
}
}
func main() {
ch := make(chan string)
go sender(ch) // sender goroutine
go recver(ch) // recver goroutine
// 这个可以使用同步组 waitGroup来进行同步等待
time.Sleep(1 * time.Second)
}
这样的话, 是不是就完整了? 我们似乎忘了挂电话这个逻辑, 对比与channel 就是我们忘记关闭channel了, 如果我们只申请,不关闭的话,会造成内存泄露(chan 没有被释放)
我们使用close这个内置函数来关闭channel:
close(ch)
结合我们上面的逻辑, 就是需要sender方来关闭channel(sender 挂电话)
// sender 不停的想chann里面发送数据
func sender(ch chan string) {
ch <- "hello"
ch <- "this"
ch <- "is"
ch <- "alice"
// 发送通话结束
ch <- "EOF"
close(ch)
}
如果由recver提前把通道关闭了,会发生什么?
// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string) {
for v := range ch {
// 处理通话结束
if v == "hello" {
close(ch)
return
}
fmt.Println(v)
}
}
send方会panic, 导致sender方崩溃掉:
panic: send on closed channel
goroutine 18 [running]:
main.sender(0xc00008c060)
/Users/g7/Workspace/go-course/day10/channel/basic/basic.go:10 +0x55
created by main.main
/Users/g7/Workspace/go-course/day10/channel/basic/basic.go:32 +0x5c
exit status 2
因此我们在关闭channel时 有如下注意事项:
ch := make(chan int)
像上面这种 直接创建出来, 没有指定容量的channel 我们就叫做无缓冲管道,
无缓冲管道特点:
我们可以看出来无缓冲管道是 阻塞的, 常用于同步通信模式, 比如 我们利用这个特性来编写一个同步等待
// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string, down chan struct{}) {
defer func() {
down <- struct{}{}
}()
for v := range ch {
// 处理通话结束
if v == "EOF" {
return
}
fmt.Println(v)
}
}
func main() {
ch := make(chan string)
down := make(chan struct{})
go sender(ch) // sender goroutine
go recver(ch, down) // recver goroutine
<-down
}
同步编程案例: 利用unbuffered同步的特性,我们可以完成多个协程间的同步协作, 比如之前讲的: 协程A和B交替打印内容
func A() {
fmt.Print("1")
fmt.Print("2")
fmt.Print("3")
}
func B() {
fmt.Print("A")
fmt.Print("B")
fmt.Print("C")
}
具体过程:
package main
import (
"fmt"
"time"
)
func A(startA, startB chan struct{}) {
a := []string{"1", "2", "3"}
index := 0
for range startA {
if index > 2 {
return
}
fmt.Println(a[index])
index++
startB <- struct{}{}
}
}
func B(startA, startB chan struct{}) {
b := []string{"x", "y", "z"}
index := 0
for range startB {
fmt.Println(b[index])
index++
startA <- struct{}{}
}
}
func main() {
startA, startB := make(chan struct{}), make(chan struct{})
go A(startA, startB)
go B(startA, startB)
startA <- struct{}{}
time.Sleep(1 * time.Second)
}
我们在一个协程里面 使用同步管道发送信息如下:
func TestDeadLock(t *testing.T) {
ch := make(chan string)
// send
{
ch <- "hello"
}
// receive
{
fmt.Println(<-ch)
}
}
当channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作时,也就是说两端都因为对方而使得自己当前处于阻塞状态,这时将会出现死锁问题。
更通俗地说,只要所有goroutine都被阻塞,就会出现死锁
要修复这个问题 我们有2种办法:
func TestDeadLockV1(t *testing.T) {
ch := make(chan string)
// send
go func() {
ch <- "hello"
}()
// receive
{
fmt.Println(<-ch)
}
}
当然还有一种方式 使用带缓冲管道
func TestDeadLockV2(t *testing.T) {
ch := make(chan string, 1)
// send
{
ch <- "hello"
}
// receive
{
fmt.Println(<-ch)
}
}
buffered channel有两个属性:容量和长度:和slice的capacity和length的概念是一样的
创建buffered channel的方式为make(chan TYPE,CAP), 比如我们创建一个容量为5的 int chan:
ch := make(chan int, 5)
len(ch) // 通过len 获取channel当前已缓冲的数据个数
sender端可以向channel中send多个数据(只要channel容量未满),容量满之前不会阻塞 receiver端按照队列的方式(FIFO,先进先出)从buffered channel中按序receive其中数据
总结下buffered channel的特性:
对比我们之前的例子, 现在就不是打电话了, 现在就是短信, 或者留言:
// 沿用之前同步模式的例子, 现在我们把信道改成 带缓冲的
func BufferedChan() {
ch := make(chan string, 5)
down := make(chan struct{})
go sender(ch) // sender goroutine
go recver(ch, down) // recver goroutine
<-down
}
猜猜recver能获取到消息吗?
由于是异步的, 我们需要等到recver把消息处理完成后 才能关闭,如果我们提前关闭,而recver还没有取出消息,就会导致消息丢失,因此我们需要怎么办?
我们处理完成后给sender发送一条消息,告诉他 已经处理完成:
package basic
import "fmt"
func senderV2(ch chan string, down chan struct{}) {
ch <- "hello"
ch <- "this"
ch <- "is"
ch <- "alice"
// 发送通话结束
ch <- "EOF"
// 同步模式等待recver 处理完成
<-down
// 处理完成后关闭channel
close(ch)
}
// recver 循环读取chan里面的数据,直到channel关闭
func recverV2(ch chan string, down chan struct{}) {
defer func() {
down <- struct{}{}
}()
for v := range ch {
// 处理通话结束
if v == "EOF" {
return
}
fmt.Println(v)
}
}
func BufferedChan() {
ch := make(chan string, 5)
down := make(chan struct{})
go senderV2(ch, down) // sender goroutine
go recverV2(ch, down) // recver goroutine
<-down
}
buffered channel异步队列请求示例
我们参考Go的老版调度实现一个任务工作队列
我们实现的功能大致如下:
package chnanel
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Task struct {
ID int
JobID int
Status string
CreateTime time.Time
}
func (t *Task) Run() {
sleep := rand.Intn(1000)
time.Sleep(time.Duration(sleep) * time.Millisecond)
t.Status = "Completed"
}
var wg sync.WaitGroup
// worker的数量,即使用多少goroutine执行任务
const workerNum = 3
func RunTaskWithPool() {
wg.Add(workerNum)
// 创建容量为10的buffered channel
taskQueue := make(chan *Task, 10)
// 激活goroutine,执行任务
for workID := 0; workID <= workerNum; workID++ {
go worker(taskQueue, workID)
}
// 将待执行任务放进buffered channel,共15个任务
for i := 1; i <= 15; i++ {
taskQueue <- &Task{
ID: i,
JobID: 100 + i,
CreateTime: time.Now(),
}
}
wg.Wait()
//记得关闭channel
close(taskQueue)
}
// 从buffered channel中读取任务,并执行任务
func worker(in chan *Task, workID int) {
defer wg.Done()
for v := range in {
fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
v.Run()
fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
}
}
准确来说,channel是有方向的(数据流向), 如果我们错误的使用了, 比如本来是要从channel中读取数据的,但是我们去错误的进行了写操作,这很容易造成死锁.
因此我们把channel作为参数传达的时候,可以指定方向(读-out或者写-in)
我们优化下我们之前的worker, 因为之前的worker, 如果是可以执行写的
func produceTask(out chan<- *Task) {
// 将待执行任务放进buffered channel,共15个任务
for i := 1; i <= 15; i++ {
out <- &Task{
ID: i,
JobID: 100 + i,
CreateTime: time.Now(),
}
}
}
// 从buffered channel中读取任务,并执行任务
func worker(in <-chan *Task, workID int) {
defer wg.Done()
for v := range in {
fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
v.Run()
fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
}
}
为了获得更好的性能, 我们可能选择的队列实现方式有:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。