DPC++ Runtime
Runtime libraries for oneAPI DPC++
scheduler.cpp
Go to the documentation of this file.
1 //===-- scheduler.cpp - SYCL Scheduler --------------------------*- C++ -*-===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
11 #include <detail/graph_impl.hpp>
12 #include <detail/queue_impl.hpp>
14 #include <detail/stream_impl.hpp>
15 #include <sycl/device_selector.hpp>
16 
17 #include <chrono>
18 #include <cstdio>
19 #include <memory>
20 #include <mutex>
21 #include <set>
22 #include <thread>
23 #include <vector>
24 
25 namespace sycl {
26 inline namespace _V1 {
27 namespace detail {
28 
30  for (Command *Cmd : Record->MReadLeaves) {
31  if (!(Cmd->getType() == detail::Command::ALLOCA ||
33  !Cmd->getEvent()->isCompleted())
34  return false;
35  }
36  for (Command *Cmd : Record->MWriteLeaves) {
37  if (!(Cmd->getType() == detail::Command::ALLOCA ||
39  !Cmd->getEvent()->isCompleted())
40  return false;
41  }
42  return true;
43 }
44 
46  ReadLockT &GraphReadLock) {
47 #ifdef XPTI_ENABLE_INSTRUMENTATION
48  // Will contain the list of dependencies for the Release Command
49  std::set<Command *> DepCommands;
50 #endif
51  std::vector<Command *> ToCleanUp;
52  for (Command *Cmd : Record->MReadLeaves) {
53  EnqueueResultT Res;
54  bool Enqueued =
55  GraphProcessor::enqueueCommand(Cmd, GraphReadLock, Res, ToCleanUp, Cmd);
56  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
57  throw runtime_error("Enqueue process failed.",
58  PI_ERROR_INVALID_OPERATION);
59 #ifdef XPTI_ENABLE_INSTRUMENTATION
60  // Capture the dependencies
61  DepCommands.insert(Cmd);
62 #endif
63  GraphProcessor::waitForEvent(Cmd->getEvent(), GraphReadLock, ToCleanUp);
64  }
65  for (Command *Cmd : Record->MWriteLeaves) {
66  EnqueueResultT Res;
67  bool Enqueued =
68  GraphProcessor::enqueueCommand(Cmd, GraphReadLock, Res, ToCleanUp, Cmd);
69  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
70  throw runtime_error("Enqueue process failed.",
71  PI_ERROR_INVALID_OPERATION);
72 #ifdef XPTI_ENABLE_INSTRUMENTATION
73  DepCommands.insert(Cmd);
74 #endif
75  GraphProcessor::waitForEvent(Cmd->getEvent(), GraphReadLock, ToCleanUp);
76  }
77  for (AllocaCommandBase *AllocaCmd : Record->MAllocaCommands) {
78  Command *ReleaseCmd = AllocaCmd->getReleaseCmd();
79  EnqueueResultT Res;
80  bool Enqueued = GraphProcessor::enqueueCommand(ReleaseCmd, GraphReadLock,
81  Res, ToCleanUp, ReleaseCmd);
82  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
83  throw runtime_error("Enqueue process failed.",
84  PI_ERROR_INVALID_OPERATION);
85 #ifdef XPTI_ENABLE_INSTRUMENTATION
86  // Report these dependencies to the Command so these dependencies can be
87  // reported as edges
88  ReleaseCmd->resolveReleaseDependencies(DepCommands);
89 #endif
90  GraphProcessor::waitForEvent(ReleaseCmd->getEvent(), GraphReadLock,
91  ToCleanUp);
92  }
93 }
94 
96  std::unique_ptr<detail::CG> CommandGroup, const QueueImplPtr &Queue,
98  const std::vector<sycl::detail::pi::PiExtSyncPoint> &Dependencies) {
99  EventImplPtr NewEvent = nullptr;
100  const CG::CGTYPE Type = CommandGroup->getType();
101  std::vector<Command *> AuxiliaryCmds;
102  std::vector<StreamImplPtr> Streams;
103 
104  if (Type == CG::Kernel) {
105  auto *CGExecKernelPtr = static_cast<CGExecKernel *>(CommandGroup.get());
106  Streams = CGExecKernelPtr->getStreams();
107  CGExecKernelPtr->clearStreams();
108  // Stream's flush buffer memory is mainly initialized in stream's __init
109  // method. However, this method is not available on host device.
110  // Initializing stream's flush buffer on the host side in a separate task.
111  if (Queue->is_host()) {
112  for (const StreamImplPtr &Stream : Streams) {
113  Stream->initStreamHost(Queue);
114  }
115  }
116  }
117  std::vector<std::shared_ptr<const void>> AuxiliaryResources;
118  AuxiliaryResources = CommandGroup->getAuxiliaryResources();
119  CommandGroup->clearAuxiliaryResources();
120 
121  bool ShouldEnqueue = true;
122  {
123  WriteLockT Lock = acquireWriteLock();
124 
125  Command *NewCmd = nullptr;
126  switch (Type) {
127  case CG::UpdateHost:
128  NewCmd = MGraphBuilder.addCGUpdateHost(std::move(CommandGroup),
129  DefaultHostQueue, AuxiliaryCmds);
130  NewEvent = NewCmd->getEvent();
131  break;
132  case CG::CodeplayHostTask: {
133  auto Result = MGraphBuilder.addCG(std::move(CommandGroup),
134  DefaultHostQueue, AuxiliaryCmds);
135  NewCmd = Result.NewCmd;
136  NewEvent = Result.NewEvent;
137  ShouldEnqueue = Result.ShouldEnqueue;
138  break;
139  }
140  default:
141  auto Result = MGraphBuilder.addCG(std::move(CommandGroup),
142  std::move(Queue), AuxiliaryCmds,
143  CommandBuffer, std::move(Dependencies));
144 
145  NewCmd = Result.NewCmd;
146  NewEvent = Result.NewEvent;
147  ShouldEnqueue = Result.ShouldEnqueue;
148  }
149  NewEvent->setSubmissionTime();
150  }
151 
152  if (ShouldEnqueue) {
153  enqueueCommandForCG(NewEvent, AuxiliaryCmds);
154 
155  for (const auto &StreamImplPtr : Streams) {
156  StreamImplPtr->flush(NewEvent);
157  }
158  }
159 
160  if (!AuxiliaryResources.empty())
161  registerAuxiliaryResources(NewEvent, std::move(AuxiliaryResources));
162 
163  return NewEvent;
164 }
165 
167  std::vector<Command *> &AuxiliaryCmds,
168  BlockingT Blocking) {
169  std::vector<Command *> ToCleanUp;
170  {
171  ReadLockT Lock = acquireReadLock();
172 
173  Command *NewCmd =
174  (NewEvent) ? static_cast<Command *>(NewEvent->getCommand()) : nullptr;
175 
176  EnqueueResultT Res;
177  bool Enqueued;
178 
179  auto CleanUp = [&]() {
180  if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
181  if (NewEvent) {
182  NewEvent->setCommand(nullptr);
183  }
184  delete NewCmd;
185  }
186  };
187 
188  for (Command *Cmd : AuxiliaryCmds) {
189  Enqueued = GraphProcessor::enqueueCommand(Cmd, Lock, Res, ToCleanUp, Cmd,
190  Blocking);
191  try {
192  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
193  throw runtime_error("Auxiliary enqueue process failed.",
194  PI_ERROR_INVALID_OPERATION);
195  } catch (...) {
196  // enqueueCommand() func and if statement above may throw an exception,
197  // so destroy required resources to avoid memory leak
198  CleanUp();
199  std::rethrow_exception(std::current_exception());
200  }
201  }
202 
203  if (NewCmd) {
204  // TODO: Check if lazy mode.
205  EnqueueResultT Res;
206  try {
207  bool Enqueued = GraphProcessor::enqueueCommand(
208  NewCmd, Lock, Res, ToCleanUp, NewCmd, Blocking);
209  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
210  throw runtime_error("Enqueue process failed.",
211  PI_ERROR_INVALID_OPERATION);
212  } catch (...) {
213  // enqueueCommand() func and if statement above may throw an exception,
214  // so destroy required resources to avoid memory leak
215  CleanUp();
216  std::rethrow_exception(std::current_exception());
217  }
218  }
219  }
220  cleanupCommands(ToCleanUp);
221 }
222 
224  std::vector<Command *> AuxiliaryCmds;
225  Command *NewCmd = nullptr;
226  {
227  WriteLockT Lock = acquireWriteLock();
228  NewCmd = MGraphBuilder.addCopyBack(Req, AuxiliaryCmds);
229  // Command was not created because there were no operations with
230  // buffer.
231  if (!NewCmd)
232  return nullptr;
233  }
234 
235  std::vector<Command *> ToCleanUp;
236  try {
237  ReadLockT Lock = acquireReadLock();
238  EnqueueResultT Res;
239  bool Enqueued;
240 
241  for (Command *Cmd : AuxiliaryCmds) {
242  Enqueued = GraphProcessor::enqueueCommand(Cmd, Lock, Res, ToCleanUp, Cmd);
243  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
244  throw runtime_error("Enqueue process failed.",
245  PI_ERROR_INVALID_OPERATION);
246  }
247 
248  Enqueued =
249  GraphProcessor::enqueueCommand(NewCmd, Lock, Res, ToCleanUp, NewCmd);
250  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
251  throw runtime_error("Enqueue process failed.",
252  PI_ERROR_INVALID_OPERATION);
253  } catch (...) {
254  NewCmd->getQueue()->reportAsyncException(std::current_exception());
255  }
256  EventImplPtr NewEvent = NewCmd->getEvent();
257  cleanupCommands(ToCleanUp);
258  return NewEvent;
259 }
260 
263 }
264 
267 }
268 
269 void Scheduler::waitForEvent(const EventImplPtr &Event, bool *Success) {
270  ReadLockT Lock = acquireReadLock();
271  // It's fine to leave the lock unlocked upon return from waitForEvent as
272  // there's no more actions to do here with graph
273  std::vector<Command *> ToCleanUp;
274  GraphProcessor::waitForEvent(std::move(Event), Lock, ToCleanUp,
275  /*LockTheLock=*/false, Success);
276  cleanupCommands(ToCleanUp);
277 }
278 
280  bool StrictLock) {
281  MemObjRecord *Record = MGraphBuilder.getMemObjRecord(MemObj);
282  if (!Record)
283  // No operations were performed on the mem object
284  return true;
285 
286  {
287  // This only needs a shared mutex as it only involves enqueueing and
288  // awaiting for events
289  ReadLockT Lock = StrictLock ? ReadLockT(MGraphLock)
290  : ReadLockT(MGraphLock, std::try_to_lock);
291  if (!Lock.owns_lock())
292  return false;
293  waitForRecordToFinish(Record, Lock);
294  }
295  {
296  WriteLockT Lock = StrictLock ? acquireWriteLock()
297  : WriteLockT(MGraphLock, std::try_to_lock);
298  if (!Lock.owns_lock())
299  return false;
303  }
304  return true;
305 }
306 
308  std::vector<Command *> AuxiliaryCmds;
309  EventImplPtr NewCmdEvent = nullptr;
310 
311  {
312  WriteLockT Lock = acquireWriteLock();
313 
314  Command *NewCmd = MGraphBuilder.addHostAccessor(Req, AuxiliaryCmds);
315  if (!NewCmd)
316  return nullptr;
317  NewCmdEvent = NewCmd->getEvent();
318  }
319 
320  std::vector<Command *> ToCleanUp;
321  {
322  ReadLockT Lock = acquireReadLock();
323  EnqueueResultT Res;
324  bool Enqueued;
325 
326  for (Command *Cmd : AuxiliaryCmds) {
327  Enqueued = GraphProcessor::enqueueCommand(Cmd, Lock, Res, ToCleanUp, Cmd);
328  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
329  throw runtime_error("Enqueue process failed.",
330  PI_ERROR_INVALID_OPERATION);
331  }
332 
333  if (Command *NewCmd = static_cast<Command *>(NewCmdEvent->getCommand())) {
334  Enqueued =
335  GraphProcessor::enqueueCommand(NewCmd, Lock, Res, ToCleanUp, NewCmd);
336  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
337  throw runtime_error("Enqueue process failed.",
338  PI_ERROR_INVALID_OPERATION);
339  }
340  }
341 
342  cleanupCommands(ToCleanUp);
343  return NewCmdEvent;
344 }
345 
347  Command *const BlockedCmd = Req->MBlockedCmd;
348 
349  std::vector<Command *> ToCleanUp;
350  {
351  ReadLockT Lock = acquireReadLock();
352 
353  assert(BlockedCmd && "Can't find appropriate command to unblock");
354 
356 
357  enqueueLeavesOfReqUnlocked(Req, Lock, ToCleanUp);
358  }
359  cleanupCommands(ToCleanUp);
360 }
361 
363  ReadLockT &GraphReadLock,
364  std::vector<Command *> &ToCleanUp) {
365  MemObjRecord *Record = Req->MSYCLMemObj->MRecord.get();
366  auto EnqueueLeaves = [&ToCleanUp, &GraphReadLock](LeavesCollection &Leaves) {
367  for (Command *Cmd : Leaves) {
368  EnqueueResultT Res;
369  bool Enqueued = GraphProcessor::enqueueCommand(Cmd, GraphReadLock, Res,
370  ToCleanUp, Cmd);
371  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
372  throw runtime_error("Enqueue process failed.",
373  PI_ERROR_INVALID_OPERATION);
374  }
375  };
376 
377  EnqueueLeaves(Record->MReadLeaves);
378  EnqueueLeaves(Record->MWriteLeaves);
379 }
380 
382  const std::vector<EventImplPtr> &ToEnqueue, ReadLockT &GraphReadLock,
383  std::vector<Command *> &ToCleanUp) {
384  for (auto &Event : ToEnqueue) {
385  Command *Cmd = static_cast<Command *>(Event->getCommand());
386  if (!Cmd)
387  continue;
388  EnqueueResultT Res;
389  bool Enqueued =
390  GraphProcessor::enqueueCommand(Cmd, GraphReadLock, Res, ToCleanUp, Cmd);
391  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
392  throw runtime_error("Enqueue process failed.",
393  PI_ERROR_INVALID_OPERATION);
394  }
395 }
396 
398  sycl::device HostDevice =
399  createSyclObjFromImpl<device>(device_impl::getHostDeviceImpl());
400  sycl::context HostContext{HostDevice};
402  new queue_impl(detail::getSyclObjImpl(HostDevice),
403  detail::getSyclObjImpl(HostContext), /*AsyncHandler=*/{},
404  /*PropList=*/{sycl::property::queue::enable_profiling()}));
405 }
406 
408 
410  // There might be some commands scheduled for post enqueue cleanup that
411  // haven't been freed because of the graph mutex being locked at the time,
412  // clean them up now.
413  cleanupCommands({});
414 
415  cleanupAuxiliaryResources(Blocking);
416  // We need loop since sometimes we may need new objects to be added to
417  // deferred mem objects storage during cleanup. Known example is: we cleanup
418  // existing deferred mem objects under write lock, during this process we
419  // cleanup commands related to this record, command may have last reference to
420  // queue_impl, ~queue_impl is called and buffer for assert (which is created
421  // with size only so all conditions for deferred release are satisfied) is
422  // added to deferred mem obj storage. So we may end up with leak.
423  do {
424  cleanupDeferredMemObjects(Blocking);
425  } while (Blocking == BlockingT::BLOCKING && !isDeferredMemObjectsEmpty());
426 }
427 
429  return Req->MSYCLMemObj->MRecord.get();
430 }
431 
432 void Scheduler::cleanupCommands(const std::vector<Command *> &Cmds) {
435 
436  if (Cmds.empty()) {
437  std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
438  if (MDeferredCleanupCommands.empty())
439  return;
440  }
441 
442  WriteLockT Lock(MGraphLock, std::try_to_lock);
443  // In order to avoid deadlocks related to blocked commands, defer cleanup if
444  // the lock wasn't acquired.
445  if (Lock.owns_lock()) {
446  for (Command *Cmd : Cmds) {
448  }
449  std::vector<Command *> DeferredCleanupCommands;
450  {
451  std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
452  std::swap(DeferredCleanupCommands, MDeferredCleanupCommands);
453  }
454  for (Command *Cmd : DeferredCleanupCommands) {
456  }
457 
458  } else {
459  std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
460  // Full cleanup for fusion placeholder commands is handled by the entry
461  // points for fusion (start_fusion, ...). To avoid double free or access to
462  // objects after their lifetime, fusion commands should therefore never be
463  // added to the deferred command list.
464  std::copy_if(Cmds.begin(), Cmds.end(),
465  std::back_inserter(MDeferredCleanupCommands),
466  [](const Command *Cmd) {
467  return Cmd->getType() != Command::CommandType::FUSION;
468  });
469  }
470 }
471 
473  // Completing command's event along with unblocking enqueue readiness of
474  // empty command may lead to quick deallocation of MThisCmd by some cleanup
475  // process. Thus we'll copy deps prior to completing of event and unblocking
476  // of empty command.
477  // Also, it's possible to have record deallocated prior to enqueue process.
478  // Thus we employ read-lock of graph.
479 
480  std::vector<Command *> ToCleanUp;
481  {
482  ReadLockT Lock = acquireReadLock();
483 
484  std::vector<DepDesc> Deps = Cmd->MDeps;
485  // Host tasks are cleaned up upon completion rather than enqueuing.
486  if (Cmd->MLeafCounter == 0) {
487  ToCleanUp.push_back(Cmd);
488  Cmd->MMarkedForCleanup = true;
489  }
490 
491  {
492  std::lock_guard<std::mutex> Guard(Cmd->MBlockedUsersMutex);
493  // update self-event status
494  Cmd->getEvent()->setComplete();
495  }
496  Scheduler::enqueueUnblockedCommands(Cmd->MBlockedUsers, Lock, ToCleanUp);
497  }
498  cleanupCommands(ToCleanUp);
499 }
500 
501 void Scheduler::deferMemObjRelease(const std::shared_ptr<SYCLMemObjI> &MemObj) {
502  {
503  std::lock_guard<std::mutex> Lock{MDeferredMemReleaseMutex};
504  MDeferredMemObjRelease.push_back(MemObj);
505  }
507 }
508 
510  std::lock_guard<std::mutex> Lock{MDeferredMemReleaseMutex};
511  return MDeferredMemObjRelease.empty();
512 }
513 
516  return;
517  if (Blocking == BlockingT::BLOCKING) {
518  std::vector<std::shared_ptr<SYCLMemObjI>> TempStorage;
519  {
520  std::lock_guard<std::mutex> LockDef{MDeferredMemReleaseMutex};
521  MDeferredMemObjRelease.swap(TempStorage);
522  }
523  // if any objects in TempStorage exist - it is leaving scope and being
524  // deleted
525  }
526 
527  std::vector<std::shared_ptr<SYCLMemObjI>> ObjsReadyToRelease;
528  {
529  // Lock is needed for checkLeavesCompletion - it walks through Record leaves
530  ReadLockT Lock = ReadLockT(MGraphLock, std::try_to_lock);
531  if (Lock.owns_lock()) {
532  // Not expected that Blocking == true will be used in parallel with
533  // adding MemObj to storage, no such scenario.
534  std::lock_guard<std::mutex> LockDef{MDeferredMemReleaseMutex};
535  auto MemObjIt = MDeferredMemObjRelease.begin();
536  while (MemObjIt != MDeferredMemObjRelease.end()) {
537  MemObjRecord *Record = MGraphBuilder.getMemObjRecord((*MemObjIt).get());
538  if (!checkLeavesCompletion(Record)) {
539  MemObjIt++;
540  continue;
541  }
542  ObjsReadyToRelease.push_back(*MemObjIt);
543  MemObjIt = MDeferredMemObjRelease.erase(MemObjIt);
544  }
545  }
546  }
547  auto ReleaseCandidateIt = ObjsReadyToRelease.begin();
548  while (ReleaseCandidateIt != ObjsReadyToRelease.end()) {
549  if (!removeMemoryObject(ReleaseCandidateIt->get(), false))
550  break;
551  ReleaseCandidateIt = ObjsReadyToRelease.erase(ReleaseCandidateIt);
552  }
553  if (!ObjsReadyToRelease.empty()) {
554  std::lock_guard<std::mutex> LockDef{MDeferredMemReleaseMutex};
555  MDeferredMemObjRelease.insert(
557  std::make_move_iterator(ObjsReadyToRelease.begin()),
558  std::make_move_iterator(ObjsReadyToRelease.end()));
559  }
560 }
561 
563  std::unordered_map<EventImplPtr, std::vector<std::shared_ptr<const void>>>
564  &AuxiliaryResources,
565  const EventImplPtr &Event,
566  std::vector<std::shared_ptr<const void>> &&Resources) {
567  std::vector<std::shared_ptr<const void>> &StoredResources =
568  AuxiliaryResources[Event];
569  StoredResources.insert(StoredResources.end(),
570  std::make_move_iterator(Resources.begin()),
571  std::make_move_iterator(Resources.end()));
572 }
573 
575  const EventImplPtr &Src) {
576  std::unique_lock<std::mutex> Lock{MAuxiliaryResourcesMutex};
577  auto Iter = MAuxiliaryResources.find(Src);
578  if (Iter == MAuxiliaryResources.end()) {
579  return;
580  }
582  std::move(Iter->second));
583  MAuxiliaryResources.erase(Iter);
584 }
585 
587  EventImplPtr &Event, std::vector<std::shared_ptr<const void>> Resources) {
588  std::unique_lock<std::mutex> Lock{MAuxiliaryResourcesMutex};
590  std::move(Resources));
591 }
592 
594  std::unique_lock<std::mutex> Lock{MAuxiliaryResourcesMutex};
595  for (auto It = MAuxiliaryResources.begin();
596  It != MAuxiliaryResources.end();) {
597  const EventImplPtr &Event = It->first;
598  if (Blocking == BlockingT::BLOCKING) {
599  Event->waitInternal();
600  It = MAuxiliaryResources.erase(It);
601  } else if (Event->isCompleted())
602  It = MAuxiliaryResources.erase(It);
603  else
604  ++It;
605  }
606 }
607 
609  WriteLockT Lock = acquireWriteLock();
610  WriteLockT FusionMapLock = acquireFusionWriteLock();
611  MGraphBuilder.startFusion(Queue);
612 }
613 
614 void Scheduler::cleanUpCmdFusion(sycl::detail::queue_impl *Queue) {
615  // No graph lock, we might be called because the graph builder is releasing
616  // resources.
617  WriteLockT FusionMapLock = acquireFusionWriteLock();
619 }
620 
622  std::vector<Command *> ToEnqueue;
623  {
624  WriteLockT Lock = acquireWriteLock();
625  WriteLockT FusionMapLock = acquireFusionWriteLock();
626  MGraphBuilder.cancelFusion(Queue, ToEnqueue);
627  }
628  enqueueCommandForCG(nullptr, ToEnqueue);
629 }
630 
632  const property_list &PropList) {
633  std::vector<Command *> ToEnqueue;
634  EventImplPtr FusedEvent;
635  {
636  WriteLockT Lock = acquireWriteLock();
637  WriteLockT FusionMapLock = acquireFusionWriteLock();
638  FusedEvent = MGraphBuilder.completeFusion(Queue, ToEnqueue, PropList);
639  }
640  enqueueCommandForCG(nullptr, ToEnqueue);
641 
642  return FusedEvent;
643 }
644 
648 }
649 
650 void Scheduler::printFusionWarning(const std::string &Message) {
652  std::cerr << "WARNING: " << Message << "\n";
653  }
654 }
655 
656 KernelFusionCommand *Scheduler::isPartOfActiveFusion(Command *Cmd) {
657  auto CmdType = Cmd->getType();
658  switch (CmdType) {
659  case Command::FUSION: {
660  auto *FusionCmd = static_cast<KernelFusionCommand *>(Cmd);
661  return (FusionCmd->isActive()) ? FusionCmd : nullptr;
662  }
663  case Command::RUN_CG: {
664  auto *CGCmd = static_cast<ExecCGCommand *>(Cmd);
665  return (CGCmd->MFusionCmd && CGCmd->MFusionCmd->isActive())
666  ? CGCmd->MFusionCmd
667  : nullptr;
668  }
669  default:
670  return nullptr;
671  }
672 }
673 
676  std::vector<std::shared_ptr<ext::oneapi::experimental::detail::node_impl>>
677  Nodes,
678  const QueueImplPtr &Queue, std::vector<Requirement *> Requirements,
679  std::vector<detail::EventImplPtr> &Events) {
680  std::vector<Command *> AuxiliaryCmds;
681  EventImplPtr NewCmdEvent = nullptr;
682 
683  {
684  WriteLockT Lock = acquireWriteLock();
685 
687  Graph, Nodes, Queue, Requirements, Events, AuxiliaryCmds);
688  if (!NewCmd)
689  return nullptr;
690  NewCmdEvent = NewCmd->getEvent();
691  }
692 
693  std::vector<Command *> ToCleanUp;
694  {
695  ReadLockT Lock = acquireReadLock();
696  EnqueueResultT Res;
697  bool Enqueued;
698 
699  for (Command *Cmd : AuxiliaryCmds) {
700  Enqueued = GraphProcessor::enqueueCommand(Cmd, Lock, Res, ToCleanUp, Cmd);
701  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
702  throw runtime_error("Enqueue process failed.",
703  PI_ERROR_INVALID_OPERATION);
704  }
705 
706  if (Command *NewCmd = static_cast<Command *>(NewCmdEvent->getCommand())) {
707  Enqueued =
708  GraphProcessor::enqueueCommand(NewCmd, Lock, Res, ToCleanUp, NewCmd);
709  if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
710  throw runtime_error("Enqueue process failed.",
711  PI_ERROR_INVALID_OPERATION);
712  }
713  }
714 
715  cleanupCommands(ToCleanUp);
716  return NewCmdEvent;
717 }
718 
719 } // namespace detail
720 } // namespace _V1
721 } // namespace sycl
The context class represents a SYCL context on which kernel functions may be executed.
Definition: context.hpp:51
Base class for memory allocation commands.
Definition: commands.hpp:458
"Execute kernel" command group class.
Definition: cg.hpp:167
std::vector< std::shared_ptr< detail::stream_impl > > getStreams() const
Definition: cg.hpp:206
CGTYPE
Type of the command group.
Definition: cg.hpp:56
The Command class represents some action that needs to be performed on one or more memory objects.
Definition: commands.hpp:107
void resolveReleaseDependencies(std::set< Command * > &list)
Looks at all the dependencies for the release command and enables instrumentation to report these dep...
Definition: commands.cpp:909
bool MMarkedForCleanup
Indicates that the node will be freed by graph cleanup.
Definition: commands.hpp:391
unsigned MLeafCounter
Counts the number of memory objects this command is a leaf for.
Definition: commands.hpp:326
std::unordered_set< Command * > MUsers
Contains list of commands that depend on the command.
Definition: commands.hpp:322
std::vector< DepDesc > MDeps
Contains list of dependencies(edges)
Definition: commands.hpp:320
std::vector< EventImplPtr > MBlockedUsers
Contains the list of commands that depend on the host command explicitly (by depends_on).
Definition: commands.hpp:398
std::atomic< EnqueueResultT::ResultT > MEnqueueStatus
Describes the status of the command.
Definition: commands.hpp:343
const EventImplPtr & getEvent() const
Definition: commands.hpp:180
CommandType getType() const
Definition: commands.hpp:144
const QueueImplPtr & getQueue() const
Definition: commands.hpp:178
static GlobalHandler & instance()
A wrapper for CircularBuffer class along with collection for host accessor's EmptyCommands.
std::shared_ptr< MemObjRecord > MRecord
Command * addCommandGraphUpdate(ext::oneapi::experimental::detail::exec_graph_impl *Graph, std::vector< std::shared_ptr< ext::oneapi::experimental::detail::node_impl >> Nodes, const QueueImplPtr &Queue, std::vector< Requirement * > Requirements, std::vector< detail::EventImplPtr > &Events, std::vector< Command * > &ToEnqueue)
Adds a command buffer update operation to the execution graph.
void cleanupCommand(Command *Cmd, bool AllowUnsubmitted=false)
void decrementLeafCountersForRecord(MemObjRecord *Record)
Decrements leaf counters for all leaves of the record.
MemObjRecord * getMemObjRecord(SYCLMemObjI *MemObject)
EventImplPtr completeFusion(QueueImplPtr Queue, std::vector< Command * > &ToEnqueue, const property_list &)
Command * addHostAccessor(Requirement *Req, std::vector< Command * > &ToEnqueue)
Enqueues a command to create a host accessor.
void cleanupCommandsForRecord(MemObjRecord *Record)
Removes commands that use the given MemObjRecord from the graph.
GraphBuildResult addCG(std::unique_ptr< detail::CG > CommandGroup, const QueueImplPtr &Queue, std::vector< Command * > &ToEnqueue, sycl::detail::pi::PiExtCommandBuffer CommandBuffer=nullptr, const std::vector< sycl::detail::pi::PiExtSyncPoint > &Dependencies={})
Registers command group and adds it to the dependency graph.
void removeRecordForMemObj(SYCLMemObjI *MemObject)
Removes the MemObjRecord for the memory object passed.
Command * addCopyBack(Requirement *Req, std::vector< Command * > &ToEnqueue)
Enqueues a command to update memory to the latest state.
Command * addCGUpdateHost(std::unique_ptr< detail::CG > CommandGroup, const QueueImplPtr &HostQueue, std::vector< Command * > &ToEnqueue)
Registers a command group that updates host memory to the latest state.
void cancelFusion(QueueImplPtr Queue, std::vector< Command * > &ToEnqueue)
void cleanUpCmdFusion(sycl::detail::queue_impl *Queue)
Clean up the internal fusion commands held for the given queue.
static void waitForEvent(const EventImplPtr &Event, ReadLockT &GraphReadLock, std::vector< Command * > &ToCleanUp, bool LockTheLock=true, bool *Success=nullptr)
Waits until the command associated with the passed Event is completed.
static bool enqueueCommand(Command *Cmd, ReadLockT &GraphReadLock, EnqueueResultT &EnqueueResult, std::vector< Command * > &ToCleanUp, Command *RootCommand, BlockingT Blocking=NON_BLOCKING)
Enqueues the command and all its dependencies.
DPC++ graph scheduler class.
Definition: scheduler.hpp:367
void waitForEvent(const EventImplPtr &Event, bool *Success=nullptr)
Waits for the event.
Definition: scheduler.cpp:269
ReadLockT acquireFusionReadLock()
Provides shared access to std::shared_timed_mutex object with deadlock avoidance to the Fusion map.
Definition: scheduler.hpp:543
EventImplPtr addCopyBack(Requirement *Req)
Registers a command group, that copies most recent memory to the memory pointed by the requirement.
Definition: scheduler.cpp:223
static void enqueueUnblockedCommands(const std::vector< EventImplPtr > &CmdsToEnqueue, ReadLockT &GraphReadLock, std::vector< Command * > &ToCleanUp)
Definition: scheduler.cpp:381
ReadLockT acquireReadLock()
Provides shared access to std::shared_timed_mutex object with deadlock avoidance.
Definition: scheduler.hpp:539
EventImplPtr addHostAccessor(Requirement *Req)
Adds nodes to the graph, that update the requirement with the pointer to the host memory.
Definition: scheduler.cpp:307
std::unordered_map< EventImplPtr, std::vector< std::shared_ptr< const void > > > MAuxiliaryResources
Definition: scheduler.hpp:959
void registerAuxiliaryResources(EventImplPtr &Event, std::vector< std::shared_ptr< const void >> Resources)
Definition: scheduler.cpp:586
void cleanupAuxiliaryResources(BlockingT Blocking)
Definition: scheduler.cpp:593
std::unique_lock< RWLockT > WriteLockT
Definition: scheduler.hpp:497
EventImplPtr completeFusion(QueueImplPtr Queue, const property_list &)
Definition: scheduler.cpp:631
EventImplPtr addCommandGraphUpdate(ext::oneapi::experimental::detail::exec_graph_impl *Graph, std::vector< std::shared_ptr< ext::oneapi::experimental::detail::node_impl >> Nodes, const QueueImplPtr &Queue, std::vector< Requirement * > Requirements, std::vector< detail::EventImplPtr > &Events)
Adds a command buffer update operation to the execution graph.
Definition: scheduler.cpp:674
void cleanupDeferredMemObjects(BlockingT Blocking)
Definition: scheduler.cpp:514
static void enqueueLeavesOfReqUnlocked(const Requirement *const Req, ReadLockT &GraphReadLock, std::vector< Command * > &ToCleanUp)
Definition: scheduler.cpp:362
void enqueueCommandForCG(EventImplPtr NewEvent, std::vector< Command * > &AuxilaryCmds, BlockingT Blocking=NON_BLOCKING)
Definition: scheduler.cpp:166
bool isInFusionMode(QueueIdT Queue)
Definition: scheduler.cpp:645
void cancelFusion(QueueImplPtr Queue)
Definition: scheduler.cpp:621
std::shared_lock< RWLockT > ReadLockT
Definition: scheduler.hpp:496
std::vector< std::shared_ptr< SYCLMemObjI > > MDeferredMemObjRelease
Definition: scheduler.hpp:955
void startFusion(QueueImplPtr Queue)
Definition: scheduler.cpp:608
bool checkLeavesCompletion(MemObjRecord *Record)
Definition: scheduler.cpp:29
static MemObjRecord * getMemObjRecord(const Requirement *const Req)
Definition: scheduler.cpp:428
void releaseHostAccessor(Requirement *Req)
Unblocks operations with the memory object.
Definition: scheduler.cpp:346
void waitForRecordToFinish(MemObjRecord *Record, ReadLockT &GraphReadLock)
This function waits on all of the graph leaves which somehow use the memory object which is represent...
Definition: scheduler.cpp:45
static Scheduler & getInstance()
Definition: scheduler.cpp:261
void cleanUpCmdFusion(sycl::detail::queue_impl *Queue)
Definition: scheduler.cpp:614
EventImplPtr addCG(std::unique_ptr< detail::CG > CommandGroup, const QueueImplPtr &Queue, sycl::detail::pi::PiExtCommandBuffer CommandBuffer=nullptr, const std::vector< sycl::detail::pi::PiExtSyncPoint > &Dependencies={})
Registers a command group, and adds it to the dependency graph.
Definition: scheduler.cpp:95
void takeAuxiliaryResources(const EventImplPtr &Dst, const EventImplPtr &Src)
Assign Src's auxiliary resources to Dst.
Definition: scheduler.cpp:574
void cleanupCommands(const std::vector< Command * > &Cmds)
Definition: scheduler.cpp:432
void NotifyHostTaskCompletion(Command *Cmd)
Definition: scheduler.cpp:472
WriteLockT acquireWriteLock()
Provides exclusive access to std::shared_timed_mutex object with deadlock avoidance.
Definition: scheduler.hpp:501
bool removeMemoryObject(detail::SYCLMemObjI *MemObj, bool StrictLock=true)
Removes buffer from the graph.
Definition: scheduler.cpp:279
WriteLockT acquireFusionWriteLock()
Provides exclusive access to std::shared_timed_mutex object with deadlock avoidance to the Fusion map...
Definition: scheduler.hpp:520
std::vector< Command * > MDeferredCleanupCommands
Definition: scheduler.hpp:952
void deferMemObjRelease(const std::shared_ptr< detail::SYCLMemObjI > &MemObj)
Definition: scheduler.cpp:501
void releaseResources(BlockingT Blocking=BlockingT::BLOCKING)
Definition: scheduler.cpp:409
static std::shared_ptr< device_impl > getHostDeviceImpl()
Gets the single instance of the Host Device.
The SYCL device class encapsulates a single SYCL device on which kernels may be executed.
Definition: device.hpp:64
Class representing the implementation of command_graph<executable>.
Objects of the property_list class are containers for the SYCL properties.
Encapsulates a single SYCL queue which schedules kernels on a SYCL device.
Definition: queue.hpp:111
__SYCL_EXTERN_STREAM_ATTRS ostream cerr
Linked to standard error (unbuffered)
std::hash< std::shared_ptr< detail::queue_impl > >::result_type QueueIdT
Definition: scheduler.hpp:191
static void registerAuxiliaryResourcesNoLock(std::unordered_map< EventImplPtr, std::vector< std::shared_ptr< const void >>> &AuxiliaryResources, const EventImplPtr &Event, std::vector< std::shared_ptr< const void >> &&Resources)
Definition: scheduler.cpp:562
decltype(Obj::impl) getSyclObjImpl(const Obj &SyclObject)
Definition: impl_utils.hpp:30
std::shared_ptr< detail::stream_impl > StreamImplPtr
Definition: commands.hpp:50
std::shared_ptr< event_impl > EventImplPtr
Definition: cg.hpp:43
std::shared_ptr< sycl::detail::queue_impl > QueueImplPtr
Definition: event_impl.hpp:34
Definition: access.hpp:18
Result of command enqueueing.
Definition: commands.hpp:62
ResultT MResult
Indicates the result of enqueueing.
Definition: commands.hpp:73
Memory Object Record.
Definition: scheduler.hpp:202
std::vector< AllocaCommandBase * > MAllocaCommands
Definition: scheduler.hpp:209