-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessor.go
More file actions
146 lines (137 loc) · 3.53 KB
/
processor.go
File metadata and controls
146 lines (137 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Package parallel implements helpers for fast processing of line oriented inputs.
package parallel
import (
"bufio"
"bytes"
"io"
"log"
"runtime"
"sync"
"time"
)
// SimpleTransformerFunc converts bytes to bytes.
type SimpleTransformerFunc func([]byte) []byte

// TransformerFunc takes a slice of bytes and returns a slice of bytes and an
// error. A common denominator of functions that transform data.
type TransformerFunc func([]byte) ([]byte, error)

// ToTransformerFunc takes a simple transformer and wraps it so it can be used in
// places where a TransformerFunc is expected. The wrapper never fails: the
// returned function always reports a nil error.
func ToTransformerFunc(f SimpleTransformerFunc) TransformerFunc {
	wrapped := func(b []byte) ([]byte, error) {
		result := f(b)
		return result, nil
	}
	return wrapped
}
// Processor can process lines in parallel.
//
// The fields are filled with usable defaults by NewProcessor; construct
// instances through it rather than using the zero value.
type Processor struct {
	// BatchSize is the number of records collected before a batch is
	// dispatched to a worker.
	BatchSize int
	// RecordSeparator delimits records in the input, e.g. '\n' for lines.
	RecordSeparator byte
	// NumWorkers is the number of concurrent transformer goroutines.
	NumWorkers int
	// SkipEmptyLines drops records that are empty after whitespace trimming.
	SkipEmptyLines bool
	// Verbose enables progress logging via the standard log package.
	Verbose bool
	// R is the input the records are read from.
	R io.Reader
	// W receives the transformed output.
	W io.Writer
	// F is applied to every record.
	F TransformerFunc
}
// New is the preferred way to create a new parallel processor; it is an
// alias for NewProcessor.
var New = NewProcessor
// NewProcessor creates a new line processor with sensible defaults: batches
// of 10000 newline-separated records, one worker per CPU, and empty lines
// skipped.
func NewProcessor(r io.Reader, w io.Writer, f TransformerFunc) *Processor {
	p := &Processor{
		R: r,
		W: w,
		F: f,
	}
	p.BatchSize = 10000
	p.RecordSeparator = '\n'
	p.NumWorkers = runtime.NumCPU()
	p.SkipEmptyLines = true
	return p
}
// RunWorkers sets the number of worker goroutines and runs the processor.
// It is a convenience shortcut for setting NumWorkers followed by Run.
func (p *Processor) RunWorkers(numWorkers int) error {
	p.NumWorkers = numWorkers
	return p.Run()
}
// Run starts the workers, crunching through the input. It reads records from
// p.R separated by p.RecordSeparator, applies p.F to each record in
// p.NumWorkers goroutines, and writes the results to p.W through a single
// buffered writer goroutine. Output order is not guaranteed to match input
// order. The first error from a worker, the writer, or the reader is
// returned after all goroutines have been shut down.
func (p *Processor) Run() error {
	// Workers and the writer may report errors concurrently, and the main
	// goroutine reads the error value, so access is guarded by a mutex.
	// Only the first error is kept; later ones are dropped.
	var (
		mu   sync.Mutex
		wErr error
	)
	setErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		if wErr == nil {
			wErr = err
		}
	}
	getErr := func() error {
		mu.Lock()
		defer mu.Unlock()
		return wErr
	}
	// worker takes []byte batches from a channel queue, executes f and sends
	// the result to the out channel. A transform error is recorded, but the
	// (possibly nil) result is still forwarded, matching best-effort draining
	// of the remaining queue.
	worker := func(queue chan [][]byte, out chan []byte, f TransformerFunc, wg *sync.WaitGroup) {
		defer wg.Done()
		for batch := range queue {
			for _, b := range batch {
				r, err := f(b)
				if err != nil {
					setErr(err)
				}
				out <- r
			}
		}
	}
	// writer buffers writes and signals completion on done after the final
	// flush.
	writer := func(w io.Writer, bc chan []byte, done chan bool) {
		bw := bufio.NewWriter(w)
		for b := range bc {
			if _, err := bw.Write(b); err != nil {
				setErr(err)
			}
		}
		if err := bw.Flush(); err != nil {
			setErr(err)
		}
		done <- true
	}
	var (
		queue   = make(chan [][]byte)
		out     = make(chan []byte)
		done    = make(chan bool)
		total   int64
		started = time.Now()
		wg      sync.WaitGroup
		rErr    error // read error from the input; reported with priority
	)
	go writer(p.W, out, done)
	for i := 0; i < p.NumWorkers; i++ {
		wg.Add(1)
		go worker(queue, out, p.F, &wg)
	}
	batch := NewBytesBatchCapacity(p.BatchSize)
	br := bufio.NewReader(p.R)
	for {
		b, err := br.ReadBytes(p.RecordSeparator)
		if err == io.EOF {
			break
		}
		if err != nil {
			// Do not return early here: the workers and the writer must
			// still be shut down below, otherwise their goroutines leak.
			rErr = err
			break
		}
		if p.SkipEmptyLines && len(bytes.TrimSpace(b)) == 0 {
			continue
		}
		batch.Add(b)
		if batch.Size() == p.BatchSize {
			if p.Verbose {
				log.Printf("parallel: dispatched %d lines (%0.2f lines/s)",
					total, float64(total)/time.Since(started).Seconds())
			}
			total += int64(p.BatchSize)
			// To avoid locking on every record, we only check for worker or
			// writer errors at batch boundaries; no new work is queued once
			// an error occurred.
			if getErr() != nil {
				break
			}
			queue <- batch.Slice()
			batch.Reset()
		}
	}
	// Flush the final (possibly partial or empty) batch, then shut down:
	// close the queue, wait for the workers, close the output, and wait for
	// the writer's final flush.
	queue <- batch.Slice()
	batch.Reset()
	close(queue)
	wg.Wait()
	close(out)
	<-done
	if rErr != nil {
		return rErr
	}
	return getErr()
}