go Channel原理 (三)

news/2024/7/8 2:04:59 标签: golang, 开发语言, 后端

Channel

设计原理

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

在主流编程语言中,多个线程传递数据的方式一般都是共享内存。
在这里插入图片描述
Go 可以使用共享内存加互斥锁进行通信,同时也提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。在这里插入图片描述
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。

接收数据

两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。这是一个 生产者 - 消费者 模型,负责接收数据的 goroutine 从 channel 读取一个消息进行消费,channel 起到一个临界区/缓冲区的作用。

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // 省略 debug 内容 …………

   // 如果是一个 nil 的 channel
   if c == nil {
      // 如果不阻塞,直接返回 (false, false)
      if !block {
         return
      }
      // 否则,接收一个 nil 的 channel,goroutine 挂起
      gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
      // 不会执行到这里
      throw("unreachable")
   }

   // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
   // 当我们观察到 channel 没准备好接收:
   // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
   // 2. 缓冲型,但 buf 里没有元素
   // 之后,又观察到 closed == 0,即 channel 未关闭。
   // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
   // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
   if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
      c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
      atomic.Load(&c.closed) == 0 {
      return
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 加锁
   lock(&c.lock)

   // channel 已关闭,并且循环数组 buf 里没有元素
   // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
   // 也就是说即使是关闭状态,但在缓冲型的 channel,
   // buf 里有元素的情况下还能接收到元素
   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(unsafe.Pointer(c))
      }
      // 解锁
      unlock(&c.lock)
      if ep != nil {
         // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
         // 那么接收的值将是一个该类型的零值
         // typedmemclr 根据类型清理相应地址的内存
         typedmemclr(c.elemtype, ep)
      }
      // 从一个已关闭的 channel 接收,selected 会返回true
      return true, false
   }

   // 等待发送队列里有 goroutine 存在,说明 buf 是满的
   // 1. 非缓冲型的 channel。直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
   // 2. 缓冲型的 channel,但 buf 满了。接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

   // 缓冲型,buf 里有元素,可以正常接收
   if c.qcount > 0 {
      // 直接从循环数组里找到要接收的元素
      qp := chanbuf(c, c.recvx)

      // …………

      // 没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // 清理掉循环数组里相应位置的值
      typedmemclr(c.elemtype, qp)
      // 接收游标向前移动
      c.recvx++
      // 接收游标归零
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      // buf 数组里的元素个数减 1
      c.qcount--
      // 解锁
      unlock(&c.lock)
      return true, true
   }

   if !block {
      // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
      unlock(&c.lock)
      return false, false
   }
   
   // 构造一个 sudog 并设置相应参数
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }

   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.selectdone = nil
   mysg.c = c
   gp.param = nil
   // 进入 channel 的等待接收队列
   c.recvq.enqueue(mysg)
   // 将当前 goroutine 挂起
   goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

   // 被唤醒了,接着从这里继续执行一些扫尾工作
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   // 释放当前 gorountine 的 sudog
   releaseSudog(mysg)
   return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是非缓冲型的 channel
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      
      // 未忽略接收的数据 不是 "<- ch",而是 "val <- ch",ep 指向 val
      if ep != nil {
         // 直接拷贝数据,从 sender goroutine -> receiver goroutine
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // 缓冲型的 channel,但 buf 已满。
      // 1. 循环数组 buf 队首的元素拷贝到接收数据的地址
      // 2. 将 sender 的数据入队。
      qp := chanbuf(c, c.recvx)
      // …………
      // 将 recvx 处的数据拷贝给接收者
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }

      // sender data -> buf
      typedmemmove(c.elemtype, qp, sg.elem)
      // 更新索引
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx
   }
   sg.elem = nil
   gp := sg.g

   // 解锁
   unlockf()
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }

   // 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
   goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

从 channel 接收消息 的 核心函数是 chanrecv

跟 send 流程差不多:
特殊情况:

  1. 如果 channel 为空,那么会直接调用 runtime.gopark 挂起当前 goroutine。
  2. 如果 channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回零值。
    正常情况:
  3. 如果 channel 的 sendq 队列中存在挂起的 goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 goroutine 的数据拷贝到缓冲区。
  4. 如果 channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据。
  5. 在默认情况下会挂起当前的 goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒。
    从 channel 接收数据时,会触发 goroutine 调度的两个时机:
  6. 当 channel 为空时。
  7. 当缓冲区中不存在数据并且也不存在数据的发送者时。

http://www.niftyadmin.cn/n/5535902.html

相关文章

【你真的了解double和float吗】

&#x1f308;个人主页&#xff1a;努力学编程’ ⛅个人推荐&#xff1a;基于java提供的ArrayList实现的扑克牌游戏 |C贪吃蛇详解 ⚡学好数据结构&#xff0c;刷题刻不容缓&#xff1a;点击一起刷题 &#x1f319;心灵鸡汤&#xff1a;总有人要赢&#xff0c;为什么不能是我呢 …

C++ :lambda表达式

目录 lambda表达式书写格式&#xff1a; lambda表达式各部分说明&#xff1a; lambda的使用示范&#xff1a; 注意事项&#xff1a; 返回值类型可以省略&#xff0c;参数也可也省略&#xff1a; sort内部也可以直接写lambda表达式&#xff1a; 排序时利用lambda进行排序…

VPN是什么?

VPN&#xff0c;全称Virtual Private Network&#xff0c;即“虚拟私人网络”&#xff0c;是一种在公共网络&#xff08;如互联网&#xff09;上建立加密、安全的连接通道的技术。简单来说&#xff0c;VPN就像是一条在公共道路上铺设的“秘密隧道”&#xff0c;通过这条隧道传输…

通过SimU-Net进行同时深度学习体素分类的纵向CECT扫描肝病灶变化分析| 文献速递-深度学习自动化疾病检查

Title 题目 Liver lesion changes analysis in longitudinal CECT scans by simultaneous deep learning voxel classification with SimU-Net 通过SimU-Net进行同时深度学习体素分类的纵向CECT扫描肝病灶变化分析 01 文献速递介绍 影像学随访是对影像学研究的解读&#x…

VBA打开其他Excel文件

前言 本节会介绍通过VBA实现打开其他excel文件&#xff0c;包括模糊匹配文件名称、循环同时打开多个文件&#xff0c;并获取工作表及工作簿进行数据操作后&#xff0c;对打开的文件进行保存并关闭操作。 一、打开固定文件名称的文件 场景说明&#xff1a; 1.新建一个宏文件VBA…

【MindSpore学习打卡】应用实践-计算机视觉-SSD目标检测:从理论到实现

在计算机视觉领域&#xff0c;目标检测是一个至关重要的任务。它不仅要求识别图像中的目标物体&#xff0c;还需要精确定位这些物体的位置。近年来&#xff0c;随着深度学习技术的飞速发展&#xff0c;各种高效的目标检测算法层出不穷。SSD&#xff08;Single Shot MultiBox De…

[leetcode]minimum-absolute-difference-in-bst 二叉搜索树的最小绝对差

. - 力扣&#xff08;LeetCode&#xff09; /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(null…

音乐发行平台无加密开源源码

适用于唱片公司&#xff0c;用于接收物料&#xff0c;下载物料功能&#xff1a;个人或机构认证&#xff0c;上传专辑和歌曲&#xff0c;版税结算环境要求php7.4Nginx 1、导入数据库 2、/inc/conn.php里填写数据库密码等后台路径/admin&#xff08;可自行修改任意入口名称&…