61 Star 343 Fork 417

infraboard / go-course

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
channel.md 12.84 KB
一键复制 编辑 原始数据 按行查看 历史
Mr.Yu 提交于 2021-08-07 12:12 . 修改

Chnanel的使用

我们除了可以通过共享内存进行goroutine之间的通信, Go还提供一个特殊的数据类型也可以用于多个goroutine之间通信: chan

虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。

Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据

Channel遵循了先进先出的设计(FIFO),具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利

因此我们可以把channel当作队列来使用, 这也是Goroutine的通信方式,理解比较直观。

发送与接收消息

channel是指针类型的数据类型,使用 chan表示, 通过make来分配内存。例如

ch := make(chan int)

这表示创建一个channel,这个channel中只能保存int类型的数据。也就是说一端只能向此channel中放进int类型的值,另一端只能从此channel中读出int类型的值。

需要注意,chan TYPE才表示channel的类型。所以其作为参数或返回值时,需指定为xxx chan int类似的格式

1.往channel中发送消息

ch <- VALUE

2.从channel中获取消息

<-ch               // 取出消息,直接扔掉
value := <-ch      // 从ch中读取一个值并保存到value变量中
value,ok = <-ch    // 从ch读取一个值,判断是否读取成功,如果成功则保存到value变量中
for v := range ch  // 通过for range语法来取值, 知道ch关闭时退出循环

简单总结为:

  • send: 当ch出现在<-的左边
  • recv: 当ch出现在<-的右边

比如: alice -> bob 发送一个消息(hello, this is alice)

package main

import (
	"fmt"
	"time"
)

// sender 不停的想chann里面发送数据
func sender(ch chan string) {
	ch <- "hello"
	ch <- "this"
	ch <- "is"
	ch <- "alice"
}

// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string) {
	for v := range ch {
		fmt.Println(v)
	}
}

func main() {
	ch := make(chan string)
	go sender(ch) // sender goroutine
	go recver(ch) // recver goroutine

	// 这个可以使用同步组 waitGroup来进行同步等待
	time.Sleep(1 * time.Second)
}

关闭channel

上面是最基础的一套逻辑, 那我们如何来处理 通话结束喃?(sender退出)

我们可以在消费发送完了过后,发送一条特别的消息用于说明此次通信结束,比如EOF

// sender 不停的想chann里面发送数据
func sender(ch chan string) {
	ch <- "hello"
	ch <- "this"
	ch <- "is"
	ch <- "alice"

	// 发送通话结束
	ch <- "EOF"
	close(ch)
}

// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string) {
	for v := range ch {
		// 处理通话结束
		if v == "EOF" {
			return
		}
		fmt.Println(v)
	}
}

func main() {
	ch := make(chan string)
	go sender(ch) // sender goroutine
	go recver(ch) // recver goroutine

	// 这个可以使用同步组 waitGroup来进行同步等待
	time.Sleep(1 * time.Second)
}

这样的话, 是不是就完整了? 我们似乎忘了挂电话这个逻辑, 对比与channel 就是我们忘记关闭channel了, 如果我们只申请,不关闭的话,会造成内存泄露(chan 没有被释放)

我们使用close这个内置函数来关闭channel:

close(ch)

结合我们上面的逻辑, 就是需要sender方来关闭channel(sender 挂电话)

// sender 不停的想chann里面发送数据
func sender(ch chan string) {
	ch <- "hello"
	ch <- "this"
	ch <- "is"
	ch <- "alice"
	// 发送通话结束
	ch <- "EOF"
	close(ch)
}

如果由recver提前把通道关闭了,会发生什么?

// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string) {
	for v := range ch {
		// 处理通话结束
		if v == "hello" {
			close(ch)
			return
		}
		fmt.Println(v)
	}
}

send方会panic, 导致sender方崩溃掉:

panic: send on closed channel

goroutine 18 [running]:
main.sender(0xc00008c060)
	/Users/g7/Workspace/go-course/day10/channel/basic/basic.go:10 +0x55
created by main.main
	/Users/g7/Workspace/go-course/day10/channel/basic/basic.go:32 +0x5c
exit status 2

因此我们在关闭channel时 有如下注意事项:

  • 关闭channel后,send操作将导致painc
  • 关闭channel后,recv操作将返回对应类型的0值以及一个状态码false
  • 如果使用close(),建议条件允许的情况下加上defer, 只在sender端上显式使用close()关闭channel。因为关闭通道意味着没有数据再需要发送

无缓冲管道(unbuffered channel)

ch := make(chan int)

像上面这种 直接创建出来, 没有指定容量的channel 我们就叫做无缓冲管道,

无缓冲管道特点:

  • sender端向channel中send一个数据,然后阻塞,直到receiver端将此数据receive
  • receiver端一直阻塞,直到sender端向channel发送了一个数据

我们可以看出来无缓冲管道是 阻塞的, 常用于同步通信模式, 比如 我们利用这个特性来编写一个同步等待

// recver 循环读取chan里面的数据,直到channel关闭
func recver(ch chan string, down chan struct{}) {
	defer func() {
		down <- struct{}{}
	}()

	for v := range ch {
		// 处理通话结束
		if v == "EOF" {
			return
		}
		fmt.Println(v)
	}
}

func main() {
	ch := make(chan string)

	down := make(chan struct{})
	go sender(ch)       // sender goroutine
	go recver(ch, down) // recver goroutine

	<-down
}

同步编程案例: 利用unbuffered同步的特性,我们可以完成多个协程间的同步协作, 比如之前讲的: 协程A和B交替打印内容

func A() {
    fmt.Print("1")
    fmt.Print("2")
    fmt.Print("3")
}

func B() {
    fmt.Print("A")
    fmt.Print("B")
    fmt.Print("C")
}

具体过程:

package main

import (
	"fmt"
	"time"
)

func A(startA, startB chan struct{}) {
	a := []string{"1", "2", "3"}
	index := 0
	for range startA {
		if index > 2 {
			return
		}
		fmt.Println(a[index])
		index++
		startB <- struct{}{}
	}
}

func B(startA, startB chan struct{}) {
	b := []string{"x", "y", "z"}
	index := 0
	for range startB {
		fmt.Println(b[index])
		index++
		startA <- struct{}{}
	}
}

func main() {
	startA, startB := make(chan struct{}), make(chan struct{})
	go A(startA, startB)
	go B(startA, startB)

	startA <- struct{}{}
	time.Sleep(1 * time.Second)
}

同步的噩梦

我们在一个协程里面 使用同步管道发送信息如下:

func TestDeadLock(t *testing.T) {
	ch := make(chan string)
	// send
	{
		ch <- "hello"
	}

	// receive
	{
		fmt.Println(<-ch)
	}
}

当channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作时,也就是说两端都因为对方而使得自己当前处于阻塞状态,这时将会出现死锁问题。

更通俗地说,只要所有goroutine都被阻塞,就会出现死锁

要修复这个问题 我们有2种办法:

  1. 将其中一方变成一个协程, 这样就是A->B, 就不是死循环了
func TestDeadLockV1(t *testing.T) {
	ch := make(chan string)
	// send
	go func() {
		ch <- "hello"
	}()

	// receive
	{
		fmt.Println(<-ch)
	}
}

当然还有一种方式 使用带缓冲管道

func TestDeadLockV2(t *testing.T) {
	ch := make(chan string, 1)
	// send
	{
		ch <- "hello"
	}

	// receive
	{
		fmt.Println(<-ch)
	}
}

带缓冲管道(buffered channel )

buffered channel有两个属性:容量和长度:和slice的capacity和length的概念是一样的

  • capacity:表示bufffered channel最多可以缓冲多少个数据
  • length:表示buffered channel当前已缓冲多少个数据

创建buffered channel的方式为make(chan TYPE,CAP), 比如我们创建一个容量为5的 int chan:

ch := make(chan int, 5)
len(ch)  // 通过len 获取channel当前已缓冲的数据个数

sender端可以向channel中send多个数据(只要channel容量未满),容量满之前不会阻塞 receiver端按照队列的方式(FIFO,先进先出)从buffered channel中按序receive其中数据

总结下buffered channel的特性:

  • 为满之前是 非阻塞、异步模式
  • 满了之后是 阻塞的, 是同步模式

对比我们之前的例子, 现在就不是打电话了, 现在就是短信, 或者留言:

// 沿用之前同步模式的例子, 现在我们把信道改成 带缓冲的
func BufferedChan() {
	ch := make(chan string, 5)

	down := make(chan struct{})
	go sender(ch)       // sender goroutine
	go recver(ch, down) // recver goroutine

	<-down
}

猜猜recver能获取到消息吗?

由于是异步的, 我们需要等到recver把消息处理完成后 才能关闭,如果我们提前关闭,而recver还没有取出消息,就会导致消息丢失,因此我们需要怎么办?

我们处理完成后给sender发送一条消息,告诉他 已经处理完成:

package basic

import "fmt"

func senderV2(ch chan string, down chan struct{}) {
	ch <- "hello"
	ch <- "this"
	ch <- "is"
	ch <- "alice"
	// 发送通话结束
	ch <- "EOF"

	// 同步模式等待recver 处理完成
	<-down
	// 处理完成后关闭channel
	close(ch)
}

// recver 循环读取chan里面的数据,直到channel关闭
func recverV2(ch chan string, down chan struct{}) {
	defer func() {
		down <- struct{}{}
	}()

	for v := range ch {
		// 处理通话结束
		if v == "EOF" {
			return
		}
		fmt.Println(v)
	}
}

func BufferedChan() {
	ch := make(chan string, 5)

	down := make(chan struct{})
	go senderV2(ch, down) // sender goroutine
	go recverV2(ch, down) // recver goroutine

	<-down
}

buffered channel异步队列请求示例

我们参考Go的老版调度实现一个任务工作队列

我们实现的功能大致如下:

  • 有(最多)4个worker,每个worker是一个goroutine,它们有worker ID。
  • 每个worker都从一个队列中取出待执行的任务(Task),进行执行。
  • 队列容量为10, 也就是最多只允许10个任务进行排队
  • 任务的执行方式很简单:随机睡眠0-1秒钟,并将任务标记为完成。
package chnanel

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Task struct {
	ID         int
	JobID      int
	Status     string
	CreateTime time.Time
}

func (t *Task) Run() {
	sleep := rand.Intn(1000)
	time.Sleep(time.Duration(sleep) * time.Millisecond)
	t.Status = "Completed"
}

var wg sync.WaitGroup

// worker的数量,即使用多少goroutine执行任务
const workerNum = 3

func RunTaskWithPool() {
	wg.Add(workerNum)

	// 创建容量为10的buffered channel
	taskQueue := make(chan *Task, 10)

	// 激活goroutine,执行任务
	for workID := 0; workID <= workerNum; workID++ {
		go worker(taskQueue, workID)
	}

	// 将待执行任务放进buffered channel,共15个任务
	for i := 1; i <= 15; i++ {
		taskQueue <- &Task{
			ID:         i,
			JobID:      100 + i,
			CreateTime: time.Now(),
		}
	}

	wg.Wait()

	//记得关闭channel
	close(taskQueue)
}

// 从buffered channel中读取任务,并执行任务
func worker(in chan *Task, workID int) {
	defer wg.Done()
	for v := range in {
		fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
		v.Run()
		fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
	}
}

channel的方向

准确来说,channel是有方向的(数据流向), 如果我们错误的使用了, 比如本来是要从channel中读取数据的,但是我们去错误的进行了写操作,这很容易造成死锁.

因此我们把channel作为参数传达的时候,可以指定方向(读-out或者写-in)

  • in <-chan int:表示channel in通道只用于接收数据
  • out chan<- int:表示channel out通道只用于发送数据

我们优化下我们之前的worker, 因为之前的worker, 如果是可以执行写的

func produceTask(out chan<- *Task) {
	// 将待执行任务放进buffered channel,共15个任务
	for i := 1; i <= 15; i++ {
		out <- &Task{
			ID:         i,
			JobID:      100 + i,
			CreateTime: time.Now(),
		}
	}
}

// 从buffered channel中读取任务,并执行任务
func worker(in <-chan *Task, workID int) {
	defer wg.Done()
	for v := range in {
		fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
		v.Run()
		fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
	}
}

思考? 如何参考最新版的调度器实现一个新版本

  • 本地队列 / 全局队列
  • 调度(添加任务的时候,直接调度)

为了获得更好的性能, 我们可能选择的队列实现方式有:

  • 使用链表实现 FIFO
  • 使用heap实现优先级队列
  • 无锁队列...

总结

  • channel基本用法(读/写/关闭)
  • 无缓冲chnannel与同步通信
  • 缓冲channel与异步通信
  • channel的方向
Go
1
https://gitee.com/infraboard/go-course.git
git@gitee.com:infraboard/go-course.git
infraboard
go-course
go-course
19a3f401ff21

搜索帮助

53164aa7 5694891 3bd8fe86 5694891