/**
 * @file gvt/gvt.c
 *
 * @brief Global Virtual Time
 *
 * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
 * SPDX-License-Identifier: GPL-3.0-only
 */
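/*
 * A minimal usage sketch (illustrative only, not part of this module): each
 * worker thread is assumed to poll gvt_phase_run() from its main loop and to
 * act on a non-zero return value. The loop structure and the names
 * termination_detected(), process_next_message() and fossil_collect() are
 * hypothetical placeholders for the rest of the runtime.
 *
 * @code
 * while (!termination_detected()) {
 * 	process_next_message();
 * 	simtime_t gvt = gvt_phase_run();
 * 	if (gvt != 0)
 * 		fossil_collect(gvt);
 * }
 * @endcode
 */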
#include <gvt/gvt.h>

#include <arch/timer.h>
#include <core/init.h>
#include <datatypes/msg_queue.h>
#include <distributed/mpi.h>
#include <log/stats.h>

#include <memory.h>
#include <stdatomic.h>

/// A thread's phase during the GVT algorithm computation
enum thread_phase_t {
	tphase_rdy = 0,
	tphase_A,
	tphase_B,
#ifdef ROOTSIM_MPI
	tphase_B_reduce,
	tphase_C,
	tphase_C_reduce,
	tphase_B_rdone,
	tphase_C_rdone,
	tphase_B_wait_msgs,
#endif
	tphase_wait
};
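
/*
 * A note on the layout above (inferred from the code below, not normative):
 * gvt_phase_run() advances each thread through these phases as a small state
 * machine. The numeric values matter in two places: tphase_rdy == 0 and
 * tphase_A == 1 let "thread_phase = red_round" either restart or end a
 * round, and "tphase_B + (2 * red_round) + !rid" selects the B phases on the
 * first round and the C phases on the second, with the _reduce variant for
 * the rid == 0 thread.
 */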

/// The phase of the current thread in the GVT algorithm
static __thread enum thread_phase_t thread_phase = tphase_rdy;

/// Timestamp of the last completed GVT computation
static timer_uint last_gvt;
/// The per-thread candidate GVT values, reduced node-wide
static simtime_t reducing_p[MAX_THREADS];
/// The minimum message timestamp witnessed by this thread in this round
static __thread simtime_t current_gvt;

#ifdef ROOTSIM_MPI

/// The aggregated count of remote messages sent towards each node
static atomic_uint sent_tot[MAX_NODES];
/// The number of nodes which still have to complete the current GVT round
static _Atomic(nid_t) missing_nodes;

/// The current color of this thread in the two-color message accounting
__thread bool gvt_phase_green = false;
/// The thread-local count of remote messages sent towards each node
__thread unsigned remote_msg_sent[MAX_NODES] = {0};
/// The node-wide balance of received remote messages, one entry per color
atomic_int remote_msg_received[2];

#endif
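
/*
 * A note on the color flip (an interpretation of the code, in the spirit of
 * Mattern-style two-cut GVT algorithms): remote messages are accounted
 * against the color which was current when they were sent. During each
 * reduction round every thread flips gvt_phase_green and publishes its sent
 * counts into sent_tot; the node then waits in tphase_B_wait_msgs until the
 * received balance of the old color drains to zero, guaranteeing that no
 * message of the previous color is still in flight.
 */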

/**
 * @brief Initializes the GVT module in the node
 */
void gvt_global_init(void)
{
	last_gvt = timer_new();
}

/**
 * @brief Computes the node-wide minimum of the thread-local GVT candidates
 * @return the smallest value in reducing_p[]
 *
 * Callers are expected to have already synchronized (an acquire load on the
 * phase counter) so that every entry of reducing_p[] is visible.
 */
static inline simtime_t gvt_node_reduce(void)
{
	unsigned i = n_threads - 1;
	simtime_t candidate = reducing_p[i];
	while(i--) {
		candidate = min(reducing_p[i], candidate);
	}
	return candidate;
}

#ifndef ROOTSIM_MPI

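/*
 * An illustrative walk-through of the single-node handshake below (derived
 * from the code, with n_threads == 2): thread 0 notices that gvt_period has
 * elapsed and c_b == 0, enters tphase_A and bumps c_a; thread 1 sees c_a != 0
 * and joins. Once c_a == n_threads, each thread publishes its minimum into
 * reducing_p[rid] and bumps c_b with release semantics. Once c_b ==
 * n_threads, each thread decrements c_a and returns the reduced GVT.
 * Finally, when c_a drains back to zero, each thread decrements c_b and goes
 * back to tphase_rdy, re-arming the algorithm.
 */
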
/**
 * @brief Runs a step of the GVT algorithm on the calling thread
 * @return the newly computed GVT if the reduction completed, 0 otherwise
 *
 * This function must be called regularly by all the processing threads.
 */
simtime_t gvt_phase_run(void)
{
	static atomic_uint c_a = 0;
	static atomic_uint c_b = 0;

	if(likely(thread_phase == tphase_rdy)) {
		if (rid) {
			if (likely(!atomic_load_explicit(&c_a,
			    memory_order_relaxed)))
				return 0;
		} else {
			if (likely(global_config.gvt_period >
			    timer_value(last_gvt) || atomic_load_explicit(
			    &c_b, memory_order_relaxed)))
				return 0;
		}
		stats_time_start(STATS_GVT);
		current_gvt = SIMTIME_MAX;
		thread_phase = tphase_A;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		return 0;
	}

	switch (thread_phase) {
	default:
	case tphase_rdy:
		__builtin_unreachable();
		/* fallthrough */
	case tphase_A:
		if (atomic_load_explicit(&c_a, memory_order_relaxed)
		    == n_threads) {
			simtime_t this_t = msg_queue_time_peek();
			reducing_p[rid] = min(current_gvt, this_t);
			thread_phase = tphase_B;
			atomic_fetch_add_explicit(&c_b, 1U,
			    memory_order_release);
		}
		break;
	case tphase_B:
		if (atomic_load_explicit(&c_b, memory_order_acquire)
		    == n_threads) {
			thread_phase = tphase_wait;
			atomic_fetch_sub_explicit(&c_a, 1U,
			    memory_order_relaxed);
			if(!rid) {
				last_gvt = timer_new();
			}
			stats_time_take(STATS_GVT);
			return gvt_node_reduce();
		}
		break;
	case tphase_wait:
		if (!atomic_load_explicit(&c_a, memory_order_relaxed)) {
			atomic_fetch_sub_explicit(&c_b, 1U,
			    memory_order_relaxed);
			thread_phase = tphase_rdy;
		}
		break;
	}

	return 0;
}

#else

/// The count of node-local threads which entered the current GVT round
static atomic_uint c_a = 0;

/**
 * @brief Handles a MSG_CTRL_GVT_START control message
 *
 * Called by the MPI layer in response to a MSG_CTRL_GVT_START control message
 */
void gvt_on_start_ctrl_msg(void)
{
	stats_time_start(STATS_GVT);
	current_gvt = SIMTIME_MAX;
	thread_phase = tphase_A;
	atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
}

/**
 * @brief Handles a MSG_CTRL_GVT_DONE control message
 *
 * Called by the MPI layer in response to a MSG_CTRL_GVT_DONE control message
 */
void gvt_on_done_ctrl_msg(void)
{
	atomic_fetch_sub_explicit(&missing_nodes, 1U, memory_order_relaxed);
}
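
/*
 * A summary of the inter-node handshake (reconstructed from the two handlers
 * above and the code below): the rid == 0 thread of node 0 starts a round by
 * broadcasting MSG_CTRL_GVT_START once gvt_period has elapsed and the
 * previous round has fully drained (missing_nodes == 0). Every node then
 * runs its local reduction; when a node completes, its rid == 0 thread sends
 * MSG_CTRL_GVT_DONE back to node 0, which decrements missing_nodes.
 */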

/**
 * @brief Runs a step of the GVT algorithm on the calling thread
 * @return the newly computed GVT if the reduction completed, 0 otherwise
 *
 * This is the distributed version of the algorithm: the node-local reduction
 * runs twice, first to settle the two-color remote message accounting (the B
 * phases), then to compute the actual minimum (the C phases), interleaved
 * with the MPI sum-scatter of the sent counts and the final MPI minimum
 * reduction.
 */
simtime_t gvt_phase_run(void)
{
	static atomic_uint c_b = 0;
	static unsigned remote_msg_to_receive;
	static __thread bool red_round = false;

	if(likely(thread_phase == tphase_rdy)) {
		if (nid || rid) {
			if (likely(!atomic_load_explicit(&c_a,
			    memory_order_relaxed)))
				return 0;
		} else {
			if (likely(global_config.gvt_period >
			    timer_value(last_gvt) || atomic_load_explicit(
			    &missing_nodes, memory_order_relaxed)))
				return 0;

			atomic_store_explicit(&missing_nodes, n_nodes,
			    memory_order_relaxed);
			mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
		}
		stats_time_start(STATS_GVT);
		current_gvt = SIMTIME_MAX;
		thread_phase = tphase_A;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		return 0;
	}

	switch (thread_phase) {
	default:
	case tphase_rdy:
		__builtin_unreachable();
		/* fallthrough */
	case tphase_A:
		if (atomic_load_explicit(&c_a, memory_order_relaxed)
		    == n_threads) {
			simtime_t this_t = msg_queue_time_peek();
			// this leverages the enum layout: it selects the B
			// phases on the first round and the C phases on the
			// second, with the _reduce variant for rid == 0
			thread_phase = tphase_B + (2 * red_round) + !rid;

			red_round = !red_round;
			if (red_round) {
				for(nid_t i = 0; i < n_nodes; ++i)
					atomic_fetch_add_explicit(&sent_tot[i],
					    remote_msg_sent[i],
					    memory_order_relaxed);
				// release renders visible the sum aggregation
				atomic_fetch_add_explicit(&c_b, 1U,
				    memory_order_release);

				memset(remote_msg_sent, 0,
				    sizeof(unsigned) * n_nodes);
				gvt_phase_green = !gvt_phase_green;
				current_gvt = min(current_gvt, this_t);
			} else {
				reducing_p[rid] = min(current_gvt, this_t);
				// release renders visible the thread local gvt
				atomic_fetch_add_explicit(&c_b, 1U,
				    memory_order_release);
			}
		}
		break;
	case tphase_B:
		// acquire is needed to sync with the remote_msg_received sum
		// and the c_b counter
		if (!atomic_load_explicit(&c_a, memory_order_acquire))
			thread_phase = tphase_B_wait_msgs;
		break;
	case tphase_B_reduce:
		// acquire is needed to sync with all threads aggregated values
		if (atomic_load_explicit(&c_b, memory_order_acquire) ==
		    n_threads) {
			mpi_reduce_sum_scatter((unsigned *)sent_tot,
			    &remote_msg_to_receive);
			thread_phase = tphase_B_rdone;
		}
		break;
	case tphase_B_rdone:
		if (mpi_reduce_sum_scatter_done()) {
			atomic_fetch_sub_explicit(remote_msg_received +
			    !gvt_phase_green, remote_msg_to_receive,
			    memory_order_relaxed);
			// release renders visible the remote_msg_received sum
			// and the c_b counter
			atomic_store_explicit(&c_a, 0, memory_order_release);
			memset(sent_tot, 0, sizeof(atomic_uint) * n_nodes);
			thread_phase = tphase_B_wait_msgs;
		}
		break;
	case tphase_C:
		// no sync needed
		if (!atomic_load_explicit(&c_a, memory_order_relaxed)) {
			atomic_fetch_sub_explicit(&c_b, 1U,
			    memory_order_relaxed);
			thread_phase = tphase_wait;
			return *reducing_p;
		}
		break;
	case tphase_C_reduce:
		// acquire is needed to sync with all threads gvt local values
		if (atomic_load_explicit(&c_b, memory_order_acquire) ==
		    n_threads) {
			*reducing_p = gvt_node_reduce();
			mpi_reduce_min(reducing_p);
			thread_phase = tphase_C_rdone;
		}
		break;
	case tphase_C_rdone:
		if (mpi_reduce_min_done()) {
			atomic_fetch_sub_explicit(&c_b, 1U,
			    memory_order_relaxed);
			// no sync needed
			atomic_store_explicit(&c_a, 0, memory_order_relaxed);
			last_gvt = timer_new();
			thread_phase = tphase_wait;
			return *reducing_p;
		}
		break;
	case tphase_B_wait_msgs:
		// no sync needed here: we already synchronized through the
		// release store and the acquire loads on c_a
		if (!atomic_load_explicit(remote_msg_received +
		    !gvt_phase_green, memory_order_relaxed)) {
			thread_phase = tphase_wait;
			atomic_fetch_sub_explicit(&c_b, 1U,
			    memory_order_relaxed);
		}
		break;
	case tphase_wait:
		if (!atomic_load_explicit(&c_b, memory_order_relaxed)) {
			// this restarts the gvt phases for the second round of
			// node-local gvt reduction or, if we already did that,
			// it simply ends the gvt reduction; it relies on
			// tphase_rdy == 0 and tphase_A == 1
			thread_phase = red_round;
			if (red_round) {
				atomic_fetch_add_explicit(&c_a, 1U,
				    memory_order_relaxed);
			} else {
				if(!rid)
					mpi_control_msg_send_to(
					    MSG_CTRL_GVT_DONE, 0);
				stats_time_take(STATS_GVT);
			}
		}
		break;
	}
	return 0;
}

#endif

/**
 * @brief Lowers the thread-local GVT candidate upon message processing
 * @param msg_t the timestamp of the message being processed
 *
 * Has an effect only while a GVT reduction is in progress, i.e. when
 * thread_phase != tphase_rdy.
 */
void gvt_on_msg_process(simtime_t msg_t)
{
	if (unlikely(thread_phase && current_gvt > msg_t))
		current_gvt = msg_t;
}
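
/*
 * An illustrative call site (hypothetical, for documentation purposes): the
 * processing loop is assumed to invoke the hook with the timestamp of each
 * message right before processing it. The names msg_queue_extract(),
 * msg->dest_t and process_msg() are placeholders, not this module's API.
 *
 * @code
 * struct lp_msg *msg = msg_queue_extract();
 * gvt_on_msg_process(msg->dest_t);
 * process_msg(msg);
 * @endcode
 */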