我将把这篇文章分为两部分,在第一部分我将尝试解释管道的基本构建块,在第二篇文章中我将尝试围绕这个设计构建一个通用库。
我最近在阅读Concurrency In Go时遇到了一种叫做管道处理模式的东西。想法是您可以将逻辑功能分解为多个阶段。每个阶段都进行自己的处理,并将输出传递到下一个阶段进行处理。您可以修改彼此独立的阶段,限制阶段的速率等等。
管道基础知识
来看下这两个函数:
func Multiply(value ,multiplier int) int {
return value*multiplier
}
func Add(value,additive int) int {
return value+additive
}
这两个函数很简单,只是对数字进行运算并返回它。您可以将这些视为管道的“阶段”。为了完成管道,我们可以将两个阶段结合起来。
ints := []int{1,2,3,4}
for _,v := range ints {
fmt.Println(multiply(add(multiply(v,2),1),2))
}
在这里可以看到 stage 处理输入,处理后返回相同的类型以供进一步的 stage 处理。
并发管道
这个简单的模型现在可以扩展到利用 go 的通道和 goroutines 来同时执行阶段的处理。在我们这样做之前,我们的管道中必须有以下实体。
Generator,它将负责生成管道处理的输入。这是管道的第一阶段。
阶段,可以在其中执行实际处理。
Canceller,一种通过管道发出取消或处理结束信号的机制。
让我们从生成器开始
func generator(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done: return
case intStream <- i:
}
}
}()
return intStream
}
这个函数将生成一个 goroutine,它将在一个通道上生成数值,并且该函数只是简单地返回那个通道。在此通道上生成的值用作进一步阶段的输入。我们还在函数中传递了 done 通道来退出生成流程,这也称为毒丸模式(Poison Pill Pattern)
扩展相同的想法,我们可以重写乘法和加法函数来并发处理。
func multiply(done <-chan interface{}, intStream <-chan int, multiplier int) chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done: return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
func add(done <-chan interface{}, intStream <-chan int, adder int) chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:return
case addedStream <- i * adder:
}
}
}()
return addedStream
}
旁注:也许我们可以重写这些函数来接受接口或函数,而不是编写两次共享大部分代码的函数,但我试图在这里保持简单。
为了构建管道,我们可以将管道的这些阶段组合为如下形式:
done := make(chan interface{})
intStream := generator(done, 1, 2, 3, 4)
pipeline := add(done, multiply(done, intStream, 2), 5)
for i := range pipeline {
fmt.Println(i)
}
这段代码将在不同的管道中同时运行阶段,通过 go 通道将它们的结果传递到下一个阶段。我们可以通过阅读最后阶段的频道来获得最终结果。
更实际的用例
好的,但是更实用的东西怎么样?我们可以使用这种设计实现一个shopify的爬虫。目的是抓取shopify页面并抓取有用的信息。
这是我们可以遵循的基本设计算法。
转到应用程序目录页面,例如https://apps.shopify.com/browse/all?page=1
从每个应用程序目录页面抓取不同应用程序的 URL。
访问每个应用 URL 抓取应用信息并填充我们的结构。
遍历分页以转到下一个应用程序目录页面并重复。
在管道方面,我们需要构建这样的东西
让我们定义 App 结构
type App struct {
Rating float64
Review int
Name string
Category []string
AppLink string
PageLink string
}
我没有编写实际的抓取代码来保持简短和简单,但是可以轻松增加抓取逻辑而无需更改代码结构。
type ShopifyScraper struct {}
// Generation Stage, This stage will generate app directory URLs.
func (s *ShopifyScraper) Generate(done <-chan bool) chan App {
links := make(chan App)
go func() {
defer close(links)
for i := 0; i < 5; i++ {
select {
case <-done: return
case links <- App{PageLink: fmt.Sprintf("https://apps.shopify.com/browse/all?page=%d", i)}:
}
}
}()
return links
}
// App Directory Scraper Stage, This stage will visit directory page and scrape different app URLs from the page for further processing.
func (s *ShopifyScraper) AppDirectoryScraper(done <-chan bool, input <-chan App) chan App {
apps := make(chan App, 5)
go func() {
defer close(apps)
for app := range input {
// Extract Different App URLs by scraping using app.PageLink
select {
case <-done: return
case apps <- app:
}
}
}()
return apps
}
// App Scraper Stage, This stage will visit app page and scrape app information
func (s *ShopifyScraper) AppScraper(done <-chan bool, input <-chan App) chan App {
apps := make(chan App)
go func() {
defer close(apps)
for app := range input {
// Extract Different App Information by scraping by using app.AppLink
select {
case <-done: return
case apps <- app:
}
}
}()
return apps
}
最后,将各个阶段连接在一起以形成管道。
func (s *ShopifyScraper) Do() []AppInfo {
var result []AppInfo
done := make(chan bool)
defer close(done)
for i := range s.AppScraper(done, s.AppDirectoryScraper(done, s.Generate(done))) {
result = append(result, i)
}
return result
}
在下一篇文章中,我们将尝试将管道构建为一个库,它可以处理 (1) 管道排序 (2) 在多个 goroutine 中同时执行同一阶段 (3) 取消 (4) 可能的错误处理/重试