Go Concurrency Patterns: Pipelines and cancellation

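This article is translated from the official Go blog post Go Concurrency Patterns: Pipelines and cancellation. It explains how to compose several channels into a pipeline that runs tasks concurrently, and how to make sure resources are released.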

Introduction

Go’s concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs.

This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly.

What is a pipeline?

There’s no formal definition of a pipeline in Go; it’s just one of many kinds of concurrent programs.

Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

In each stage, the goroutines:

  • receive values from upstream via inbound channels
  • perform some function on that data, usually producing new values
  • send values downstream via outbound channels

Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively.

The first stage is sometimes called the source or producer; the last stage, the sink or consumer.

We’ll begin with a simple example pipeline to explain the ideas and techniques.

Later, we’ll present a more realistic example.

Squaring numbers

Consider a pipeline with three stages.

The first stage, gen, is a function that converts a list of integers to a channel that emits the integers in the list.

The gen function starts a goroutine that sends the integers on the channel and closes the channel when all the values have been sent:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

The second stage, sq, receives integers from a channel and returns a channel that emits the square of each received integer.

After the inbound channel is closed and this stage has sent all the values downstream, it closes the outbound channel:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

The main function sets up the pipeline and runs the final stage: it receives values from the second stage and prints each one, until the channel is closed:

func main() {
    // Set up the pipeline.
    // Translator's note: at this point you can guess for yourself what "pipeline" means in Go.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

Since sq has the same type for its inbound and outbound channels, we can compose it any number of times.

We can also rewrite main as a range loop, like the other stages:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

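Translator's note:
Although the example above uses goroutines and channels, each stage runs in a single goroutine, so every stage still handles one number at a time.
The examples that follow add parallelism step by step.
The next section introduces fan-out and fan-in. If these terms are unfamiliar,
see the Wikipedia explanation of fan-in and fan-out, and pay particular attention to the diagrams there, which help with visualizing the idea.
End of translator's note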

Fan-out, fan-in

Multiple functions can read from the same channel until that channel is closed; this is called fan-out.

This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed.

This is called fan-in.

We can change our pipeline to run two instances of sq, each reading from the same input channel.

We introduce a new function, merge, to fan in the results:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    // Translator's note: the two sq goroutines no longer take turns. Each value
    // read from in goes to whichever goroutine is ready, which is the fan-out
    // described above.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4 (translator's note: the order is not deterministic)
    }
}

The merge function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel.

Once all the output goroutines have been started, merge starts one more goroutine to close the outbound channel after all sends on that channel are done.

Sends on a closed channel panic, so it’s important to ensure all sends are done before calling close.

The sync.WaitGroup type provides a simple way to arrange this synchronization:

func merge(cs ...<-chan int) <-chan int { // translator's note: merge is variadic, so it accepts any number of channels
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are done.
    // This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Stopping short

There is a pattern to our pipeline functions:

  • stages close their outbound channels when all the send operations are done.
  • stages keep receiving values from inbound channels until those channels are closed.

This pattern allows each receiving stage to be written as a range loop and ensures that all goroutines exit once all values have been successfully sent downstream.

But in real pipelines, stages don’t always receive all the inbound values.

Sometimes this is by design: the receiver may only need a subset of values to make progress.

More often, a stage exits early because an inbound value represents an error in an earlier stage.

In either case the receiver should not have to wait for the remaining values to arrive, and we want earlier stages to stop producing values that later stages don’t need.

In our example pipeline, if a stage fails to consume all the inbound values, the goroutines attempting to send those values will block indefinitely:

func main() {
    // Same setup as the previous main.
    in := gen(2, 3)
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.

    // Translator's note: this code shows the flaw in merge. If two channels
    // carrying two values are merged but only one value is received, the
    // other value can never be sent, and the goroutine holding it leaks.
}

This is a resource leak: goroutines consume memory and runtime resources, and heap references in goroutine stacks keep data from being garbage collected.

Goroutines are not garbage collected; they must exit on their own.

We need to arrange for the upstream stages of our pipeline to exit even when the downstream stages fail to receive all the inbound values.

One way to do this is to change the outbound channels to have a buffer.

A buffer can hold a fixed number of values; send operations complete immediately if there’s room in the buffer:

c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1

When the number of values to be sent is known at channel creation time, a buffer can simplify the code.

For example, we can rewrite gen to copy the list of integers into a buffered channel and avoid creating a new goroutine:

func gen(nums ...int) <-chan int { // translator's note: the return type is the receive-only <-chan int
    out := make(chan int, len(nums)) // translator's note: out itself is a bidirectional buffered channel
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

Returning to the blocked goroutines in our pipeline, we might consider adding a buffer to the outbound channel returned by merge:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...

While this fixes the blocked goroutine in this program, this is bad code.

The choice of buffer size of 1 here depends on knowing the number of values merge will receive and the number of values downstream stages will consume.

This is fragile: if we pass an additional value to gen, or if the downstream stage reads any fewer values, we will again have blocked goroutines.

Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input.

Explicit cancellation

When main decides to exit without receiving all the values from out, it must tell the goroutines in the upstream stages to abandon the values they’re trying to send.

It does so by sending values on a channel called done.

It sends two values since there are potentially two blocked senders:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

The sending goroutines replace their send operation with a select statement that proceeds either when the send on out happens or when they receive a value from done.

The value type of done is the empty struct because the value doesn’t matter: it is the receive event that indicates the send on out should be abandoned.

The output goroutines continue looping on their inbound channel, c, so the upstream stages are not blocked.

(We’ll discuss in a moment how to allow this loop to return early.)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

This approach has a problem: each downstream receiver needs to know the number of potentially blocked upstream senders and arrange to signal those senders on early return.

Keeping track of these counts is tedious and error-prone.

We need a way to tell an unknown and unbounded number of goroutines to stop sending their values downstream.

In Go, we can do this by closing a channel, because a receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value.

This means that main can unblock all the senders simply by closing the done channel.

This close is effectively a broadcast signal to the senders.

We extend each of our pipeline functions to accept done as a parameter and arrange for the close to happen via a defer statement, so that all return paths from main will signal the pipeline stages to exit.

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}

Each of our pipeline stages is now free to return as soon as done is closed.

The output routine in merge can return without draining its inbound channel, since it knows the upstream sender, sq, will stop attempting to send when done is closed. output ensures wg.Done is called on all return paths via a defer statement:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

Similarly, sq can return as soon as done is closed. sq ensures its out channel is closed on all return paths via a defer statement:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

Here are the guidelines for pipeline construction:

  • stages close their outbound channels when all the send operations are done.
  • stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.

Pipelines unblock senders either by ensuring there’s enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel.

Digesting a tree

Let’s consider a more realistic pipeline.

MD5 is a message-digest algorithm that’s useful as a file checksum.

The command line utility md5sum prints digest values for a list of files.

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go

The main function of our program invokes a helper function MD5All, which returns a map from path name to digest value, then sorts and prints the results:

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

The MD5All function is the focus of our discussion.

In serial.go, the implementation uses no concurrency and simply reads and sums each file as it walks the tree.

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}
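
The digest type used above, [md5.Size]byte, is a fixed-size array rather than a slice, and the %x verb prints it as the familiar hex string. A minimal sketch on in-memory data follows; hexDigest is a hypothetical helper, not part of the article.

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// hexDigest is a hypothetical helper: it returns the MD5 sum of data
// formatted as lowercase hex, the way md5sum prints it.
func hexDigest(data []byte) string {
	sum := md5.Sum(data) // sum has type [md5.Size]byte, i.e. [16]byte
	return fmt.Sprintf("%x", sum)
}

func main() {
	fmt.Println(hexDigest([]byte("hello"))) // 5d41402abc4b2a76b9719d911017c592
}
```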

Parallel digestion

In parallel.go, we split MD5All into a two-stage pipeline.

The first stage, sumFiles, walks the tree, digests each file in a new goroutine, and sends the results on a channel with value type result:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles returns two channels: one for the results and another for the error returned by filepath.Walk.

The walk function starts a new goroutine to process each regular file, then checks done.

If done is closed, the walk stops immediately:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5All receives the digest values from c.

MD5All returns early on error, closing done via a defer:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Bounded parallelism

The MD5All implementation in parallel.go starts a new goroutine for each file.

In a directory with many large files, this may allocate more memory than is available on the machine.

We can limit these allocations by bounding the number of files read in parallel.

In bounded.go, we do this by creating a fixed number of goroutines for reading files.

Our pipeline now has three stages: walk the tree, read and digest the files, and collect the digests.

The first stage, walkFiles, emits the paths of regular files in the tree:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

The middle stage starts a fixed number of digester goroutines that receive file names from paths and send results on channel c:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

Unlike our previous examples, digester does not close its output channel, as multiple goroutines are sending on a shared channel.

Instead, code in MD5All arranges for the channel to be closed when all the digesters are done:

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

We could instead have each digester create and return its own output channel, but then we would need additional goroutines to fan-in the results.

The final stage receives all the results from c then checks the error from errc.

This check cannot happen any earlier, since before this point, walkFiles may block sending values downstream:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Conclusion

This article has presented techniques for constructing streaming data pipelines in Go.

Dealing with failures in such pipelines is tricky, since each stage in the pipeline may block attempting to send values downstream, and the downstream stages may no longer care about the incoming data.

We showed how closing a channel can broadcast a “done” signal to all the goroutines started by a pipeline and defined guidelines for constructing pipelines correctly.

Further reading:

  • Author: 阿彬~
  • Link: https://iweixubin.github.io/posts/go/pipelines/
  • License: unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when reposting.