1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
| func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
// 懒加载 m
if g.m == nil {
g.m = make(map[string]*call)
}
// 判断函数是否已经被调用过
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
// 调用过的话则等待函数执行完毕
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
// 将函数调用加入 map
g.m[key] = c
g.mu.Unlock()
// 执行函数
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
// 创建 channel
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
// 新开 goroutine 执行函数
go g.doCall(c, key, fn)
return ch
}
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
// 用于标记是否正常返回
normalReturn := false
// 用于标识是否触发了 recover
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// 如果既没有正常执行完毕,又没有 recover ,则需要直接退出
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
// 函数已经执行完毕了,call也就没用了
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
// 如果返回的是 panic 错误,为了避免这个错误被上层 recover捕获而造成 channel 死锁,
// 因此需要再开一个 goroutine 进行 panic
go panic(e)
// 阻塞当前 goroutine 避免被垃圾回收
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// 当前 goroutine 已经退出,不需要再进行处理
} else {
// 返回结果到 chan
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
// 使用匿名函数的目的是为了在内部再使用一个 defer 用来捕获 panic
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
// 构建 panic 错误
c.err = newPanicError(r)
}
}
}()
// 执行函数返回结果
c.val, c.err = fn()
normalReturn = true
}()
// 判断是否 panic
if !normalReturn {
recovered = true
}
}
func newPanicError(v interface{}) error {
// 获取堆栈信息
stack := debug.Stack()
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
|