OLD | NEW |
1 /* | 1 /* |
2 * Copyright 2014 Google Inc. | 2 * Copyright 2014 Google Inc. |
3 * | 3 * |
4 * Use of this source code is governed by a BSD-style license that can be | 4 * Use of this source code is governed by a BSD-style license that can be |
5 * found in the LICENSE file. | 5 * found in the LICENSE file. |
6 */ | 6 */ |
7 | 7 |
8 #include "SkOnce.h" | 8 #include "SkOnce.h" |
9 #include "SkSemaphore.h" | 9 #include "SkSemaphore.h" |
10 #include "SkSpinlock.h" | 10 #include "SkSpinlock.h" |
(...skipping 78 matching lines...)
89 ~AutoLock() { fLock->release(); } | 89 ~AutoLock() { fLock->release(); } |
90 private: | 90 private: |
91 SkSpinlock* fLock; | 91 SkSpinlock* fLock; |
92 }; | 92 }; |
93 | 93 |
94 struct Work { | 94 struct Work { |
95 std::function<void(void)> fn; // A function to call | 95 std::function<void(void)> fn; // A function to call |
96 SkAtomic<int32_t>* pending; // then decrement pending afterwards. | 96 SkAtomic<int32_t>* pending; // then decrement pending afterwards. |
97 }; | 97 }; |
98 | 98 |
99 explicit ThreadPool(int threads) { | 99 explicit ThreadPool(int threads, std::function<void(void)> cleanupFn) |
| 100 : fCleanupFn(cleanupFn) { |
100 if (threads == -1) { | 101 if (threads == -1) { |
101 threads = sk_num_cores(); | 102 threads = sk_num_cores(); |
102 } | 103 } |
103 for (int i = 0; i < threads; i++) { | 104 for (int i = 0; i < threads; i++) { |
104 fThreads.push(new SkThread(&ThreadPool::Loop, this)); | 105 fThreads.push(new SkThread(&ThreadPool::Loop, this)); |
105 fThreads.top()->start(); | 106 fThreads.top()->start(); |
106 } | 107 } |
107 } | 108 } |
108 | 109 |
109 ~ThreadPool() { | 110 ~ThreadPool() { |
(...skipping 44 matching lines...)
154 AutoLock lock(&pool->fWorkLock); | 155 AutoLock lock(&pool->fWorkLock); |
155 if (pool->fWork.empty()) { | 156 if (pool->fWork.empty()) { |
156 // Someone in Wait() stole our work (fWorkAvailable is an upper bound). | 157 // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
157 // Well, that's fine, back to sleep for us. | 158 // Well, that's fine, back to sleep for us. |
158 continue; | 159 continue; |
159 } | 160 } |
160 work = pool->fWork.back(); | 161 work = pool->fWork.back(); |
161 pool->fWork.pop_back(); | 162 pool->fWork.pop_back(); |
162 } | 163 } |
163 if (!work.fn) { | 164 if (!work.fn) { |
| 165 if (pool->fCleanupFn) { |
| 166 pool->fCleanupFn(); |
| 167 } |
164 return; // Poison pill. Time... to die. | 168 return; // Poison pill. Time... to die. |
165 } | 169 } |
166 work.fn(); | 170 work.fn(); |
167 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). | 171 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait().
168 } | 172 } |
169 } | 173 } |
170 | 174 |
171 // fWorkLock must be held when reading or modifying fWork. | 175 // fWorkLock must be held when reading or modifying fWork. |
172 SkSpinlock fWorkLock; | 176 SkSpinlock fWorkLock; |
173 SkTArray<Work> fWork; | 177 SkTArray<Work> fWork; |
174 | 178 |
175 // A thread-safe upper bound for fWork.count(). | 179 // A thread-safe upper bound for fWork.count(). |
176 // | 180 // |
177 // We'd have it be an exact count but for the loop in Wait(): | 181 // We'd have it be an exact count but for the loop in Wait(): |
178 // we never want that to block, so it can't call fWorkAvailable.wait(), | 182 // we never want that to block, so it can't call fWorkAvailable.wait(), |
179 // and that's the only way to decrement fWorkAvailable. | 183 // and that's the only way to decrement fWorkAvailable. |
180 // So fWorkAvailable may overcount the actual work available. | 184 // So fWorkAvailable may overcount the actual work available.
181 // We make do, but this means some worker threads may wake spuriously. | 185 // We make do, but this means some worker threads may wake spuriously. |
182 SkSemaphore fWorkAvailable; | 186 SkSemaphore fWorkAvailable; |
183 | 187 |
184 // These are only changed in a single-threaded context. | 188 // These are only changed in a single-threaded context. |
185 SkTDArray<SkThread*> fThreads; | 189 SkTDArray<SkThread*> fThreads; |
| 190 |
| 191 std::function<void(void)> fCleanupFn; |
186 static ThreadPool* gGlobal; | 192 static ThreadPool* gGlobal; |
187 | 193 |
188 friend struct SkTaskGroup::Enabler; | 194 friend struct SkTaskGroup::Enabler; |
189 }; | 195 }; |
190 ThreadPool* ThreadPool::gGlobal = nullptr; | 196 ThreadPool* ThreadPool::gGlobal = nullptr; |
191 | 197 |
192 } // namespace | 198 } // namespace |
193 | 199 |
194 SkTaskGroup::Enabler::Enabler(int threads) { | 200 SkTaskGroup::Enabler::Enabler(int threads, std::function<void(void)> cleanupFn) |
| 201 : fCleanupFn(cleanupFn) { |
195 SkASSERT(ThreadPool::gGlobal == nullptr); | 202 SkASSERT(ThreadPool::gGlobal == nullptr); |
196 if (threads != 0) { | 203 if (threads != 0) { |
197 ThreadPool::gGlobal = new ThreadPool(threads); | 204 ThreadPool::gGlobal = new ThreadPool(threads, cleanupFn); |
198 } | 205 } |
199 } | 206 } |
200 | 207 |
201 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } | 208 SkTaskGroup::Enabler::~Enabler() { |
| 209 if (!ThreadPool::gGlobal && fCleanupFn) { |
| 210 fCleanupFn(); |
| 211 } |
| 212 delete ThreadPool::gGlobal; |
| 213 } |
202 | 214 |
203 SkTaskGroup::SkTaskGroup() : fPending(0) {} | 215 SkTaskGroup::SkTaskGroup() : fPending(0) {} |
204 | 216 |
205 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } | 217 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); }
206 void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); } | 218 void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
207 void SkTaskGroup::batch(int N, std::function<void(int)> fn) { | 219 void SkTaskGroup::batch(int N, std::function<void(int)> fn) { |
208 ThreadPool::Batch(N, fn, &fPending); | 220 ThreadPool::Batch(N, fn, &fPending); |
209 } | 221 } |
210 | 222 |
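
Usage note: per the NEW side of this diff, the added cleanupFn runs once on each worker thread in Loop() just before the poison pill makes it exit, and once on the calling thread in ~Enabler() when no pool was ever created (threads == 0). A minimal caller-side sketch follows; ReleaseThreadLocalCaches and the lambda body are hypothetical stand-ins, not part of this CL.

// Hypothetical per-thread teardown, e.g. freeing TLS-cached state
// that each worker accumulated while running tasks.
static void ReleaseThreadLocalCaches() { /* assumed caller-provided */ }

int main() {
    // threads == -1 sizes the pool to sk_num_cores(), as in ThreadPool's ctor.
    SkTaskGroup::Enabler enabler(/*threads=*/-1, ReleaseThreadLocalCaches);

    SkTaskGroup tg;
    tg.add([] { /* some task */ });
    tg.wait();

    return 0;
    // ~Enabler() tears down the pool; each worker calls
    // ReleaseThreadLocalCaches() on its own thread before returning.
    // With threads == 0 (no pool), ~Enabler() invokes it directly instead.
}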