DPC++ Runtime
Runtime libraries for oneAPI DPC++
queue_impl.cpp
Go to the documentation of this file.
1 //==------------------ queue_impl.cpp - SYCL queue -------------------------==//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include <detail/event_impl.hpp>
11 #include <detail/queue_impl.hpp>
12 #include <sycl/context.hpp>
13 #include <sycl/detail/pi.hpp>
14 #include <sycl/device.hpp>
15 
16 #include <cstring>
17 #include <utility>
18 
19 #ifdef XPTI_ENABLE_INSTRUMENTATION
20 #include "xpti/xpti_trace_framework.hpp"
21 #include <detail/xpti_registry.hpp>
22 #include <sstream>
23 #endif
24 
25 namespace sycl {
27 namespace detail {
28 template <>
29 uint32_t queue_impl::get_info<info::queue::reference_count>() const {
30  RT::PiResult result = PI_SUCCESS;
31  if (!is_host())
33  MQueues[0], PI_QUEUE_INFO_REFERENCE_COUNT, sizeof(result), &result,
34  nullptr);
35  return result;
36 }
37 
38 template <> context queue_impl::get_info<info::queue::context>() const {
39  return get_context();
40 }
41 
42 template <> device queue_impl::get_info<info::queue::device>() const {
43  return get_device();
44 }
45 
46 static event
47 prepareUSMEvent(const std::shared_ptr<detail::queue_impl> &QueueImpl,
48  RT::PiEvent NativeEvent) {
49  auto EventImpl = std::make_shared<detail::event_impl>(QueueImpl);
50  EventImpl->getHandleRef() = NativeEvent;
51  EventImpl->setContextImpl(detail::getSyclObjImpl(QueueImpl->get_context()));
52  EventImpl->setStateIncomplete();
53  return detail::createSyclObjFromImpl<event>(EventImpl);
54 }
55 
56 static event createDiscardedEvent() {
57  EventImplPtr EventImpl =
58  std::make_shared<event_impl>(event_impl::HES_Discarded);
59  return createSyclObjFromImpl<event>(EventImpl);
60 }
61 
/// Enqueues a USM fill of Count bytes at Ptr with Value on this queue,
/// honoring DepEvents. Returns an event for the operation, or a discarded
/// event when event discarding is enabled for this queue.
event queue_impl::memset(const std::shared_ptr<detail::queue_impl> &Self,
                         void *Ptr, int Value, size_t Count,
                         const std::vector<event> &DepEvents) {
  // Fast path: submit the fill without requesting a native event back and
  // hand the caller a discarded event.
  if (MHasDiscardEventsSupport) {
    MemoryManager::fill_usm(Ptr, Self, Count, Value,
                            getOrWaitEvents(DepEvents, MContext), nullptr);
    return createDiscardedEvent();
  }
  event ResEvent;
  {
    // We need to submit command and update the last event under same lock if we
    // have in-order queue.
    auto ScopeLock = isInOrder() ? std::unique_lock<std::mutex>(MLastEventMtx)
                                 : std::unique_lock<std::mutex>();
    // If the last submitted command in the in-order queue is host_task then
    // wait for it before submitting usm command.
    if (isInOrder() && (MLastCGType == CG::CGTYPE::CodeplayHostTask ||
                        MLastCGType == CG::CGTYPE::CodeplayInteropTask))
      MLastEvent.wait();

    RT::PiEvent NativeEvent{};
    MemoryManager::fill_usm(Ptr, Self, Count, Value,
                            getOrWaitEvents(DepEvents, MContext), &NativeEvent);

    // Host contexts produce no native event to wrap; return straight away
    // (ScopeLock is released by the early return).
    if (MContext->is_host())
      return MDiscardEvents ? createDiscardedEvent() : event();

    ResEvent = prepareUSMEvent(Self, NativeEvent);
    if (isInOrder()) {
      MLastEvent = ResEvent;
      // We don't create a command group for usm commands, so set it to None.
      // This variable is used to perform explicit dependency management when
      // required.
      MLastCGType = CG::CGTYPE::None;
    }
  }
  // Track only if we won't be able to handle it with piQueueFinish.
  if (!MSupportOOO)
    addSharedEvent(ResEvent);
  return MDiscardEvents ? createDiscardedEvent() : ResEvent;
}
103 
/// Enqueues a USM copy of Count bytes from Src to Dest on this queue,
/// honoring DepEvents. Returns an event for the operation, or a discarded
/// event when event discarding is enabled for this queue.
event queue_impl::memcpy(const std::shared_ptr<detail::queue_impl> &Self,
                         void *Dest, const void *Src, size_t Count,
                         const std::vector<event> &DepEvents) {
  // Fast path: submit the copy without requesting a native event back and
  // hand the caller a discarded event.
  if (MHasDiscardEventsSupport) {
    MemoryManager::copy_usm(Src, Self, Count, Dest,
                            getOrWaitEvents(DepEvents, MContext), nullptr);
    return createDiscardedEvent();
  }
  event ResEvent;
  {
    // We need to submit command and update the last event under same lock if we
    // have in-order queue.
    auto ScopeLock = isInOrder() ? std::unique_lock<std::mutex>(MLastEventMtx)
                                 : std::unique_lock<std::mutex>();
    // If the last submitted command in the in-order queue is host_task then
    // wait for it before submitting usm command.
    if (isInOrder() && (MLastCGType == CG::CGTYPE::CodeplayHostTask ||
                        MLastCGType == CG::CGTYPE::CodeplayInteropTask))
      MLastEvent.wait();

    RT::PiEvent NativeEvent{};
    MemoryManager::copy_usm(Src, Self, Count, Dest,
                            getOrWaitEvents(DepEvents, MContext), &NativeEvent);

    // Host contexts produce no native event to wrap; return straight away
    // (ScopeLock is released by the early return).
    if (MContext->is_host())
      return MDiscardEvents ? createDiscardedEvent() : event();

    ResEvent = prepareUSMEvent(Self, NativeEvent);
    if (isInOrder()) {
      MLastEvent = ResEvent;
      // We don't create a command group for usm commands, so set it to None.
      // This variable is used to perform explicit dependency management when
      // required.
      MLastCGType = CG::CGTYPE::None;
    }
  }
  // Track only if we won't be able to handle it with piQueueFinish.
  if (!MSupportOOO)
    addSharedEvent(ResEvent);
  return MDiscardEvents ? createDiscardedEvent() : ResEvent;
}
145 
/// Enqueues a USM memory-advice hint for the Length bytes at Ptr on this
/// queue, honoring DepEvents. Returns an event for the operation, or a
/// discarded event when event discarding is enabled for this queue.
event queue_impl::mem_advise(const std::shared_ptr<detail::queue_impl> &Self,
                             const void *Ptr, size_t Length,
                             pi_mem_advice Advice,
                             const std::vector<event> &DepEvents) {
  // Fast path: submit the advice without requesting a native event back and
  // hand the caller a discarded event.
  if (MHasDiscardEventsSupport) {
    MemoryManager::advise_usm(Ptr, Self, Length, Advice,
                              getOrWaitEvents(DepEvents, MContext), nullptr);
    return createDiscardedEvent();
  }
  event ResEvent;
  {
    // We need to submit command and update the last event under same lock if we
    // have in-order queue.
    auto ScopeLock = isInOrder() ? std::unique_lock<std::mutex>(MLastEventMtx)
                                 : std::unique_lock<std::mutex>();
    // If the last submitted command in the in-order queue is host_task then
    // wait for it before submitting usm command.
    if (isInOrder() && (MLastCGType == CG::CGTYPE::CodeplayHostTask ||
                        MLastCGType == CG::CGTYPE::CodeplayInteropTask))
      MLastEvent.wait();

    RT::PiEvent NativeEvent{};
    MemoryManager::advise_usm(Ptr, Self, Length, Advice,
                              getOrWaitEvents(DepEvents, MContext),
                              &NativeEvent);

    // Host contexts produce no native event to wrap; return straight away
    // (ScopeLock is released by the early return).
    if (MContext->is_host())
      return MDiscardEvents ? createDiscardedEvent() : event();

    ResEvent = prepareUSMEvent(Self, NativeEvent);
    if (isInOrder()) {
      MLastEvent = ResEvent;
      // We don't create a command group for usm commands, so set it to None.
      // This variable is used to perform explicit dependency management when
      // required.
      MLastCGType = CG::CGTYPE::None;
    }
  }
  // Track only if we won't be able to handle it with piQueueFinish.
  if (!MSupportOOO)
    addSharedEvent(ResEvent);
  return MDiscardEvents ? createDiscardedEvent() : ResEvent;
}
189 
190 void queue_impl::addEvent(const event &Event) {
191  EventImplPtr EImpl = getSyclObjImpl(Event);
192  assert(EImpl && "Event implementation is missing");
193  auto *Cmd = static_cast<Command *>(EImpl->getCommand());
194  if (!Cmd) {
195  // if there is no command on the event, we cannot track it with MEventsWeak
196  // as that will leave it with no owner. Track in MEventsShared only if we're
197  // unable to call piQueueFinish during wait.
198  if (is_host() || !MSupportOOO)
199  addSharedEvent(Event);
200  }
201  // As long as the queue supports piQueueFinish we only need to store events
202  // with command nodes in the following cases:
203  // 1. Unenqueued commands, since they aren't covered by piQueueFinish.
204  // 2. Kernels with streams, since they are not supported by post enqueue
205  // cleanup.
206  // 3. Host tasks, for both reasons.
207  else if (is_host() || !MSupportOOO || EImpl->getHandleRef() == nullptr ||
208  EImpl->needsCleanupAfterWait()) {
209  std::weak_ptr<event_impl> EventWeakPtr{EImpl};
210  std::lock_guard<std::mutex> Lock{MMutex};
211  MEventsWeak.push_back(std::move(EventWeakPtr));
212  }
213 }
214 
218 void queue_impl::addSharedEvent(const event &Event) {
219  assert(is_host() || !MSupportOOO);
220  std::lock_guard<std::mutex> Lock(MMutex);
221  // Events stored in MEventsShared are not released anywhere else aside from
222  // calls to queue::wait/wait_and_throw, which a user application might not
223  // make, and ~queue_impl(). If the number of events grows large enough,
224  // there's a good chance that most of them are already completed and ownership
225  // of them can be released.
226  const size_t EventThreshold = 128;
227  if (MEventsShared.size() >= EventThreshold) {
228  // Generally, the vector is ordered so that the oldest events are in the
229  // front and the newer events are in the end. So, search to find the first
230  // event that isn't yet complete. All the events prior to that can be
231  // erased. This could leave some few events further on that have completed
232  // not yet erased, but that is OK. This cleanup doesn't have to be perfect.
233  // This also keeps the algorithm linear rather than quadratic because it
234  // doesn't continually recheck things towards the back of the list that
235  // really haven't had time to complete.
236  MEventsShared.erase(
237  MEventsShared.begin(),
238  std::find_if(
239  MEventsShared.begin(), MEventsShared.end(), [](const event &E) {
240  return E.get_info<info::event::command_execution_status>() !=
241  info::event_command_status::complete;
242  }));
243  }
244  MEventsShared.push_back(Event);
245 }
246 
/// Opens an XPTI trace scope for a queue wait() call. Returns the trace event
/// as an opaque pointer (nullptr when tracing is disabled or unavailable) and
/// fills Name and IId for the matching instrumentationEpilog() call.
void *queue_impl::instrumentationProlog(const detail::code_location &CodeLoc,
                                        std::string &Name, int32_t StreamID,
                                        uint64_t &IId) {
  void *TraceEvent = nullptr;
  // Parameters are unused when XPTI instrumentation is compiled out.
  (void)CodeLoc;
  (void)Name;
  (void)StreamID;
  (void)IId;
#ifdef XPTI_ENABLE_INSTRUMENTATION
  xpti::trace_event_data_t *WaitEvent = nullptr;
  if (!xptiTraceEnabled())
    return TraceEvent;

  xpti::payload_t Payload;
  bool HasSourceInfo = false;
  // We try to create a unique string for the wait() call by combining it with
  // the queue address
  xpti::utils::StringHelper NG;
  Name = NG.nameWithAddress<queue_impl *>("queue.wait", this);

  if (CodeLoc.fileName()) {
    // We have source code location information
    Payload =
        xpti::payload_t(Name.c_str(), CodeLoc.fileName(), CodeLoc.lineNumber(),
                        CodeLoc.columnNumber(), (void *)this);
    HasSourceInfo = true;
  } else {
    // We have no location information, so we'll use the address of the queue
    Payload = xpti::payload_t(Name.c_str(), (void *)this);
  }
  // wait() calls could be at different user-code locations; We create a new
  // event based on the code location info and if this has been seen before, a
  // previously created event will be returned.
  uint64_t QWaitInstanceNo = 0;
  WaitEvent = xptiMakeEvent(Name.c_str(), &Payload, xpti::trace_graph_event,
                            xpti_at::active, &QWaitInstanceNo);
  IId = QWaitInstanceNo;
  if (WaitEvent) {
    // Attach device and source metadata, then notify subscribers that the
    // wait scope has begun.
    device D = get_device();
    std::string DevStr;
    if (getSyclObjImpl(D)->is_host())
      DevStr = "HOST";
    else if (D.is_cpu())
      DevStr = "CPU";
    else if (D.is_gpu())
      DevStr = "GPU";
    else if (D.is_accelerator())
      DevStr = "ACCELERATOR";
    else
      DevStr = "UNKNOWN";
    xpti::addMetadata(WaitEvent, "sycl_device", DevStr);
    if (HasSourceInfo) {
      xpti::addMetadata(WaitEvent, "sym_function_name", CodeLoc.functionName());
      xpti::addMetadata(WaitEvent, "sym_source_file_name", CodeLoc.fileName());
      xpti::addMetadata(WaitEvent, "sym_line_no",
                        static_cast<int32_t>((CodeLoc.lineNumber())));
      xpti::addMetadata(WaitEvent, "sym_column_no",
                        static_cast<int32_t>((CodeLoc.columnNumber())));
    }
    xptiNotifySubscribers(StreamID, xpti::trace_wait_begin, nullptr, WaitEvent,
                          QWaitInstanceNo,
                          static_cast<const void *>(Name.c_str()));
    TraceEvent = (void *)WaitEvent;
  }
#endif
  return TraceEvent;
}
314 
315 void queue_impl::instrumentationEpilog(void *TelemetryEvent, std::string &Name,
316  int32_t StreamID, uint64_t IId) {
317  (void)TelemetryEvent;
318  (void)Name;
319  (void)StreamID;
320  (void)IId;
321 #ifdef XPTI_ENABLE_INSTRUMENTATION
322  if (!(xptiTraceEnabled() && TelemetryEvent))
323  return;
324  // Close the wait() scope
325  xpti::trace_event_data_t *TraceEvent =
326  (xpti::trace_event_data_t *)TelemetryEvent;
327  xptiNotifySubscribers(StreamID, xpti::trace_wait_end, nullptr, TraceEvent,
328  IId, static_cast<const void *>(Name.c_str()));
329 #endif
330 }
331 
/// Blocks until all commands submitted to this queue have completed.
/// Tracked events are drained from MEventsWeak/MEventsShared, then stream
/// service events; XPTI trace notifications bracket the whole wait when
/// instrumentation is enabled.
void queue_impl::wait(const detail::code_location &CodeLoc) {
  (void)CodeLoc;
#ifdef XPTI_ENABLE_INSTRUMENTATION
  void *TelemetryEvent = nullptr;
  uint64_t IId;
  std::string Name;
  int32_t StreamID = xptiRegisterStream(SYCL_STREAM_NAME);
  TelemetryEvent = instrumentationProlog(CodeLoc, Name, StreamID, IId);
#endif

  // Take the event lists under the lock, then operate on local copies so
  // MMutex is not held while blocking on events.
  std::vector<std::weak_ptr<event_impl>> WeakEvents;
  std::vector<event> SharedEvents;
  {
    std::lock_guard<std::mutex> Lock(MMutex);
    WeakEvents.swap(MEventsWeak);
    SharedEvents.swap(MEventsShared);
  }
  // If the queue is either a host one or does not support OOO (and we use
  // multiple in-order queues as a result of that), wait for each event
  // directly. Otherwise, only wait for unenqueued or host task events, starting
  // from the latest submitted task in order to minimize total amount of calls,
  // then handle the rest with piQueueFinish.
  const bool SupportsPiFinish = !is_host() && MSupportOOO;
  for (auto EventImplWeakPtrIt = WeakEvents.rbegin();
       EventImplWeakPtrIt != WeakEvents.rend(); ++EventImplWeakPtrIt) {
    if (std::shared_ptr<event_impl> EventImplSharedPtr =
            EventImplWeakPtrIt->lock()) {
      // A nullptr PI event indicates that piQueueFinish will not cover it,
      // either because it's a host task event or an unenqueued one.
      if (!SupportsPiFinish || nullptr == EventImplSharedPtr->getHandleRef()) {
        EventImplSharedPtr->wait(EventImplSharedPtr);
      }
    }
  }
  if (SupportsPiFinish) {
    const detail::plugin &Plugin = getPlugin();
    Plugin.call<detail::PiApiKind::piQueueFinish>(getHandleRef());
    // After the backend queue has drained, run post-wait cleanup for events
    // that still need it (see addEvent() for which events those are).
    for (std::weak_ptr<event_impl> &EventImplWeakPtr : WeakEvents)
      if (std::shared_ptr<event_impl> EventImplSharedPtr =
              EventImplWeakPtr.lock())
        if (EventImplSharedPtr->needsCleanupAfterWait())
          EventImplSharedPtr->cleanupCommand(EventImplSharedPtr);
    assert(SharedEvents.empty() && "Queues that support calling piQueueFinish "
                                   "shouldn't have shared events");
  } else {
    for (event &Event : SharedEvents)
      Event.wait();
  }

  // Wait for any stream service events recorded for this queue.
  std::vector<EventImplPtr> StreamsServiceEvents;
  {
    std::lock_guard<std::mutex> Lock(MMutex);
    StreamsServiceEvents.swap(MStreamsServiceEvents);
  }
  for (const EventImplPtr &Event : StreamsServiceEvents)
    Event->wait(Event);

#ifdef XPTI_ENABLE_INSTRUMENTATION
  instrumentationEpilog(TelemetryEvent, Name, StreamID, IId);
#endif
}
393 
394 pi_native_handle queue_impl::getNative() const {
395  const detail::plugin &Plugin = getPlugin();
396  if (Plugin.getBackend() == backend::opencl)
397  Plugin.call<PiApiKind::piQueueRetain>(MQueues[0]);
398  pi_native_handle Handle{};
399  Plugin.call<PiApiKind::piextQueueGetNativeHandle>(MQueues[0], &Handle);
400  return Handle;
401 }
402 
403 bool queue_impl::ext_oneapi_empty() const {
404  // If we have in-order queue where events are not discarded then just check
405  // the status of the last event.
406  if (isInOrder() && !MDiscardEvents) {
407  std::lock_guard<std::mutex> Lock(MLastEventMtx);
408  return MLastEvent.get_info<info::event::command_execution_status>() ==
409  info::event_command_status::complete;
410  }
411 
412  // Check the status of the backend queue if this is not a host queue.
413  if (!is_host()) {
414  pi_bool IsReady = false;
416  MQueues[0], PI_EXT_ONEAPI_QUEUE_INFO_EMPTY, sizeof(pi_bool), &IsReady,
417  nullptr);
418  if (!IsReady)
419  return false;
420  }
421 
422  // We may have events like host tasks which are not submitted to the backend
423  // queue so we need to get their status separately.
424  std::lock_guard<std::mutex> Lock(MMutex);
425  for (event Event : MEventsShared)
426  if (Event.get_info<info::event::command_execution_status>() !=
427  info::event_command_status::complete)
428  return false;
429 
430  for (auto EventImplWeakPtrIt = MEventsWeak.begin();
431  EventImplWeakPtrIt != MEventsWeak.end(); ++EventImplWeakPtrIt)
432  if (std::shared_ptr<event_impl> EventImplSharedPtr =
433  EventImplWeakPtrIt->lock())
434  if (EventImplSharedPtr->is_host() &&
435  EventImplSharedPtr
436  ->get_info<info::event::command_execution_status>() !=
437  info::event_command_status::complete)
438  return false;
439 
440  // If we didn't exit early above then it means that all events in the queue
441  // are completed.
442  return true;
443 }
444 
445 } // namespace detail
446 } // __SYCL_INLINE_VER_NAMESPACE(_V1)
447 } // namespace sycl
The context class represents a SYCL context on which kernel functions may be executed.
Definition: context.hpp:41
The Command class represents some action that needs to be performed on one or more memory objects.
Definition: commands.hpp:95
The plugin class provides a unified interface to the underlying low-level runtimes for the device-agn...
Definition: plugin.hpp:90
void call(ArgsT... Args) const
Calls the API, traces the call, checks the result.
Definition: plugin.hpp:217
backend getBackend(void) const
Definition: plugin.hpp:229
The SYCL device class encapsulates a single SYCL device on which kernels may be executed.
Definition: device.hpp:49
bool is_accelerator() const
Check if device is an accelerator device.
Definition: device.cpp:87
bool is_gpu() const
Check if device is a GPU device.
Definition: device.cpp:85
bool is_cpu() const
Check if device is a CPU device.
Definition: device.cpp:83
An event object can be used to synchronize memory transfers, enqueues of kernels and signaling barrie...
Definition: event.hpp:40
#define __SYCL_INLINE_VER_NAMESPACE(X)
::pi_event PiEvent
Definition: pi.hpp:121
const plugin & getPlugin()
Definition: pi.cpp:509
::pi_result PiResult
Definition: pi.hpp:108
std::vector< RT::PiEvent > getOrWaitEvents(std::vector< sycl::event > DepEvents, std::shared_ptr< sycl::detail::context_impl > Context)
constexpr const char * SYCL_STREAM_NAME
static event createDiscardedEvent()
Definition: queue_impl.cpp:56
decltype(Obj::impl) getSyclObjImpl(const Obj &SyclObject)
Definition: common.hpp:248
std::shared_ptr< event_impl > EventImplPtr
Definition: cg.hpp:42
static event prepareUSMEvent(const std::shared_ptr< detail::queue_impl > &QueueImpl, RT::PiEvent NativeEvent)
Definition: queue_impl.cpp:47
void memcpy(void *Dst, const void *Src, std::size_t Size)
----- Error handling, matching OpenCL plugin semantics.
Definition: access.hpp:14
pi_result piQueueFinish(pi_queue command_queue)
uintptr_t pi_native_handle
Definition: pi.h:111
pi_uint32 pi_bool
Definition: pi.h:109
@ PI_QUEUE_INFO_REFERENCE_COUNT
Definition: pi.h:335
@ PI_EXT_ONEAPI_QUEUE_INFO_EMPTY
Definition: pi.h:339
_pi_mem_advice
Definition: pi.h:429
pi_result piextQueueGetNativeHandle(pi_queue queue, pi_native_handle *nativeHandle)
Gets the native handle of a PI queue object.
pi_result piQueueGetInfo(pi_queue command_queue, pi_queue_info param_name, size_t param_value_size, void *param_value, size_t *param_value_size_ret)
pi_result piQueueRetain(pi_queue command_queue)
C++ wrapper of extern "C" PI interfaces.
constexpr unsigned long columnNumber() const noexcept
Definition: common.hpp:88
constexpr const char * fileName() const noexcept
Definition: common.hpp:89
constexpr const char * functionName() const noexcept
Definition: common.hpp:90
constexpr unsigned long lineNumber() const noexcept
Definition: common.hpp:87