DPC++ Runtime
Runtime libraries for oneAPI DPC++
thread_pool.hpp
Go to the documentation of this file.
1 //===-- thread_pool.hpp - Simple thread pool --------------------*- C++ -*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #pragma once
10 
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
18 
20 
22 namespace sycl {
23 namespace detail {
24 
/// A minimal fixed-size thread pool.
///
/// The constructor launches the requested number of worker threads. Workers
/// take jobs from a shared FIFO queue guarded by a mutex and a condition
/// variable. finishAndWait() (also called by the destructor) raises the stop
/// flag and joins every worker. A worker that observes the stop flag exits
/// without draining the queue, so jobs still queued at that point are not
/// executed.
class ThreadPool {
  std::vector<std::thread> MLaunchedThreads;

  size_t MThreadCount;
  std::queue<std::function<void()>> MJobQueue;
  std::mutex MJobQueueMutex;
  std::condition_variable MDoSmthOrStop;
  std::atomic_bool MStop{false};

  /// Body of every worker thread: wait for a job or a stop request, run the
  /// job with the queue mutex released, then repeat.
  void worker() {
    std::unique_lock<std::mutex> Lock(MJobQueueMutex);

    while (true) {
      MDoSmthOrStop.wait(
          Lock, [this]() { return !MJobQueue.empty() || MStop.load(); });

      // A stop request wins over pending work: leave remaining jobs queued.
      if (MStop.load())
        break;

      std::function<void()> Job = std::move(MJobQueue.front());
      MJobQueue.pop();

      // Run the job unlocked so other workers and submit() can make progress.
      Lock.unlock();

      Job();

      Lock.lock();
    }
  }

  /// Clears the stop flag and launches MThreadCount worker threads.
  void start() {
    MLaunchedThreads.reserve(MThreadCount);

    MStop.store(false);

    for (size_t Idx = 0; Idx < MThreadCount; ++Idx)
      MLaunchedThreads.emplace_back([this] { worker(); });
  }

public:
  /// Creates the pool and immediately starts \p ThreadCount workers.
  ThreadPool(unsigned int ThreadCount = 1) : MThreadCount(ThreadCount) {
    start();
  }

  /// Stops and joins all workers.
  ~ThreadPool() { finishAndWait(); }

  /// Requests all workers to stop and blocks until each has been joined.
  /// Calling it again is harmless: already-joined threads are skipped.
  void finishAndWait() {
    {
      // The flag must be changed while holding the mutex the workers wait
      // on. Otherwise a worker that has just evaluated its wait predicate as
      // false, but has not blocked yet, would miss the notification below
      // and sleep forever, hanging the join.
      std::lock_guard<std::mutex> Lock(MJobQueueMutex);
      MStop.store(true);
    }

    MDoSmthOrStop.notify_all();

    for (std::thread &Thread : MLaunchedThreads)
      if (Thread.joinable())
        Thread.join();
  }

  /// Enqueues an arbitrary callable and wakes one worker.
  ///
  /// The callable is perfectly forwarded into the queued job: rvalues are
  /// moved, lvalues are copied so the caller's object is left intact.
  template <typename T> void submit(T &&Func) {
    {
      std::lock_guard<std::mutex> Lock(MJobQueueMutex);
      MJobQueue.emplace([F = std::forward<T>(Func)]() { F(); });
    }

    MDoSmthOrStop.notify_one();
  }

  /// Enqueues an already type-erased job and wakes one worker.
  /// \p Func is moved into the queue.
  void submit(std::function<void()> &&Func) {
    {
      std::lock_guard<std::mutex> Lock(MJobQueueMutex);
      // A named rvalue reference is an lvalue; move explicitly to avoid a
      // copy of the std::function.
      MJobQueue.emplace(std::move(Func));
    }

    MDoSmthOrStop.notify_one();
  }
};
98 
99 } // namespace detail
100 } // namespace sycl
101 } // __SYCL_INLINE_NAMESPACE(cl)
cl::sycl::detail::ThreadPool::submit
void submit(T &&Func)
Definition: thread_pool.hpp:80
cl::sycl::detail::ThreadPool::submit
void submit(std::function< void()> &&Func)
Definition: thread_pool.hpp:89
sycl
Definition: invoke_simd.hpp:68
cl::sycl::info::queue
queue
Definition: info_desc.hpp:229
cl::sycl::detail::ThreadPool::~ThreadPool
~ThreadPool()
Definition: thread_pool.hpp:68
defines.hpp
cl
We provide new interfaces for matrix multiply in this patch:
Definition: access.hpp:13
cl::sycl::detail::ThreadPool::finishAndWait
void finishAndWait()
Definition: thread_pool.hpp:70
cl::sycl::detail::ThreadPool
Definition: thread_pool.hpp:25
cl::sycl::detail::ThreadPool::ThreadPool
ThreadPool(unsigned int ThreadCount=1)
Definition: thread_pool.hpp:64
__SYCL_INLINE_NAMESPACE
#define __SYCL_INLINE_NAMESPACE(X)
Definition: defines_elementary.hpp:12