forked from mixpanel/obs
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathio.go
More file actions
81 lines (69 loc) · 1.9 KB
/
io.go
File metadata and controls
81 lines (69 loc) · 1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package obs
import (
"io"
"context"
)
type readCloser struct {
rc io.ReadCloser
fr FlightRecorder
ctx context.Context
fs FlightSpan
done DoneFunc
total int64
}
// NewReadCloserWithSpan wraps an io.ReadCloser, starting the Span when Read is first called and annotating the Trace with the total bytes read when Close is called.
func NewReadCloserWithSpan(ctx context.Context, rc io.ReadCloser, fr FlightRecorder) io.ReadCloser {
return &readCloser{fr: fr, rc: rc, ctx: ctx}
}
func (rc *readCloser) initFS() {
if rc.fs == nil {
rc.fs, rc.ctx, rc.done = rc.fr.WithNewSpan(rc.ctx, "Read")
}
}
func (rc *readCloser) Read(p []byte) (int, error) {
rc.initFS()
n, err := rc.rc.Read(p)
rc.total += int64(n)
rc.fs.IncrBy("bytes_read", float64(n))
return n, err
}
func (rc *readCloser) Close() error {
rc.initFS()
err := rc.rc.Close()
rc.fs.TraceSpan().SetTag("total_read", rc.total)
rc.fs.TraceSpan().SetTag("close_error", err)
rc.done()
return err
}
type writeCloser struct {
wc io.WriteCloser
fr FlightRecorder
ctx context.Context
fs FlightSpan
done DoneFunc
total int64
}
// NewWriteCloserWithSpan wraps an io.WriteCloser, starting the Span when Write is first called and annotating the Trace with the total bytes written when Close is called.
func NewWriteCloserWithSpan(ctx context.Context, wc io.WriteCloser, fr FlightRecorder) io.WriteCloser {
return &writeCloser{fr: fr, wc: wc, ctx: ctx}
}
func (wc *writeCloser) initFS() {
if wc.fs == nil {
wc.fs, wc.ctx, wc.done = wc.fr.WithNewSpan(wc.ctx, "Write")
}
}
func (wc *writeCloser) Write(p []byte) (int, error) {
wc.initFS()
n, err := wc.wc.Write(p)
wc.total += int64(n)
wc.fs.IncrBy("bytes_written", float64(n))
return n, err
}
func (wc *writeCloser) Close() error {
wc.initFS()
err := wc.wc.Close()
wc.fs.TraceSpan().SetTag("total_written", wc.total)
wc.fs.TraceSpan().SetTag("close_error", err)
wc.done()
return err
}