news 2026/5/1 9:42:30

go.dev博客阅读-pipelines

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
go.dev博客阅读-pipelines

这篇文章 2014年3月13日发表,作者 Sameer Ajmani

通过灵活的运用chan类型,在 Go 中更高效的处理数据,这里应用领域为健壮高效的流式数据处理,并在安全性问题上做了补充,例如程序异常、内存泄漏、Gc释放等

一些开源类库也沿用了其思想,例如MapReduces、并行处理等

这篇博客要以MapReduces或者生产消费模型的思想去阅读

博客开头的示例

一个比较基础的管道使用

将一组整数通过管道依次平方,最终输出结果

// 将要计算平方的数字,依次添加到chan中,并返回该只允许读的chan// 注意:该chan是无缓冲的,gen函数运行完后,内部的goroutine会依然运行,直到处理完毕funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}// 从传入的只读chan中读取数据,计算平方,再返回chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}funcTestExample(t*testing.T){// chan数据传输:gen → sq → sq → 打印forn:=rangesq(sq(gen(2,3))){t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:39: 16 /Users/www/zero-core/mr/mr_test.go:39: 81

过程中的一些说明

  1. gensq方法中分别创建了各自的 chan 变量,用于写入数据,并返回
  2. 声明 chan 类型后,要养成 close 的习惯,close 后依然可以读,有减缓 Gc 压力
  3. sq(sq(gen(2, 3)))中,三个方法,通过传入 chan 参数实现数据流转,sq方法调用了两次
  4. gensq方法中的 chan 均为无缓冲通道,互相调用时为阻塞模型,也就意味着同一时刻只可能会有一段程序在执行(无论几核)

这里就是使用 chan 类型,实现了一个简陋的 MapReduces 过程

并行处理

官方着重提到的是并行,但至于是否多核并行还是依赖于并发实现

依旧是求平方的案例

// 原始数据无阻塞写入 chan, 注意,这里返回的时候有缓冲的 chanfuncgen(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}// 读取传入的 chan, 并计算平方, 写入 chanfuncsq(in<-chanint)<-chanint{out:=make(chanint)gofunc(){forn:=rangein{out<-n*n}close(out)}()returnout}// 将传入的n个 chan ,用 n 个 goroutine 读取, 并将其写入到 out chan 中funcmerge(cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)// 读取传入的 chan, 并将其写入到 out chan 中output:=func(c<-chanint){deferwg.Done()forn:=rangec{out<-n}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){// 将 2, 3, 4, 9 写入有缓冲的 chan,返回的 chan 用 2 个 sq 方法去接收(2个消费者)in:=gen(2,3,4,9)c1:=sq(in)c2:=sq(in)forn:=rangemerge(c1,c2){// 输出 4 9 81 16(顺序不定)t.Log(n)}}

输出

/Users/www/zero-core/mr/mr_test.go:68: 4 /Users/www/zero-core/mr/mr_test.go:68: 9 /Users/www/zero-core/mr/mr_test.go:68: 81

说明:

  1. c1、c2 相当于2个消费任务去执行,通过内部创建的 goroutinue 去模型多线程多核并行
  2. merge 方法将多个传入的 chan 输出,合并到一个 chan,保证 Reduces 阶段只会有1个输出出口
  3. ❌这里面有个不严谨漏洞,当取数据不是采用 range 方式或者 chan 数据没有取完, chan 的发送方就会阻塞

带取消功能的 chan

并行处理的代码改进,在每个方法中都引入done

funcgen(done<-chanstruct{},nums...int)<-chanint{out:=make(chanint)gofunc(){deferclose(out)for_,n:=rangenums{select{caseout<-n:case<-done:return}}}()returnout}funcsq(done<-chanstruct{},in<-chanint)<-chanint{out:=make(chanint)gofunc(){deferclose(out)forn:=rangein{select{caseout<-n*n:case<-done:return}}}()returnout}funcmerge(done<-chanstruct{},cs...<-chanint)<-chanint{varwg sync.WaitGroup out:=make(chanint)output:=func(c<-chanint){deferwg.Done()forn:=rangec{select{caseout<-n:case<-done:return}}}wg.Add(len(cs))for_,c:=rangecs{gooutput(c)}gofunc(){wg.Wait()close(out)}()returnout}funcTestExample(t*testing.T){done:=make(chanstruct{})deferclose(done)// 保证所有 goroutine 收到取消信号in:=gen(done,2,3,4,9)c1:=sq(done,in)c2:=sq(done,in)out:=merge(done,c1,c2)// 只消费2个值就退出t.Log(<-out)t.Log(<-out)// 此时 done 被 defer 关闭,所有 goroutine 安全退出}
  1. 在每个方法中,都加入了done,内部使用select来监听是否关闭,并return 释放协程
  2. 如果chan没有取完,通过 close 通知 done 的方式,保证不会存在僵尸协程泄漏

但,这个案例还有改进的一步,比如,chan 中有3个值,现在只取了1个就进行了 close 关闭,chan 随是释放了,但内部剩余的2个值可能会发生逃逸现象,等待系统 Gc 释放

如追求性能,一种写法是 close 后,通过手动读取释放,来减缓 Gc 的压力

// 不仅仅 close 还空读取deferfunc(){close(done)forrangeout{}}()

额外注意的点

在多任务消费读取生产数据时

funcgen(nums...int)<-chanint{out:=make(chanint)gofunc(){for_,n:=rangenums{out<-n}close(out)}()returnout}funcgen2(nums...int)<-chanint{out:=make(chanint,len(nums))for_,n:=rangenums{out<-n}close(out)returnout}

这两种方式实现过程结果一样,不同之处在于将生产数据变快,还是读取速度变快

gen循序渐进的放入生产计划中,gen2是一口气家在到生产计划中,具体采用哪种适业务而定

🧠🧠🧠🧠

对官方这篇博客,我的理解是

  1. 每个使用了 chan 的地方,应在适当的时候关闭且释放掉
  2. 每个使用了 chan 的地方应持续从输入 channel 读取,直到关闭或收到取消信号,而不是一口气读一口气写
  3. 不要完全依赖有缓冲的 chan 的 size 解决阻塞问题,缓冲的大小是一个容错作用
  4. 使用关闭的 channel 作为广播取消信号,通知所有上游 goroutine 停止工作。
  5. 使用 WaitGroup 时,务必确保所有任务完成后再关闭输出 channel,先 wait,再 close

原文出处 https://go.dev/blog/pipelines

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 6:57:25

如何用AI快速搭建Google镜像站?3步搞定

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 请生成一个Google镜像网站的完整项目代码&#xff0c;要求&#xff1a;1. 使用Python Flask框架作为后端 2. 前端界面简洁美观&#xff0c;包含搜索框和Logo 3. 实现搜索请求转发功…

作者头像 李华
网站建设 2026/5/1 6:57:26

IDR逆向工程工具:快速掌握Delphi程序分析的终极指南

IDR逆向工程工具&#xff1a;快速掌握Delphi程序分析的终极指南 【免费下载链接】IDR Interactive Delphi Reconstructor 项目地址: https://gitcode.com/gh_mirrors/id/IDR IDR&#xff08;Interactive Delphi Reconstructor&#xff09;作为专业的Delphi逆向工具&…

作者头像 李华
网站建设 2026/4/20 19:37:20

电商系统中的MyBatis Collection实战:订单与商品管理

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个电商订单管理系统的核心模块&#xff0c;要求&#xff1a;1. 实现订单(Order)和订单项(OrderItem)的一对多关系 2. 使用MyBatis collection处理订单详情查询 3. 包含分页查…

作者头像 李华
网站建设 2026/4/30 9:16:54

HACS组件安装VerificationMaterials错误:从排查到修复的完整流程

HACS组件安装VerificationMaterials错误&#xff1a;从排查到修复的完整流程 【免费下载链接】integration HACS gives you a powerful UI to handle downloads of all your custom needs. 项目地址: https://gitcode.com/gh_mirrors/in/integration 当你在Home Assista…

作者头像 李华