Skip to content

Commit 75c691c

Browse files
committed
add order manager example
1 parent 24ade80 commit 75c691c

1 file changed

Lines changed: 199 additions & 0 deletions

File tree

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
# Copyright 2020-2025 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15+
"""
16+
Demonstrates using the SequentialOrderManager with USM and the Python threading
17+
module.
18+
"""
19+
20+
import concurrent.futures
21+
22+
import numpy as np
23+
24+
import dpctl
25+
import dpctl.memory as dpmem
26+
from dpctl.utils import SequentialOrderManager
27+
28+
29+
def _memset_async(q, usm_buf, fill_byte, om):
    """
    Asynchronously set every byte of ``usm_buf`` to ``fill_byte``.

    The copy is registered with the order manager ``om`` so that later
    submissions on ``q`` can depend on it.  ``_submit_keep_args_alive``
    keeps the host array and the destination buffer referenced until the
    device-side copy completes, preventing premature garbage collection.
    """
    num_bytes = usm_buf.nbytes
    host_src = np.empty(num_bytes, dtype=np.uint8)
    host_src.fill(fill_byte)

    copy_ev = q.memcpy_async(
        usm_buf, host_src, num_bytes, dEvents=om.submitted_events
    )
    # pin the Python objects for the duration of the async copy
    keep_alive_ev = q._submit_keep_args_alive((usm_buf, host_src), [copy_ev])
    om.add_event_pair(keep_alive_ev, copy_ev)
    return copy_ev
44+
45+
46+
def independent_threads():
    """
    Run two threads that each fill and read back a private USM buffer.

    Every thread works on its own allocation with its own order manager,
    so no cross-thread event coordination is needed.
    """
    buf_size = 1024
    queue = dpctl.SyclQueue()
    num_workers = 2

    def _task(tid):
        manager = SequentialOrderManager[queue]
        shared_buf = dpmem.MemoryUSMShared(buf_size, queue=queue)

        # thread 0 writes 1s, thread 1 writes 2s
        _memset_async(queue, shared_buf, tid + 1, manager)
        manager.wait()

        host_view = np.frombuffer(shared_buf.copy_to_host(), dtype=np.uint8)
        return int(host_view[0])

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=num_workers
    ) as pool:
        results = list(pool.map(_task, range(num_workers)))

    assert results == [1, 2], f"Unexpected results: {results}"
    print(f"independent_threads got what we expected: {results}")
74+
75+
76+
def fork_join():
    """
    Fork/join pattern: child threads fill device buffers, the parent gathers.

    Each child stages host data into shared USM, launches an async copy
    into a device-only buffer, and waits on its own order manager before
    returning.  After the pool joins, the main thread asynchronously
    copies every child buffer into shared result parts and verifies the
    assembled contents.
    """
    total_bytes = 1024
    queue = dpctl.SyclQueue()
    num_workers = 2
    chunk = total_bytes // num_workers

    def _fill_chunk(tid):
        worker_om = SequentialOrderManager[queue]
        # child 0 writes 10s, child 1 writes 20s
        value = 10 * (tid + 1)

        staging = np.full(chunk, value, dtype=np.uint8)
        shared_src = dpmem.MemoryUSMShared(chunk, queue=queue)
        shared_src.copy_from_host(staging)

        device_dst = dpmem.MemoryUSMDevice(chunk, queue=queue)

        copy_ev = queue.memcpy_async(
            device_dst,
            shared_src,
            chunk,
            dEvents=worker_om.submitted_events,
        )
        # keep both USM objects referenced while the device copy runs
        keep_ev = queue._submit_keep_args_alive(
            (device_dst, shared_src), [copy_ev]
        )
        worker_om.add_event_pair(keep_ev, copy_ev)

        worker_om.wait()
        return device_dst

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=num_workers
    ) as pool:
        device_chunks = list(pool.map(_fill_chunk, range(num_workers)))

    gather_om = SequentialOrderManager[queue]
    gathered = []

    for src in device_chunks:
        dst = dpmem.MemoryUSMShared(chunk, queue=queue)
        copy_ev = queue.memcpy_async(
            dst, src, chunk, dEvents=gather_om.submitted_events
        )
        keep_ev = queue._submit_keep_args_alive((dst, src), [copy_ev])
        gather_om.add_event_pair(keep_ev, copy_ev)
        gathered.append(dst)
    gather_om.wait()

    arr = np.concatenate(
        [np.frombuffer(p.copy_to_host(), dtype=np.uint8) for p in gathered]
    )
    assert np.all(
        arr[0:chunk] == 10
    ), f"Expected all values in first chunk to be 10, got {arr[0:chunk]}"
    assert np.all(
        arr[chunk:] == 20
    ), f"Expected all values in second chunk to be 20, got {arr[chunk:]}"
    print(f"fork-join got what we expected: [{arr[0]}, ..., {arr[chunk]}, ...]")
138+
139+
140+
def explicit_event_passing():
    """
    Hand tracked events from child order managers to the main one.

    Children launch async fills and return their buffers together with
    the host-task and completion events recorded by their order managers.
    The main thread folds those events into its own manager so that its
    subsequent ``memcpy_async`` calls are ordered after the child work.
    """
    nbytes = 1024
    queue = dpctl.SyclQueue()
    num_workers = 2

    def _launch_fill(tid):
        worker_om = SequentialOrderManager[queue]
        dst = dpmem.MemoryUSMShared(nbytes, queue=queue)

        # fill value is 42 for the first worker, 84 for the second
        _memset_async(queue, dst, 42 * (tid + 1), worker_om)

        return dst, worker_om.host_task_events, worker_om.submitted_events

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=num_workers
    ) as pool:
        prepared = list(pool.map(_launch_fill, range(num_workers)))

    buffers = [entry[0] for entry in prepared]
    all_ht_events = [ev for entry in prepared for ev in entry[1]]
    all_comp_events = [ev for entry in prepared for ev in entry[2]]

    main_om = SequentialOrderManager[queue]
    main_om.add_event_pair(all_ht_events, all_comp_events)

    results = []
    for src in buffers:
        out = dpmem.MemoryUSMShared(nbytes, queue=queue)
        copy_ev = queue.memcpy_async(
            out, src, nbytes, dEvents=main_om.submitted_events
        )
        keep_ev = queue._submit_keep_args_alive((out, src), [copy_ev])
        main_om.add_event_pair(keep_ev, copy_ev)
        results.append(out)

    main_om.wait()

    values = [
        int(np.frombuffer(r.copy_to_host(), dtype=np.uint8)[0]) for r in results
    ]
    assert values == [42, 84], f"Unexpected: {values}"
    print(f"explicit_event_passing got what we expected: {values}")
192+
193+
194+
if __name__ == "__main__":
    # delegate example discovery and execution to the shared example runner
    import _runner

    _runner.run_examples(
        "Examples for working with SequentialOrderManager in dpctl.", globals()
    )

0 commit comments

Comments
 (0)