Go 并发编程浅析

May 20, 2024 ⋅ 20 minBack

这篇文章浅析一下 Golang 中的并发编程。

我写技术类文章,主要是写给自己看的,目的是加深对知识的理解以及方便日后查阅。 所以,此文只是作者的一些理解,非常有可能存在错误的理解,如果有错误的地方,欢迎指正。(操,根本没人会看吧,更不要说指正 :)


1. 进程、线程、协程

进程、线程、协程是操作系统中的概念,这里做一个简单的阐述。

进程,我的理解是,你开启一个应用程序就开启了一个进程。 进程是操作系统分配资源的最小单位,进程之间是相互独立的,进程之间的通信需要通过操作系统提供的 IPC 机制。 如果有两个应用程序需要运行,那么就需要有两个进程,这两个进程之间是相互独立的,不受对方阻塞、不能共享内存。

什么是线程呢?

线程是进程的执行单元,是 CPU 调度的最小单位。线程由于都在进程内,所以线程之间是共享进程资源的,线程之间的通信可以通过共享内存来实现。 由于进程之间的通信需要通过内核来实现,所以进程之间的通信成本代价会很大。而线程之间的通信由于共享内存,所以通信成本会小很多, 这也是为什么在进程基础上需要引入线程的原因。 当然线程之间的通信由于共享内存,所以会出现 Race Condition 的问题,需要通过锁机制来解决(这个后面会阐述)。

线程又分为:

  • 用户级线程
  • 内核级线程

简单来说,内核级线程是操作系统运行所存在的线程,在内核级线程中,线程的创建、销毁、切换都是由操作系统来完成的,并且可以访问计算机操作系统的软硬件资源。 而用户级线程是在用户空间中运行的线程,用户级线程的创建、销毁、切换都是由用户自己来完成的,用户级线程不能访问计算机操作系统的软硬件资源,只能访问自身线程所处的进程的资源空间

如果用户线程想要访问内核线程,必须要通过操作系统暴露出来的接口来访问。这种由应用程序(用户线程)向操作系统发出的服务请求,我们称之为「系统调用」

由于,CPU 在线程之间的切换比进程之间的切换要快很多。所谓的并发编程,是 CPU 在不同进程的线程间高速切换,从而实现在一个时间段内多个线程同时执行的效果。

什么是协程?

协程的英文名是 Coroutine,这里需要区分下文的 Goroutine 的概念。

协程是一种用户态的轻量级线程,协程是由用户自己控制的。如果说线程是物理上的执行单元,那么协程就是程序代码在逻辑上封装出来的调度单元。 一个线程中可以封装为多个协程,有了协程就使得 CPU 切换不是单独在线程上进行切换,而是在不同线程的不同协程中跑。 但一个线程中的协程执行是串行的。 所以有了协程,就可以将调度任务的粒度切分的更小,从而使得协程的调度更加高效。

综上所述:

  • 进程更多是起到隔离资源的作用,并且管理内部线程对于资源的访问
  • 线程 CPU 调度的最小单位,CPU 在线程间高速切换,实现并发执行
  • 协程是用户态的轻量级线程,是在逻辑上封装出来的调度单元

2. 同步与异步

熟悉 JavaScript 中的 Event Loop 的都对同步与异步有所了解,这里再阐述一下。

// function call
A()
B()

首先有两个函数 A 和 B,如果 A 函数执行完毕后再执行 B 函数,那么这两个函数就是同步执行的。 如果 A 函数执行的过程中,不等 A 函数执行完毕就执行 B 函数,那么这两个函数就是异步执行的。 也就是说,异步的概念是不用等待上一段代码执行完毕后,就可以执行下一段代码。

异步是如何做到的呢?

我个人的观点是:线程是执行异步的条件,也就是说异步的本质就是多线程。

在 JavaScript 中,由于 JavaScript 是一门解释性语言,需要解释器来执行。JavaScript 代码执行的环境又分为浏览器和 Node.js 环境。 如果是在浏览器环境中,渲染引擎进程中有 JS 引擎线程,渲染进程中有 GUI 渲染线程,事件触发线程,定时器线程等等。如果遇到了异步任务,就会将这个任务交给其他线程来执行, 比如,setTimeout 交给定时器线程来执行,ajax 交给网络线程来执行,事件监听交给事件触发线程来执行,最后将执行结果放到任务队列中,通过 Event Loop 机制调用回调函数。

所以 JavaScript 中的异步过程是一个「异步任务串行」的过程,即最后执行的结果是确定的(伪异步)。它是启动额外的功能性线程来辅助单线程来完成异步任务。

但是在 Golang 中,它的异步是通过 Goroutine 来实现的,这是一种「异步任务并发」的过程,即 CPU 先处理哪个线程都是不确定的、随机的,所以最后的执行结果都是不确定的

综上所述:

  • 异步的本质是开启多线程
  • JavaScript 中的异步是伪异步,执行结果确定
  • Golang 中的异步是并发执行,执行结果不确定

3. 并行与并发

并行与并发是一个很简单的概念。

简单来说:并行是同一时刻执行多个任务,是真正的同时执行任务;并发是在同一时间段内执行多个任务,在宏观层面上多个任务一起执行,但在微观的角度上,没有任务是同时一起执行的。

我个人的理解是,并行由于是多个任务在同一时刻执行,所以需要多核 CPU 来支持。 由于任务需要在同一时刻执行,所以任务之间是相互独立的,任务之间不会相互阻塞,这就导致了并行这个概念是不相互影响,没有关联的,而进程的各自独立性正好满足这个特点, 所以个人观点是,并行是多个进程之间的概念

当然单核 CPU 也可以在不同进程间来换切换,但是进程的切换是由操作系统来调度的,这个切换的开销是比较大的。所以我觉得,一般说进程是和并行绑定,线程、协程是和并发绑定。

并发是在同一时间段内执行多个任务,这个时间段内多个任务是交替执行的。并发任务的执行,是通过 CPU 在不同线程间高速切换来实现的。 所谓的高并发是服务器同时处理大量的请求,多线程是解决高并发问题的一种解决方案。但多线程之间通信会有锁的问题、线程安全的问题,所以多线程编程还是比较复杂的。


4. Go routine

Go routine (简称 G)是 Golang 语言中的协程,是最基本的并发执行单元。 G 是由 Go 语言的运行时调度器来调度的,而不是由操作系统来调度的,这样就避免了操作系统调度线程的开销。

在 Golang 之前的理念是,一个线程对应一堆协程,并且这些协程是固定对应于一个线程。 但是,Golang 的理念在于,虽然一个协程对应一个线程,但该协程有可能动态变化到其他的线程上

是否依赖内核是否支持并发
Thread
Co routine
Go routine

Golang 中,可以直接在函数或方法调用前加一个 go 关键字就可以创建一个 Go routine(G)。

go func(){

}()

Go 程序中至少有一个 G,即 main G。CPU 会在操作系统层面上的线程中不断切换,而 G 是由 Go 语言的运行时(Go runtime)调度器来调度的。 Go 运行时会自动将 m 个 G 分配到 n 个线程(Machine)上运行,实现 m:n 的调度机制。


5. GMP

GMP 是 Golang 的运行时调度器的三个核心组件。

  • G:Goroutine,即协程,是 Go 语言并发的基本单位。
  • M:Machine,即内核级线程。
  • P:Processor,即调度器,是 Go 中的调度器,用于调度 G 到 M 上运行。P 可以有多个。
image in light mode

如图所示,GMP 调度模型。GMP 的调度流程大致如下:

Go 中的 P 调度器可以有多个 P,每个 P 都有自己的本地队列,队列中存放 G,默认 P 本地队列最多可以存放 256 个 P。 如果内核线程想要运行任务,就需要和 P 进行关联,从 P 的 local queue 中获取 G。 如果 P 的本地队列中没有 G,就会从全局队列中获取 G,全局队列中的 G 是所有 P 共享的。 当从全局队列中获取 G 时,全局队列会有一把锁机制,即同时只能有一个 P 从全局队列中拿 G。 如果全局队列中也找不到 G 时,就会从其他 P 的本地队列中偷取 G,这个过程叫做「work stealing」。 P 的本地队列中也都会有一把锁机制,当有其他的 P 来窃取自己本地队列的 G 时,就会加锁,防止其他 P 来窃取自己的 G。 最终,拿到可运行的 G 后,M 运行 G,G 执行完毕后,M 会从 P 中获取下一个 G,不断重复下去。

当用 go 关键字创建一个 G 时,G 会被放到 P 的本地队列中,如果当前 P 的本地队列是满的,Go 就会将当前 P 本地队列中的一半 G (128 个 G) 都放入全局队列中。

更多相关的内容,可以参考 Golang GMP模型


6. Race Condition

对于多线程并发编程来说,最大的问题是有关「数据竞争」的问题,也就是我们常说的 Race Condition。由于异步任务的执行是不确定的, 谁先执行完谁后执行都是不确定的,所以就会出现多个线程同时访问同一个共享资源进行读写,导致的数据获取不确定的问题

在 Golang 中,如果有多个 G 同时访问同一个资源,并对这个资源进行修改,就会出现竞争问题。竞争关系使得变量的值变得不可预测, 所以当一个 G 操作一个资源或变量的时候,如果在其他的 G 中也会对这个资源进行操作,就需要用「锁机制」来让这个资源在同一时刻只能被一个 G 访问。 这就是我们常说的 Mutex「互斥锁」。

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup
var count int

func plus() {
	defer wg.Done()

	for i := 0; i < 1000; i++ {
		count++
	}
}

func main() {
	wg.Add(2)

	go plus()
	go plus()

	wg.Wait()
	fmt.Println(count)
}

如上代码所示,两个 G 同时对 count 进行加 1000 操作。如果是正常的逻辑,我们期望的结果是 2000,并且开启 G 来帮我们做这件事情。 如果没有开启 Go routine,则按照串行的方式来执行,结果是 2000。 但由于开启了 Go routine,两个 G 同时对 count 进行加 1000 操作,最终的结果是不确定的,因为 count++ 并不是一件原子操作(Atomic Operation),而是分为三步:

  • 读取 count 的值 temp := count
  • 对 temp 进行加 1 操作 temp++
  • 将 temp 的值赋给 count count = temp

所以,这一个步骤由三步组成,当两个 G 同时对 count 进行操作的时候,有可能读取到相同的初始值,然后对其进行加 1 操作,最会写会相同的结果,这样一些增加的操作就会丢失。

解决 Race Condition 的方法有很多,比如:

  • 互斥锁 Mutex
  • 原子操作 Atomic
  • 通道 Channel

这里先介绍 互斥锁 和 原子操作,Channel 通道在下一节阐述。Golang 中的 Mutex 是通过 sync 包来实现的,Mutex 有两个方法:LockUnlock,分别用来加锁和解锁。 原子操作 Atomic 是通过 sync/atomic 包来实现的,原子操作是一种不可分割的操作,要么全部执行成功,要么全部执行失败(类似于事务)。

方法1: 互斥锁

var lk sync.Mutex

func plus() {
  defer wg.Done()

  for i := 0; i < 1000; i++ {
    lk.Lock()
    count++
    lk.Unlock()
  }
}

方法2: 原子操作

import "sync/atomic"

func plus() {
  defer wg.Done()

  for i := 0; i < 1000; i++ {
    atomic.AddInt32(&count, 1)
  }
}

方法3: 通道 Channel

var wg sync.WaitGroup
var ch = make(chan int, 1)

func plus() {
	defer wg.Done()

	for i := 0; i < 1000; i++ {
		temp := <-ch
		temp++
		ch <- temp
	}
}

func main() {
	wg.Add(2)

  // ch <- 0  // 当channel 无缓冲区时 会死锁,unbuffered channel

	go plus()
	go plus()

	ch <- 0 //  需要先有接收者,再才能往通道中发送数据,否则死锁

	wg.Wait()
	fmt.Println(<-ch)
}

7. Channel

多线程并发编程中,我们可以使用共享内存的方式来进行线程间的通信。 但是共享内存的方式会有很多问题,比如 Race Condition 等问题。 为了解决这些问题,Golang 提供了 Channel 通道来进行线程间的通信。

Golang 的理念是:不要通过共享内存来通信,而是通过通信来共享内存。

Channel 是一种特殊的类型,类似于队列,声明一个 channel 的时候需要指定数据类型,比如 chan int。 Channel 存在的目的就是为了让不同的 G 之间具有通信能力,让 G1 和 G2 之间有数据交互的能力。因为,如果不进行数据交互,单纯的并发编程是没有意义的。

Channel 有两种类型:

  • unbuffered channel
  • buffered channel

无缓冲区的 Channel,是指当前的 channel 没有存储空间。 特点:只有接收方G 需要接受数据的时候,才能在发送方G中发送数据 这意味着,在发送数据的时候,必须有一个对应的 接收者G 接受数据,才能完成发送,否则就会在 发送方G 进行阻塞。 无缓冲区的 channel 可以将通信进行同步化,因此,无缓冲区通道也被称为同步通道

有缓冲区的 Channel,是指允许 channel 中存储一定数量的元素。 这意味着发送方 G 可以发送数据到 channel 中,而不需要等待接收方 G 接收数据,只有当 channel 中的元素数量达到缓冲区的容量时,发送方 G 才会阻塞。 而接收方 G 只有在 channel 中没有元素时,并且接收方 G 还想试图从 channel 中接收数据时,接收方 G 才会阻塞。

关闭 channel 可以使用 close(ch) 来进行关闭。一般都是在 发送方G 中进行 channel 的关闭。 当 channel 关闭后,就不能再继续发送数据了,但还可以接收数据。 v, ok := <-ch 可以从 channel 中读取数据,当 ok == false 时,代表关闭了 channel,此时 v 为 channel 类型的零值。 对一个关闭的channel 还可以读取值,直至通道为空。

一般来说,默认都会进行通道的关闭,这是为了更加安全。比如,如果没有关闭 channel,如果 接收方G 还想从 channel 中读取值的时候, 接收方G 就会永久阻塞,这时就会造成死锁。如果在 发送方G 中关闭了 channel,如果 接收方 G 还读取值的时候,则值为零值。

select 语句可以用来监听 channel 的信号,查看哪一个 channel 有信号,则会执行对应的 case 语句。 如果有多个 case 同时满足时,则会随机选择一个 case 语句执行。

for {
  data1, ok1 := <-ch1
  data2, ok2 := <-ch2
}

select {
case data1, ok1 := <-ch1:
  // do something
case data2, ok2 := <-ch2:
  // do something
}

// 常见的一个操作:
loop: {
  for {
    select {
      case data1, ok1 := <-ch1:
        if !ok1 {
          break loop
        }
        // do something
      case data2, ok2 := <-ch2:
        if !ok2 {
          break loop
        }
        // do something
    }
  }
}

8. Deadlock

多线程并发编程除了会带来 race condition 的问题外,还有可能会带来 deadlock(死锁) 的问题。 死锁在操作系统中是指,多个进程或线程因互相等待对方释放资源而处于永久阻塞状态,无法继续推进下去。

而在 Golang 中,死锁是指 G 被永久阻塞而无法继续推进下去,不管是发送方的G,还是接收方的G,只要是 G 并且被永久阻塞,就会造成死锁。

G 如何会被永久阻塞呢?

个人看法是:发送方G 发送数据到 channel 中,但是没有接收方G 来接收数据,这时发送方G 就会永久阻塞; 接收方G 想要从 channel 中接收数据,但是没有发送方G 来发送数据,这时接收方G 就会永久阻塞。

死锁的一些场景:

  • unbuffered channel 中,发送方G 想要发送数据,但是没有接收方G 来接收数据,则发送方G 就会永久阻塞。(场景1)
  • unbuffered channel 中,接收方G 想要接收数据,但是没有发送方G 来发送数据,则接收方G 就会永久阻塞。(场景2)
  • buffered channel 中,发送方G 想要发送数据,但是 channel 已经满了,并且没有接收方 G 接受数据,则发送方G 就会永久阻塞。(场景3)
  • buffered channel 中,接收方G 想要接收数据,但是 channel 中没有数据,并且没有发送方 G 发送数据,则接收方G 就会永久阻塞。(场景4)
  • nil channel,读取写入数据,都会永久阻塞。(场景5)
  • for range channel,如果 channel 没有关闭,接收方G 会永久阻塞。(场景6)

注意: 在 buffered channel 中,即使没有接收方G 来接收数据,发送方G 也不会永久阻塞,因为 channel 中还有缓冲区(缓冲区足够),可以存储数据。

// 场景1
func main() {
	ch := make(chan int)

  // 只有接收方G 需要接受数据的时候,才能在发送方G中发送数据
	ch <- 1 // main 卡住,死锁

	go func() {
		<-ch
	}()
}

// 场景2
func main() {
  ch := make(chan int)

  go func(){
    ch <- 1
  }()

  v1, ok1 := <-ch
  v2, ok2 := <-ch // 如果没有关闭channel,则没有值可以接收,永久阻塞

  fmt.Println(v1, ok1)
  fmt.Println(v2, ok2)
}

// 场景3
func main() {
	c := make(chan int, 2)
	c <- 1
	c <- 2
	c <- 3
}

// 场景4 类似于 场景2,只是 channel 是有缓冲区的

// 场景5 不解释...

// 场景6
func main() {
	ch := make(chan int, 2)

	wg.Add(1)
	go func() {
		defer wg.Done()
		//defer close(ch) // 如果不关闭channel,接收方G 会永久阻塞
		for i := 0; i < 10; i++ {
			ch <- i
		}
	}()

	for v := range ch {
		fmt.Println(v)
	}

	wg.Wait()
}


9. Context

Context 是 Golang 中的一个标准库,主要功能是在多个 Goroutine 之间传递上下文信息,包括:取消信号、超时控制(截止时间)、key-value 等。 相同的 context 可以运行在不同的 G 中。Context 总结一句:可以在不同的 G 中同步请求特定数据、取消信号以及处理请求截止时间。

当一个 client 向 server 发出请求的时候,假设 server 侧需要 10 秒来返回内容,但当 client 在 2 秒的时候进行中断请求(用户不想要请求结果了),这种情况下, server 还是会处理用户发出的请求。这是我们不愿意看到的,这会造成资源的浪费。所以,当用户切断请求的时候,server 需要知道用户的请求被取消,这个就是 context 最常见的场景, 即在不同的 G 之间同步取消信号。

Context 的数据结构类型如下:

type Context interface {
  Deadline() (deadline time.Time, ok bool) // 返回被取消的时间
  Done() <-chan struct{} // 当 context 被取消或超时,返回一个信号
  Err() error
  Value(key interface{}) interface{}
}

Context 分为两种类型:

  • 根 Context
  • 派生 Context

空 Context 是一个根 Context,它不能被取消,没有截止时间,没有值。context.Background() 返回一个空的 Context,它是所有 Context 树的根节点。

派生 Context 是从一个已有的 Context 中派生出来的,派生 Context 会继承父 Context 的截止时间、取消信号等信息。所以,当父 Context 取消时,子 Context 也会取消。 派生出 Context 的方法有:

1. ctx, cancel := context.WithCancel(context.Background())
2. ctx, cancel := context.WithDeadline(
      context.Background(),
      time.Now().Add(5*time.Second), // 跟一个具体的时间点
  )
3. ctx, cancel := context.WithTimeout(
      context.Background(),
      5*time.Second  // 跟一个时间段
  )

4. ctx := context.WithValue(ctx, "key", "value") // 传递共享数据

超时控制的场景:

func doSth(ctx context.Context) {
  select {
    case <-ctx.Done(): // 当 ctx 被取消或超时,返回一个只读 channel
      err := ctx.Err()
      if errors.Is(err, context.Canceled) {
        fmt.Println("context canceled")
      } else if errors.Is(err, context.DeadlineExceeded){
        fmt.Println("context deadline exceeded")
      }
    // 当过去 3s 后,会向这个 channel 发送一个当前时间
    // 监听到时间,说明已经过去 3s
    case <-time.After(3 * time.Second): // 3s 后返回一个只读 channel
      fmt.Println("doing sth")
  }
}

func main() {
  ctx := context.Background()
  ctx, cancel := context.WithCancel(ctx)

  go func() {
    // time.Sleep(5 * time.Second)
    time.Sleep(2 * time.Second)
    cancel()
  }()

  doSth(ctx)
}

上述超时控制的场景中,当 time.Sleep(2 * time.Second) 时,会在 2s 后向 ctx.Done() 发送一个信号,此时 ctx.Done() 会被 select 监听到,从而执行对应的取消信号 case。 当 time.Sleep(5 * time.Second) 时,会在 5s 后向 ctx.Done() 发送一个信号,但由于在 3s 时,select 会监听到 time.After(3 * time.Second) 返回 channel 的信号,所以会执行对应的 case 语句。


Wrapping Up

填坑「Go 与 TS 语法差异」 一文中,最后提到会写 Golang 并发编程的文章 ...

已填坑 😀

[本文谢绝一切转载,谢谢]

Lesenelir