31 #ifdef XPTI_ENABLE_INSTRUMENTATION
33 std::set<Command *> DepCommands;
35 std::vector<Command *> ToCleanUp;
38 bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
39 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
41 #ifdef XPTI_ENABLE_INSTRUMENTATION
43 DepCommands.insert(Cmd);
45 GraphProcessor::waitForEvent(Cmd->
getEvent(), GraphReadLock, ToCleanUp);
49 bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
50 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
52 #ifdef XPTI_ENABLE_INSTRUMENTATION
53 DepCommands.insert(Cmd);
55 GraphProcessor::waitForEvent(Cmd->
getEvent(), GraphReadLock, ToCleanUp);
60 bool Enqueued = GraphProcessor::enqueueCommand(ReleaseCmd, Res, ToCleanUp);
61 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
63 #ifdef XPTI_ENABLE_INSTRUMENTATION
68 GraphProcessor::waitForEvent(ReleaseCmd->
getEvent(), GraphReadLock,
// Scheduler::addCG: adds a command group to the graph through MGraphBuilder
// and then enqueues the resulting commands via GraphProcessor.
// NOTE(review): only part of this definition is visible in this excerpt (the
// remaining parameters, the try/catch structure and the return are elided),
// so the comments below describe the visible statements only.
73 EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
76 const CG::CGTYPE Type = CommandGroup->getType();
// Filled by the MGraphBuilder calls below and enqueued before NewCmd.
77 std::vector<Command *> AuxiliaryCmds;
78 std::vector<StreamImplPtr> Streams;
// Streams are read from the command group here, before CommandGroup is moved
// into the graph builder further down.
80 if (Type == CG::Kernel) {
81 Streams = ((
CGExecKernel *)CommandGroup.get())->getStreams();
85 if (Queue->is_host()) {
// The graph write lock is requested before the MGraphBuilder calls that follow.
94 acquireWriteLock(Lock);
// This variant and the CodeplayHostTask case pass DefaultHostQueue instead of
// the submitting Queue.
99 NewCmd = MGraphBuilder.addCGUpdateHost(std::move(CommandGroup),
100 DefaultHostQueue, AuxiliaryCmds);
102 case CG::CodeplayHostTask:
103 NewCmd = MGraphBuilder.addCG(std::move(CommandGroup), DefaultHostQueue,
// NOTE(review): presumably the default case - the command is added on the
// Queue passed by the caller; confirm against the elided case labels.
107 NewCmd = MGraphBuilder.addCG(std::move(CommandGroup), std::move(Queue),
// Collects commands reported by GraphProcessor::enqueueCommand; handed to
// cleanupCommands at the end of the visible code.
113 std::vector<Command *> ToCleanUp;
// CleanUp: when NewCmd exists and has neither dependencies (MDeps) nor users
// (MUsers), the event is detached from the command. Any further steps of the
// lambda are not visible here.
122 auto CleanUp = [&]() {
123 if (NewCmd && (NewCmd->
MDeps.size() == 0 && NewCmd->
MUsers.size() == 0)) {
124 NewEvent->setCommand(
nullptr);
// Auxiliary commands are enqueued first; a SyclEnqueueFailed result is turned
// into a runtime_error.
129 for (
Command *Cmd : AuxiliaryCmds) {
130 Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
132 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
133 throw runtime_error(
"Auxiliary enqueue process failed.",
// Re-throws the exception currently being handled.
// NOTE(review): presumably inside a catch handler that runs CleanUp first -
// the enclosing lines are elided; confirm.
139 std::rethrow_exception(std::current_exception());
// Enqueue the main command with the same failure check as the auxiliary ones.
147 bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
148 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
154 std::rethrow_exception(std::current_exception());
// Pass everything gathered in ToCleanUp during enqueueing to cleanupCommands.
164 cleanupCommands(ToCleanUp);
174 std::vector<Command *> AuxiliaryCmds;
178 acquireWriteLock(Lock);
179 NewCmd = MGraphBuilder.addCopyBack(Req, AuxiliaryCmds);
186 std::vector<Command *> ToCleanUp;
192 for (
Command *Cmd : AuxiliaryCmds) {
193 Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
194 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
198 Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
199 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
202 NewCmd->
getQueue()->reportAsyncException(std::current_exception());
205 cleanupCommands(ToCleanUp);
210 return GlobalHandler::instance().getScheduler();
217 std::vector<Command *> ToCleanUp;
218 GraphProcessor::waitForEvent(std::move(Event), Lock, ToCleanUp,
220 cleanupCommands(ToCleanUp);
224 std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
229 detail::Scheduler::getInstance().deallocateStreamBuffers(
238 std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
243 std::vector<std::shared_ptr<const void>> AuxResourcesToDeallocate;
248 WriteLockT Lock(MGraphLock, std::try_to_lock);
249 if (Lock.owns_lock()) {
250 auto FinishedCmd =
static_cast<Command *
>(FinishedEvent->getCommand());
254 MGraphBuilder.cleanupFinishedCommands(FinishedCmd, StreamsToDeallocate,
255 AuxResourcesToDeallocate);
266 std::vector<std::shared_ptr<stream_impl>> StreamsToDeallocate;
271 std::vector<std::shared_ptr<const void>> AuxResourcesToDeallocate;
281 Record = MGraphBuilder.getMemObjRecord(MemObj);
286 waitForRecordToFinish(Record, Lock);
291 acquireWriteLock(Lock);
292 MGraphBuilder.decrementLeafCountersForRecord(Record);
293 MGraphBuilder.cleanupCommandsForRecord(Record, StreamsToDeallocate,
294 AuxResourcesToDeallocate);
295 MGraphBuilder.removeRecordForMemObj(MemObj);
302 std::vector<Command *> AuxiliaryCmds;
307 acquireWriteLock(Lock);
309 Command *NewCmd = MGraphBuilder.addHostAccessor(Req, AuxiliaryCmds);
315 std::vector<Command *> ToCleanUp;
321 for (
Command *Cmd : AuxiliaryCmds) {
322 Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
323 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
327 if (
Command *NewCmd =
static_cast<Command *
>(NewCmdEvent->getCommand())) {
328 Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
329 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
334 cleanupCommands(ToCleanUp);
341 std::vector<Command *> ToCleanUp;
345 assert(BlockedCmd &&
"Can't find appropriate command to unblock");
349 enqueueLeavesOfReqUnlocked(Req, ToCleanUp);
351 cleanupCommands(ToCleanUp);
// Enqueues commands related to the requirement Req through
// GraphProcessor::enqueueCommand, forwarding ToCleanUp (taken by non-const
// reference) to each call.
// NOTE(review): the loop that produces Cmd and the action taken on a
// SyclEnqueueFailed result are not visible in this excerpt. The "Unlocked"
// suffix suggests the caller is expected to hold the graph lock - TODO confirm.
354 void Scheduler::enqueueLeavesOfReqUnlocked(
const Requirement *
const Req,
355 std::vector<Command *> &ToCleanUp) {
360 bool Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
// Only a failed enqueue whose result is SyclEnqueueFailed is treated as an
// error here.
361 if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.
MResult)
371 size_t StreamBufferSize,
372 size_t FlushBufferSize) {
373 std::lock_guard<std::recursive_mutex> lock(StreamBuffersPoolMutex);
374 StreamBuffersPool.insert(
375 {Impl,
new StreamBuffers(StreamBufferSize, FlushBufferSize)});
379 std::lock_guard<std::recursive_mutex> lock(StreamBuffersPoolMutex);
380 delete StreamBuffersPool[Impl];
381 StreamBuffersPool.erase(Impl);
384 Scheduler::Scheduler() {
// Destructor. While holding StreamBuffersPoolMutex it checks whether
// StreamBuffersPool still contains entries; the warning text below belongs to
// that non-empty case.
// NOTE(review): the statement that emits the warning and the rest of the
// destructor body are not visible in this excerpt.
393 Scheduler::~Scheduler() {
401 std::lock_guard<std::recursive_mutex> lock(StreamBuffersPoolMutex);
402 if (!StreamBuffersPool.empty())
405 "\nWARNING: Some commands may have not finished the execution and "
406 "not all resources were released. Please be sure that all kernels "
407 "have synchronization points.\n\n");
423 while (!Lock.try_lock_for(std::chrono::milliseconds(10))) {
427 std::this_thread::yield();
// Cleans up the commands in Cmds when the graph write lock can be obtained
// with a try-lock; the visible code also maintains MDeferredCleanupCommands,
// a list of commands whose cleanup is postponed to a later call.
// NOTE(review): loop headers, scope braces and the else branch are elided in
// this excerpt; comments below that rely on them are marked as assumptions.
440 void Scheduler::cleanupCommands(
const std::vector<Command *> &Cmds) {
// std::try_to_lock requests an acquisition attempt on MGraphLock;
// owns_lock() below reports whether that attempt succeeded.
443 WriteLockT Lock(MGraphLock, std::try_to_lock);
446 if (Lock.owns_lock()) {
// NOTE(review): presumably executed for every Cmd in Cmds - the loop header
// is not visible.
448 MGraphBuilder.cleanupCommand(Cmd);
// Take the pending deferred commands by swapping the member list into a local
// vector while MDeferredCleanupMutex is held, then clean each of them up.
// NOTE(review): this guard reuses the name Lock, the same name as the graph
// lock declared above.
450 std::vector<Command *> DeferredCleanupCommands;
452 std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
453 std::swap(DeferredCleanupCommands, MDeferredCleanupCommands);
455 for (
Command *Cmd : DeferredCleanupCommands) {
456 MGraphBuilder.cleanupCommand(Cmd);
// Append Cmds to the deferred list under MDeferredCleanupMutex.
// NOTE(review): presumably the else branch taken when the try-lock failed -
// the branch line itself is not visible; confirm.
460 std::lock_guard<std::mutex> Lock{MDeferredCleanupMutex};
461 MDeferredCleanupCommands.insert(MDeferredCleanupCommands.end(),
462 Cmds.begin(), Cmds.end());