DPC++ Runtime
Runtime libraries for oneAPI DPC++
scheduler.cpp
Go to the documentation of this file.
1 //===-- scheduler.cpp - SYCL Scheduler --------------------------*- C++ -*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
11 #include <detail/queue_impl.hpp>
14 #include <detail/stream_impl.hpp>
15 #include <sycl/device_selector.hpp>
16 
17 #include <chrono>
18 #include <cstdio>
19 #include <memory>
20 #include <mutex>
21 #include <set>
22 #include <thread>
23 #include <vector>
24 
25 namespace sycl {
27 namespace detail {
28 
29 void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
30  ReadLockT &GraphReadLock) {
31 #ifdef XPTI_ENABLE_INSTRUMENTATION
32  // Will contain the list of dependencies for the Release Command
33  std::set<Command *> DepCommands;
34 #endif
35  std::vector<Command *> ToCleanUp;
36  for (Command *Cmd : Record->MReadLeaves) {
37  EnqueueResultT Res;
38  bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
39  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
40  throw runtime_error("Enqueue process failed.",
41  PI_ERROR_INVALID_OPERATION);
42 #ifdef XPTI_ENABLE_INSTRUMENTATION
43  // Capture the dependencies
44  DepCommands.insert(Cmd);
45 #endif
46  GraphProcessor::waitForEvent(Cmd->getEvent(), GraphReadLock, ToCleanUp);
47  }
48  for (Command *Cmd : Record->MWriteLeaves) {
49  EnqueueResultT Res;
50  bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
51  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
52  throw runtime_error("Enqueue process failed.",
53  PI_ERROR_INVALID_OPERATION);
54 #ifdef XPTI_ENABLE_INSTRUMENTATION
55  DepCommands.insert(Cmd);
56 #endif
57  GraphProcessor::waitForEvent(Cmd->getEvent(), GraphReadLock, ToCleanUp);
58  }
59  for (AllocaCommandBase *AllocaCmd : Record->MAllocaCommands) {
60  Command *ReleaseCmd = AllocaCmd->getReleaseCmd();
61  EnqueueResultT Res;
62  bool Enqueued = GraphProcessor::enqueueCommand(ReleaseCmd, Res, ToCleanUp);
63  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
64  throw runtime_error("Enqueue process failed.",
65  PI_ERROR_INVALID_OPERATION);
66 #ifdef XPTI_ENABLE_INSTRUMENTATION
67  // Report these dependencies to the Command so these dependencies can be
68  // reported as edges
69  ReleaseCmd->resolveReleaseDependencies(DepCommands);
70 #endif
71  GraphProcessor::waitForEvent(ReleaseCmd->getEvent(), GraphReadLock,
72  ToCleanUp);
73  }
74 }
75 
76 EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
77  const QueueImplPtr &Queue) {
78  EventImplPtr NewEvent = nullptr;
79  const CG::CGTYPE Type = CommandGroup->getType();
80  std::vector<Command *> AuxiliaryCmds;
81  std::vector<StreamImplPtr> Streams;
82 
83  if (Type == CG::Kernel) {
84  Streams = ((CGExecKernel *)CommandGroup.get())->getStreams();
85  // Stream's flush buffer memory is mainly initialized in stream's __init
86  // method. However, this method is not available on host device.
87  // Initializing stream's flush buffer on the host side in a separate task.
88  if (Queue->is_host()) {
89  for (const StreamImplPtr &Stream : Streams) {
90  initStream(Stream, Queue);
91  }
92  }
93  }
94 
95  {
96  WriteLockT Lock = acquireWriteLock();
97 
98  Command *NewCmd = nullptr;
99  switch (Type) {
100  case CG::UpdateHost:
101  NewCmd = MGraphBuilder.addCGUpdateHost(std::move(CommandGroup),
102  DefaultHostQueue, AuxiliaryCmds);
103  break;
104  case CG::CodeplayHostTask:
105  NewCmd = MGraphBuilder.addCG(std::move(CommandGroup), DefaultHostQueue,
106  AuxiliaryCmds);
107  break;
108  default:
109  NewCmd = MGraphBuilder.addCG(std::move(CommandGroup), std::move(Queue),
110  AuxiliaryCmds);
111  }
112  NewEvent = NewCmd->getEvent();
113  }
114 
115  std::vector<Command *> ToCleanUp;
116  {
117  ReadLockT Lock = acquireReadLock();
118 
119  Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());
120 
121  EnqueueResultT Res;
122  bool Enqueued;
123 
124  auto CleanUp = [&]() {
125  if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
126  NewEvent->setCommand(nullptr);
127  delete NewCmd;
128  }
129  };
130 
131  for (Command *Cmd : AuxiliaryCmds) {
132  Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
133  try {
134  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
135  throw runtime_error("Auxiliary enqueue process failed.",
136  PI_ERROR_INVALID_OPERATION);
137  } catch (...) {
138  // enqueueCommand() func and if statement above may throw an exception,
139  // so destroy required resources to avoid memory leak
140  CleanUp();
141  std::rethrow_exception(std::current_exception());
142  }
143  }
144 
145  if (NewCmd) {
146  // TODO: Check if lazy mode.
147  EnqueueResultT Res;
148  try {
149  bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
150  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
151  throw runtime_error("Enqueue process failed.",
152  PI_ERROR_INVALID_OPERATION);
153  } catch (...) {
154  // enqueueCommand() func and if statement above may throw an exception,
155  // so destroy required resources to avoid memory leak
156  CleanUp();
157  std::rethrow_exception(std::current_exception());
158  }
159  }
160  }
161  cleanupCommands(ToCleanUp);
162 
163  for (auto StreamImplPtr : Streams) {
164  StreamImplPtr->flush(NewEvent);
165  }
166 
167  return NewEvent;
168 }
169 
EventImplPtr Scheduler::addCopyBack(Requirement *Req) {
  // Adds a copy-back command for the requirement's memory object (device to
  // host) and enqueues it. Returns the event of the new command, or nullptr
  // if there is nothing to copy back. Enqueue failures are reported through
  // the queue's async handler rather than thrown to the caller.
  std::vector<Command *> AuxiliaryCmds;
  Command *NewCmd = nullptr;
  {
    // Graph mutation requires the exclusive lock.
    WriteLockT Lock = acquireWriteLock();
    NewCmd = MGraphBuilder.addCopyBack(Req, AuxiliaryCmds);
    // Command was not created because there were no operations with
    // buffer.
    if (!NewCmd)
      return nullptr;
  }

  std::vector<Command *> ToCleanUp;
  try {
    // Enqueueing only reads the graph, so the shared lock suffices.
    ReadLockT Lock = acquireReadLock();
    EnqueueResultT Res;
    bool Enqueued;

    for (Command *Cmd : AuxiliaryCmds) {
      Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
      if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
        throw runtime_error("Enqueue process failed.",
                            PI_ERROR_INVALID_OPERATION);
    }

    Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
    if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
      throw runtime_error("Enqueue process failed.",
                          PI_ERROR_INVALID_OPERATION);
  } catch (...) {
    // Route the failure to the queue's async handler; the event is still
    // returned below so callers can wait on whatever did get enqueued.
    NewCmd->getQueue()->reportAsyncException(std::current_exception());
  }
  EventImplPtr NewEvent = NewCmd->getEvent();
  cleanupCommands(ToCleanUp);
  return NewEvent;
}
206 
Scheduler &Scheduler::getInstance() {
  // The single scheduler instance is owned by the process-wide GlobalHandler.
  return GlobalHandler::instance().getScheduler();
}
210 
211 void Scheduler::waitForEvent(const EventImplPtr &Event) {
212  ReadLockT Lock = acquireReadLock();
213  // It's fine to leave the lock unlocked upon return from waitForEvent as
214  // there's no more actions to do here with graph
215  std::vector<Command *> ToCleanUp;
216  GraphProcessor::waitForEvent(std::move(Event), Lock, ToCleanUp,
217  /*LockTheLock=*/false);
218  cleanupCommands(ToCleanUp);
219 }
220 
221 static void deallocateStreams(
222  std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
223  // Deallocate buffers for stream objects of the finished commands. Iterate in
224  // reverse order because it is the order of commands execution.
225  for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
226  StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
227  detail::Scheduler::getInstance().deallocateStreamBuffers(
228  StreamImplPtr->get());
229 }
230 
void Scheduler::cleanupFinishedCommands(const EventImplPtr &FinishedEvent) {
  // Traverses the graph of commands reachable from FinishedEvent's command
  // and cleans up the ones that have finished. Best-effort: if the graph
  // lock cannot be taken immediately, cleanup is simply skipped this time.
  //
  // We are going to traverse a graph of finished commands. Gather stream
  // objects from these commands if any and deallocate buffers for these stream
  // objects, this is needed to guarantee that streamed data is printed and
  // resources are released.
  std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
  // Similar to streams, we also collect the auxiliary resources used by the
  // commands. Cleanup will make sure the commands do not own the resources
  // anymore, so we just need them to survive the graph lock then they can die
  // as they go out of scope.
  std::vector<std::shared_ptr<const void>> AuxResourcesToDeallocate;
  {
    // Avoiding deadlock situation, where one thread is in the process of
    // enqueueing (with a locked mutex) a currently blocked task that waits for
    // another thread which is stuck at attempting cleanup.
    WriteLockT Lock(MGraphLock, std::try_to_lock);
    if (Lock.owns_lock()) {
      auto FinishedCmd = static_cast<Command *>(FinishedEvent->getCommand());
      // The command might have been cleaned up (and set to nullptr) by another
      // thread
      if (FinishedCmd)
        MGraphBuilder.cleanupFinishedCommands(FinishedCmd, StreamsToDeallocate,
                                              AuxResourcesToDeallocate);
    }
  }
  // Stream buffers are released outside the graph lock; the auxiliary
  // resources die when AuxResourcesToDeallocate goes out of scope here.
  deallocateStreams(StreamsToDeallocate);
}
258 
void Scheduler::removeMemoryObject(detail::SYCLMemObjI *MemObj) {
  // Waits for all outstanding operations on MemObj to finish, then removes
  // every command and the record associated with it from the graph. No-op if
  // the object was never used.
  //
  // We are going to traverse a graph of finished commands. Gather stream
  // objects from these commands if any and deallocate buffers for these stream
  // objects, this is needed to guarantee that streamed data is printed and
  // resources are released.
  std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
  // Similar to streams, we also collect the auxiliary resources used by the
  // commands. Cleanup will make sure the commands do not own the resources
  // anymore, so we just need them to survive the graph lock then they can die
  // as they go out of scope.
  std::vector<std::shared_ptr<const void>> AuxResourcesToDeallocate;

  {
    MemObjRecord *Record = nullptr;

    {
      // This only needs a shared mutex as it only involves enqueueing and
      // awaiting for events
      ReadLockT Lock = acquireReadLock();

      Record = MGraphBuilder.getMemObjRecord(MemObj);
      if (!Record)
        // No operations were performed on the mem object
        return;

      // Blocks until every leaf and release command of the record completes.
      waitForRecordToFinish(Record, Lock);
    }

    {
      // Graph mutation below requires the exclusive lock. Record stays valid
      // between the two lock scopes because removal happens only here.
      WriteLockT Lock = acquireWriteLock();
      MGraphBuilder.decrementLeafCountersForRecord(Record);
      MGraphBuilder.cleanupCommandsForRecord(Record, StreamsToDeallocate,
                                             AuxResourcesToDeallocate);
      MGraphBuilder.removeRecordForMemObj(MemObj);
    }
  }
  deallocateStreams(StreamsToDeallocate);
}
297 
EventImplPtr Scheduler::addHostAccessor(Requirement *Req) {
  // Adds the commands needed to make the requirement's memory available on
  // the host and enqueues them. Returns the event of the new command, or
  // nullptr if no command was needed.
  std::vector<Command *> AuxiliaryCmds;
  EventImplPtr NewCmdEvent = nullptr;

  {
    // Graph mutation requires the exclusive lock.
    WriteLockT Lock = acquireWriteLock();

    Command *NewCmd = MGraphBuilder.addHostAccessor(Req, AuxiliaryCmds);
    if (!NewCmd)
      return nullptr;
    // Keep the event, not the command pointer: the command may be cleaned up
    // by another thread between releasing the write lock and re-acquiring
    // the read lock below.
    NewCmdEvent = NewCmd->getEvent();
  }

  std::vector<Command *> ToCleanUp;
  {
    // Enqueueing only reads the graph, so the shared lock suffices.
    ReadLockT Lock = acquireReadLock();
    EnqueueResultT Res;
    bool Enqueued;

    for (Command *Cmd : AuxiliaryCmds) {
      Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
      if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
        throw runtime_error("Enqueue process failed.",
                            PI_ERROR_INVALID_OPERATION);
    }

    // Re-fetch the command from the event; it may already be gone (nullptr).
    if (Command *NewCmd = static_cast<Command *>(NewCmdEvent->getCommand())) {
      Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
      if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
        throw runtime_error("Enqueue process failed.",
                            PI_ERROR_INVALID_OPERATION);
    }
  }

  cleanupCommands(ToCleanUp);
  return NewCmdEvent;
}
335 
336 void Scheduler::releaseHostAccessor(Requirement *Req) {
337  Command *const BlockedCmd = Req->MBlockedCmd;
338 
339  std::vector<Command *> ToCleanUp;
340  {
341  ReadLockT Lock = acquireReadLock();
342 
343  assert(BlockedCmd && "Can't find appropriate command to unblock");
344 
345  BlockedCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;
346 
347  enqueueLeavesOfReqUnlocked(Req, ToCleanUp);
348  }
349  cleanupCommands(ToCleanUp);
350 }
351 
352 void Scheduler::enqueueLeavesOfReqUnlocked(const Requirement *const Req,
353  std::vector<Command *> &ToCleanUp) {
354  MemObjRecord *Record = Req->MSYCLMemObj->MRecord.get();
355  auto EnqueueLeaves = [&ToCleanUp](LeavesCollection &Leaves) {
356  for (Command *Cmd : Leaves) {
357  EnqueueResultT Res;
358  bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
359  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
360  throw runtime_error("Enqueue process failed.",
361  PI_ERROR_INVALID_OPERATION);
362  }
363  };
364 
365  EnqueueLeaves(Record->MReadLeaves);
366  EnqueueLeaves(Record->MWriteLeaves);
367 }
368 
369 void Scheduler::allocateStreamBuffers(stream_impl *Impl,
370  size_t StreamBufferSize,
371  size_t FlushBufferSize) {
372  std::lock_guard<std::recursive_mutex> lock(StreamBuffersPoolMutex);
373  StreamBuffersPool.insert(
374  {Impl, new StreamBuffers(StreamBufferSize, FlushBufferSize)});
375 }
376 
377 void Scheduler::deallocateStreamBuffers(stream_impl *Impl) {
378  std::lock_guard<std::recursive_mutex> lock(StreamBuffersPoolMutex);
379  delete StreamBuffersPool[Impl];
380  StreamBuffersPool.erase(Impl);
381 }
382 
Scheduler::Scheduler() {
  // Build the default host queue that host tasks and update-host command
  // groups are submitted to (see addCG).
  sycl::device HostDevice =
      createSyclObjFromImpl<device>(device_impl::getHostDeviceImpl());
  sycl::context HostContext{HostDevice};
  DefaultHostQueue = QueueImplPtr(
      new queue_impl(detail::getSyclObjImpl(HostDevice),
                     detail::getSyclObjImpl(HostContext), /*AsyncHandler=*/{},
                     /*PropList=*/{}));
}
392 
393 Scheduler::~Scheduler() {
394  // By specification there are several possible sync points: buffer
395  // destruction, wait() method of a queue or event. Stream doesn't introduce
396  // any synchronization point. It is guaranteed that stream is flushed and
397  // resources are released only if one of the listed sync points was used for
398  // the kernel. Otherwise resources for stream will not be released, issue a
399  // warning in this case.
401  std::lock_guard<std::recursive_mutex> lock(StreamBuffersPoolMutex);
402  if (!StreamBuffersPool.empty())
403  fprintf(
404  stderr,
405  "\nWARNING: Some commands may have not finished the execution and "
406  "not all resources were released. Please be sure that all kernels "
407  "have synchronization points.\n\n");
408  }
409  // There might be some commands scheduled for post enqueue cleanup that
410  // haven't been freed because of the graph mutex being locked at the time,
411  // clean them up now.
412  cleanupCommands({});
413 }
414 
MemObjRecord *Scheduler::getMemObjRecord(const Requirement *const Req) {
  // Non-owning access to the record of the requirement's memory object;
  // nullptr if no operations were ever performed on the object.
  return Req->MSYCLMemObj->MRecord.get();
}
418 
void Scheduler::cleanupCommands(const std::vector<Command *> &Cmds) {
  // Frees the given commands plus any previously deferred ones. If the graph
  // lock cannot be taken right now, the commands are deferred instead so a
  // later call (or the destructor) can free them.
  //
  // Fast path: nothing to free at all.
  if (Cmds.empty())
  {
    std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
    if (MDeferredCleanupCommands.empty())
      return;
  }

  WriteLockT Lock(MGraphLock, std::try_to_lock);
  // In order to avoid deadlocks related to blocked commands, defer cleanup if
  // the lock wasn't acquired.
  if (Lock.owns_lock()) {
    for (Command *Cmd : Cmds) {
      MGraphBuilder.cleanupCommand(Cmd);
    }
    // Swap the deferred list out under its own mutex so cleanup of those
    // commands happens without holding MDeferredCleanupMutex.
    std::vector<Command *> DeferredCleanupCommands;
    {
      std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
      std::swap(DeferredCleanupCommands, MDeferredCleanupCommands);
    }
    for (Command *Cmd : DeferredCleanupCommands) {
      MGraphBuilder.cleanupCommand(Cmd);
    }

  } else {
    // Could not take the graph lock: remember the commands for later.
    std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
    MDeferredCleanupCommands.insert(MDeferredCleanupCommands.end(),
                                    Cmds.begin(), Cmds.end());
  }
}
449 
void Scheduler::NotifyHostTaskCompletion(Command *Cmd, Command *BlockingCmd) {
  // Marks a finished host task's event complete, unblocks the command that
  // gated dependent work, and enqueues the leaves of every requirement the
  // task depended on.
  //
  // Completing command's event along with unblocking enqueue readiness of
  // empty command may lead to quick deallocation of MThisCmd by some cleanup
  // process. Thus we'll copy deps prior to completing of event and unblocking
  // of empty command.
  // Also, it's possible to have record deallocated prior to enqueue process.
  // Thus we employ read-lock of graph.

  std::vector<Command *> ToCleanUp;
  {
    ReadLockT Lock = acquireReadLock();

    // Copy made deliberately BEFORE setComplete() — see comment above.
    std::vector<DepDesc> Deps = Cmd->MDeps;

    // update self-event status
    Cmd->getEvent()->setComplete();

    BlockingCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;

    for (const DepDesc &Dep : Deps)
      Scheduler::enqueueLeavesOfReqUnlocked(Dep.MDepRequirement, ToCleanUp);
  }
  cleanupCommands(ToCleanUp);
}
474 
475 } // namespace detail
476 } // __SYCL_INLINE_VER_NAMESPACE(_V1)
477 } // namespace sycl
Base class for memory allocation commands.
Definition: commands.hpp:373
"Execute kernel" command group class.
Definition: cg.hpp:126
CGTYPE
Type of the command group.
Definition: cg.hpp:55
The Command class represents some action that needs to be performed on one or more memory objects.
Definition: commands.hpp:95
void resolveReleaseDependencies(std::set< Command * > &list)
Looks at all the dependencies for the release command and enables instrumentation to report these dep...
Definition: commands.cpp:770
std::unordered_set< Command * > MUsers
Contains list of commands that depend on the command.
Definition: commands.hpp:267
std::vector< DepDesc > MDeps
Contains list of dependencies(edges)
Definition: commands.hpp:265
std::atomic< EnqueueResultT::ResultT > MEnqueueStatus
Describes the status of the command.
Definition: commands.hpp:288
const EventImplPtr & getEvent() const
Definition: commands.hpp:151
const QueueImplPtr & getQueue() const
Definition: commands.hpp:149
A wrapper for CircularBuffer class along with collection for host accessor's EmptyCommands.
std::shared_ptr< MemObjRecord > MRecord
DPC++ graph scheduler class.
Definition: scheduler.hpp:358
std::unique_lock< RWLockT > WriteLockT
Definition: scheduler.hpp:454
std::shared_lock< RWLockT > ReadLockT
Definition: scheduler.hpp:453
#define __SYCL_INLINE_VER_NAMESPACE(X)
bool trace(TraceLevel level)
Definition: pi.cpp:394
void initStream(StreamImplPtr Stream, QueueImplPtr Queue)
decltype(Obj::impl) getSyclObjImpl(const Obj &SyclObject)
Definition: common.hpp:248
static void deallocateStreams(std::vector< std::shared_ptr< stream_impl >> &StreamsToDeallocate)
Definition: scheduler.cpp:221
std::shared_ptr< detail::stream_impl > StreamImplPtr
Definition: commands.hpp:38
std::shared_ptr< event_impl > EventImplPtr
Definition: cg.hpp:42
std::shared_ptr< sycl::detail::queue_impl > QueueImplPtr
Definition: event_impl.hpp:32
----- Error handling, matching OpenCL plugin semantics.
Definition: access.hpp:14
Dependency between two commands.
Definition: commands.hpp:69
Result of command enqueueing.
Definition: commands.hpp:50
ResultT MResult
Indicates the result of enqueueing.
Definition: commands.hpp:61
Memory Object Record.
Definition: scheduler.hpp:193
std::vector< AllocaCommandBase * > MAllocaCommands
Definition: scheduler.hpp:200