Go 流水线模式

我将把这篇文章分为两部分,在第一部分我将尝试解释管道的基本构建块,在第二篇文章中我将尝试围绕这个设计构建一个通用库。

我最近在阅读Concurrency In Go时遇到了一种叫做管道处理模式的东西。想法是您可以将逻辑功能分解为多个阶段。每个阶段都进行自己的处理,并将输出传递到下一个阶段进行处理。您可以修改彼此独立的阶段,限制阶段的速率等等。

管道基础知识

来看下这两个函数:

func Multiply(value ,multiplier int) int { 
	return value*multiplier
}
func Add(value,additive int) int { 
	return value+additive
}

这两个函数很简单,只是对数字进行运算并返回它。您可以将这些视为管道的“阶段”。为了完成管道,我们可以将两个阶段结合起来。

ints := []int{1,2,3,4}

for _,v := range ints {
	fmt.Println(multiply(add(multiply(v,2),1),2))
}

在这里可以看到 stage 处理输入,处理后返回相同的类型以供进一步的 stage 处理。

并发管道

这个简单的模型现在可以扩展到利用 go 的通道和 goroutines 来同时执行阶段的处理。在我们这样做之前,我们的管道中必须有以下实体。

  • Generator,它将负责生成管道处理的输入。这是管道的第一阶段。

  • 阶段,可以在其中执行实际处理。

  • Canceller,一种通过管道发出取消或处理结束信号的机制。

让我们从生成器开始

func generator(done <-chan interface{}, integers ...int) <-chan int {
	intStream := make(chan int)
	go func() {
		defer close(intStream)
		for _, i := range integers {
			select {
			case <-done: return
			case intStream <- i:
			}
		}

	}()
	return intStream
}

这个函数将生成一个 goroutine,它将在一个通道上生成数值,并且该函数只是简单地返回那个通道。在此通道上生成的值用作进一步阶段的输入。我们还在函数中传递了 done 通道来退出生成流程,这也称为毒丸模式(Poison Pill Pattern)

扩展相同的想法,我们可以重写乘法和加法函数来并发处理。

func multiply(done <-chan interface{}, intStream <-chan int, multiplier int) chan int {
	multipliedStream := make(chan int)
	go func() {
		defer close(multipliedStream)
		for i := range intStream {
			select {
			case <-done: return
			case multipliedStream <- i * multiplier:
			}
		}
	}()
	return multipliedStream
}

func add(done <-chan interface{}, intStream <-chan int, adder int) chan int {
	addedStream := make(chan int)
	go func() {
		defer close(addedStream)
		for i := range intStream {
			select {
			case <-done:return
			case addedStream <- i * adder:
			}
		}
	}()
	return addedStream
}

旁注:也许我们可以重写这些函数来接受接口或函数,而不是编写两次共享大部分代码的函数,但我试图在这里保持简单。

为了构建管道,我们可以将管道的这些阶段组合为如下形式:

done := make(chan interface{})
intStream := generator(done, 1, 2, 3, 4)

pipeline := add(done, multiply(done, intStream, 2), 5)

for i := range pipeline {
    fmt.Println(i)
}

这段代码将在不同的管道中同时运行阶段,通过 go 通道将它们的结果传递到下一个阶段。我们可以通过阅读最后阶段的频道来获得最终结果。

更实际的用例

好的,但是更实用的东西怎么样?我们可以使用这种设计实现一个shopify的爬虫。目的是抓取shopify页面并抓取有用的信息。

这是我们可以遵循的基本设计算法。

转到应用程序目录页面,例如https://apps.shopify.com/browse/all?page=1

从每个应用程序目录页面抓取不同应用程序的 URL。

访问每个应用 URL 抓取应用信息并填充我们的结构。

遍历分页以转到下一个应用程序目录页面并重复。

在管道方面,我们需要构建这样的东西

让我们定义 App 结构

type App struct {
    Rating     float64
    Review     int
    Name       string
    Category   []string
    AppLink    string
    PageLink   string
}

我没有编写实际的抓取代码来保持简短和简单,但是可以轻松增加抓取逻辑而无需更改代码结构。


type ShopifyScraper struct {}



// Generation Stage, This stage will generate app directory URLs.
func (s *ShopifyScraper) Generate(done <-chan bool) chan App {

	links := make(chan App)
	go func() {
		defer close(links)
		for i := 0; i < 5; i++ {
			select {
			case <-done: return
			case links <- App{PageLink: fmt.Sprintf("https://apps.shopify.com/browse/all?page=%d", i)}:
			}
		}
	}()
	return links
}

// App Directory Scraper Stage, This stage will visit directory page and scrape different app URLs from the page for further processing.
func (s *ShopifyScraper) AppDirectoryScraper(done <-chan bool, input <-chan App) chan App {

	apps := make(chan App, 5)
	go func() {
		defer close(apps)
		for app := range input {

			// Extract Different App URLs by scraping  using app.PageLink

			select {
			case <-done: return
			case apps <- app:
			}
		}
	}()
	return apps
}

// App Scraper Stage, This stage will visit app page and scrape app information
func (s *ShopifyScraper) AppScraper(done <-chan bool, input <-chan App) chan App {
	apps := make(chan App)
	go func() {
		defer close(apps)
		for app := range input {
			
			// Extract Different App Information by scraping by using app.AppLink
			
			select {
			case <-done: return
			case apps <- app:

			}
		}
	}()
	return apps
}

最后,将各个阶段连接在一起以形成管道。

func (s *ShopifyScraper) Do() []AppInfo {

	var result []AppInfo
	done := make(chan bool)
	defer close(done)

	for i := range s.AppScraper(done, s.AppDirectoryScraper(done, s.Generate(done))) {
		result = append(result, i)
	}
	return result
}

在下一篇文章中,我们将尝试将管道构建为一个库,它可以处理 (1) 管道排序 (2) 在多个 goroutine 中同时执行同一阶段 (3) 取消 (4) 可能的错误处理/重试

原文链接

Pipeline Pattern in Go Part 1