Line data Source code
1 1 : /**
2 : * @file gvt/gvt.c
3 : *
4 : * @brief Global Virtual Time
5 : *
6 : * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
7 : * SPDX-License-Identifier: GPL-3.0-only
8 : */
9 : #include <gvt/gvt.h>
10 :
11 : #include <arch/timer.h>
12 : #include <core/init.h>
13 : #include <datatypes/msg_queue.h>
14 : #include <distributed/mpi.h>
15 : #include <log/stats.h>
16 :
17 : #include <memory.h>
18 : #include <stdatomic.h>
19 :
20 : /// A thread phase during the gvt algorithm computation
/// A thread phase during the gvt algorithm computation
enum thread_phase {
	thread_phase_idle = 0, ///< no reduction round in progress for this thread
	thread_phase_A, ///< wait for the previous round to drain (c_a == 0), then fold the queue minimum
	thread_phase_B, ///< wait for every thread to reach phase B (c_b == n_threads)
	thread_phase_C, ///< wait for every thread to reach phase C (c_a == n_threads), then publish reducing_p[rid]
	thread_phase_D  ///< wait for every thread to publish its minimum (c_b back to 0)
};
28 :
/// A node phase during the inter-node gvt algorithm computation
enum node_phase {
	node_phase_redux_first = 0, ///< first intra-node reduction; flips gvt_phase when done
	node_sent_reduce,      ///< fold per-destination sent counts; last thread starts the MPI sum-scatter
	node_sent_reduce_wait, ///< leader thread polls the MPI sum-scatter for completion
	node_sent_wait,        ///< wait until all expected remote messages of the closed phase arrived
	node_phase_redux_second, ///< second intra-node reduction, after message delivery settled
	node_min_reduce,       ///< first thread reduces the node minimum and starts the MPI min reduction
	node_min_reduce_wait,  ///< leader waits for peers (c_d == n_threads) and the MPI min reduction
	node_min_wait,         ///< non-leader threads wait for the leader's release (c_c == 0)
	node_done              ///< round finished: reset state; last thread notifies node 0
};
40 :
/// This thread's phase in the intra-node reduction state machine
static _Thread_local enum thread_phase thread_phase = thread_phase_idle;

/// Timer used to pace rounds; restarted by the master thread on completion
static timer_uint gvt_timer;
/// Per-thread candidate minima, reduced at the end of a round
static simtime_t reducing_p[MAX_THREADS];
/// This thread's running minimum over message timestamps in the current round
static _Thread_local simtime_t current_gvt;
/// Counters implementing the four-phase intra-node barrier
/// (see gvt_thread_phase_run())
static _Atomic(rid_t) c_a = 0;
static _Atomic(rid_t) c_b = 0;

/// Number of nodes which still have to complete the current round;
/// incremented by n_nodes at round start, decremented per MSG_CTRL_GVT_DONE
static _Atomic(nid_t) gvt_nodes;

/// This thread's current message-tagging phase bit
/// NOTE(review): presumably the "color" of a two-phase GVT scheme — confirm
_Thread_local _Bool gvt_phase;
/// Per-phase, per-destination-node counts of remote messages sent
_Thread_local uint32_t remote_msg_seq[2][MAX_NODES];
/// Per-phase counts of remote messages received by this thread
_Thread_local uint32_t remote_msg_received[2];
54 :
/**
 * @brief Initializes the gvt module in the node
 */
void gvt_global_init(void)
{
	// start the timer used to pace GVT reduction rounds
	gvt_timer = timer_new();
}
62 :
/**
 * @brief Handles a MSG_CTRL_GVT_START control message
 *
 * Called by the MPI layer in response to a MSG_CTRL_GVT_START control message,
 * but also internally to start a new reduction
 */
void gvt_start_processing(void)
{
	stats_time_start(STATS_GVT);
	// reset this thread's running minimum for the new round
	current_gvt = SIMTIME_MAX;
	// enter the first phase of the intra-node reduction state machine
	thread_phase = thread_phase_A;
}
75 :
/**
 * @brief Handles a MSG_CTRL_GVT_DONE control message
 *
 * Called by the MPI layer in response to a MSG_CTRL_GVT_DONE control message
 */
void gvt_on_done_ctrl_msg(void)
{
	// one more node completed its round; once gvt_nodes drops to 0 the
	// master thread may start a new reduction (see gvt_phase_run())
	atomic_fetch_sub_explicit(&gvt_nodes, 1U, memory_order_relaxed);
}
85 :
86 0 : void gvt_on_msg_extraction(simtime_t msg_t)
87 : {
88 : if (unlikely(thread_phase && current_gvt > msg_t))
89 : current_gvt = msg_t;
90 : }
91 :
92 0 : static inline simtime_t gvt_node_reduce(void)
93 : {
94 : unsigned i = n_threads - 1;
95 : simtime_t candidate = reducing_p[i];
96 : while (i--)
97 : candidate = min(reducing_p[i], candidate);
98 :
99 : return candidate;
100 : }
101 :
/**
 * @brief Executes one step of the intra-node GVT reduction state machine
 * @return true if the intra-node reduction completed for this thread,
 *         false otherwise
 *
 * Threads move through phases A..D using the two counters c_a and c_b as a
 * reusable barrier; each call makes at most one phase transition.
 */
static bool gvt_thread_phase_run(void)
{
	switch (thread_phase) {
	case thread_phase_A:
		// wait until the previous round fully drained (c_a back to 0)
		if (atomic_load_explicit(&c_a, memory_order_relaxed))
			break;
		// fold the earliest timestamp still queued on this thread
		current_gvt = min(current_gvt, msg_queue_time_peek());
		thread_phase = thread_phase_B;
		atomic_fetch_add_explicit(&c_b, 1U, memory_order_relaxed);
		break;
	case thread_phase_B:
		// wait until every thread has completed phase A
		if (atomic_load_explicit(&c_b, memory_order_relaxed) !=
		    n_threads)
			break;
		thread_phase = thread_phase_C;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		break;
	case thread_phase_C:
		// wait until every thread has completed phase B
		if (atomic_load_explicit(&c_a, memory_order_relaxed) !=
		    n_threads)
			break;
		// publish this thread's candidate minimum; the release on c_b
		// makes the reducing_p write visible to phase-D readers
		reducing_p[rid] = min(current_gvt, msg_queue_time_peek());
		thread_phase = thread_phase_D;
		atomic_fetch_sub_explicit(&c_b, 1U, memory_order_release);
		break;
	case thread_phase_D:
		// acquire pairs with phase C's release: once c_b is 0, every
		// thread's reducing_p entry is visible
		if (atomic_load_explicit(&c_b, memory_order_acquire))
			break;
		thread_phase = thread_phase_idle;
		atomic_fetch_sub_explicit(&c_a, 1U, memory_order_relaxed);
		return true;
	default:
		__builtin_unreachable();
	}
	return false;
}
138 :
139 : #ifdef ROOTSIM_MPI
140 :
/**
 * @brief Executes one step of the inter-node GVT reduction state machine
 * @return true if the distributed reduction completed for this thread,
 *         false otherwise
 *
 * Runs two intra-node reductions interleaved with two MPI collectives: a
 * sum-scatter establishing how many remote messages of the closed phase this
 * node still has to receive, then a min reduction of the GVT candidates.
 */
static bool gvt_node_phase_run(void)
{
	// this thread's position in the node-level state machine
	static _Thread_local enum node_phase node_phase = node_phase_redux_first;
	// snapshot of remote_msg_seq taken at the end of the previous round
	static _Thread_local uint32_t last_seq[2][MAX_NODES];
	// node-wide per-destination counts of remote messages sent
	static _Atomic(uint32_t) total_sent[MAX_NODES];
	// node-wide balance of expected vs. received remote messages
	static _Atomic(int32_t) total_msg_received;
	// this node's share of the sum-scatter result
	static uint32_t remote_msg_to_receive;
	// synchronization counters for the node-level phases
	static _Atomic(rid_t) c_c;
	static _Atomic(rid_t) c_d;

	switch (node_phase) {
	case node_phase_redux_first:
	case node_phase_redux_second:
		if (!gvt_thread_phase_run())
			break;

		// flip the message-tagging phase only after the first
		// reduction (!node_phase is 1 only for node_phase_redux_first)
		gvt_phase = gvt_phase ^ !node_phase;
		// restart the intra-node machine for the next reduction
		thread_phase = thread_phase_A;
		++node_phase;
		break;
	case node_sent_reduce:
		// wait until the intra-node barrier fully drained
		if (atomic_load_explicit(&c_a, memory_order_relaxed))
			break;

		// fold this thread's newly sent message counts (per
		// destination node, for the now-closed phase) into the totals
		for (nid_t i = n_nodes - 1; i >= 0; --i)
			atomic_fetch_add_explicit(&total_sent[i],
			    remote_msg_seq[!gvt_phase][i] -
			    last_seq[!gvt_phase][i],
			    memory_order_relaxed);
		memcpy(last_seq[!gvt_phase], remote_msg_seq[!gvt_phase],
		    sizeof(uint32_t) * n_nodes);

		// each thread contributes +1; the leader later subtracts
		// n_threads together with the sum-scatter result
		atomic_fetch_add_explicit(&total_msg_received, 1U,
		    memory_order_relaxed);
		// synchronizes total_sent and sent values zeroing
		if (atomic_fetch_add_explicit(&c_c, 1U, memory_order_acq_rel) !=
		    n_threads - 1) {
			node_phase = node_sent_wait;
			break;
		}
		// the last thread to arrive starts the MPI collective
		mpi_reduce_sum_scatter((uint32_t *)total_sent,
		    &remote_msg_to_receive);
		node_phase = node_sent_reduce_wait;
		break;
	case node_sent_reduce_wait:
		if (!mpi_reduce_sum_scatter_done())
			break;
		// balance the +n_threads added above and subtract the number
		// of messages other nodes sent us in the closed phase
		atomic_fetch_sub_explicit(&total_msg_received,
		    remote_msg_to_receive + n_threads,
		    memory_order_relaxed);
		node_phase = node_sent_wait;
		break;
	case node_sent_wait: {
		// fold in the remote messages this thread received meanwhile;
		// proceed only once the balance observed before our (by then
		// zero) contribution is 0, i.e. all expected messages arrived
		int32_t r = atomic_fetch_add_explicit(&total_msg_received,
		    remote_msg_received[!gvt_phase],
		    memory_order_relaxed);
		remote_msg_received[!gvt_phase] = 0;
		if (r)
			break;
		// zero this thread's slice of total_sent for the next round
		// NOTE(review): rid * q + q may exceed MAX_NODES when
		// n_threads is much larger than n_nodes — confirm bounds
		uint32_t q = n_nodes / n_threads + 1;
		memset(total_sent + rid * q, 0, q * sizeof(*total_sent));
		node_phase = node_phase_redux_second;
		break;
	}
	case node_min_reduce:
		// the first thread here becomes the leader and starts the MPI
		// min reduction; all the others just wait on c_c
		if (atomic_fetch_add_explicit(&c_d, 1U, memory_order_relaxed)) {
			node_phase = node_min_wait;
			break;
		}
		*reducing_p = gvt_node_reduce();
		mpi_reduce_min(reducing_p);
		node_phase = node_min_reduce_wait;
		break;
	case node_min_reduce_wait:
		// leader: wait for every peer to check in and for MPI to end
		if (atomic_load_explicit(&c_d, memory_order_relaxed) !=
		    n_threads || !mpi_reduce_min_done())
			break;
		// release c_c so waiters observe the reduced *reducing_p
		atomic_fetch_sub_explicit(&c_c, n_threads, memory_order_release);
		node_phase = node_done;
		return true;
	case node_min_wait:
		// acquire pairs with the leader's release on c_c
		if (atomic_load_explicit(&c_c, memory_order_acquire))
			break;
		node_phase = node_done;
		return true;
	case node_done:
		// reset for the next round; the last thread to leave notifies
		// the master node that this node completed its round
		node_phase = node_phase_redux_first;
		thread_phase = thread_phase_idle;
		if (atomic_fetch_sub_explicit(&c_d, 1U, memory_order_relaxed) ==
		    1)
			mpi_control_msg_send_to(MSG_CTRL_GVT_DONE, 0);
		break;
	default:
		__builtin_unreachable();
	}
	return false;
}
238 :
/**
 * @brief Advances the GVT computation, starting a new round if needed
 * @return the newly computed GVT value if a reduction completed, 0 otherwise
 *
 * Called repeatedly by worker threads; also lets the master thread of the
 * master node (rid == 0, nid == 0) kick off a new round once the configured
 * period elapsed and every node finished the previous one.
 */
simtime_t gvt_phase_run(void)
{
	if (unlikely(thread_phase)) {
		// a reduction is in progress: advance the node state machine
		if (!gvt_node_phase_run())
			return 0.0;
		// round completed; the master thread restarts the pacing timer
		if (!rid && !nid)
			gvt_timer = timer_new();
		stats_time_take(STATS_GVT);
		// reducing_p[0] holds the globally reduced minimum
		return *reducing_p;
	}

	// join a round another thread of this node already entered
	// (c_b turns non-zero once a thread moves from phase A to B)
	if (unlikely(atomic_load_explicit(&c_b, memory_order_relaxed)))
		gvt_start_processing();

	// master thread of the master node: start a new round when the period
	// expired and all nodes completed the previous one (gvt_nodes == 0)
	if (unlikely(!rid && !nid &&
	    global_config.gvt_period < timer_value(gvt_timer) &&
	    !atomic_load_explicit(&gvt_nodes, memory_order_relaxed))) {
		atomic_fetch_add_explicit(&gvt_nodes, n_nodes, memory_order_relaxed);
		mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
	}

	return 0;
}
262 :
263 : #else
264 :
/**
 * @brief Advances the GVT computation, starting a new round if needed
 * @return the newly computed GVT value if a reduction completed, 0 otherwise
 *
 * Single-node build: only the intra-node reduction runs; the result is the
 * minimum across this node's threads.
 */
simtime_t gvt_phase_run(void)
{
	if (unlikely(thread_phase)) {
		// a reduction is in progress: advance the thread state machine
		if (!gvt_thread_phase_run())
			return 0.0;
		// round completed; the master thread restarts the pacing timer
		if (!rid)
			gvt_timer = timer_new();
		stats_time_take(STATS_GVT);
		return gvt_node_reduce();
	}

	// join a round another thread already entered
	// (c_b turns non-zero once a thread moves from phase A to B)
	if (unlikely(atomic_load_explicit(&c_b, memory_order_relaxed)))
		gvt_start_processing();

	// master thread starts a new round when the period expired;
	// NOTE(review): mpi_control_msg_broadcast is presumably a local stub
	// in the non-MPI build — confirm against distributed/mpi.h
	if (unlikely(!rid && global_config.gvt_period < timer_value(gvt_timer)))
		mpi_control_msg_broadcast(MSG_CTRL_GVT_START);

	return 0;
}
284 :
285 : #endif
286 :
// Emit out-of-line definitions for these functions in this translation unit
// NOTE(review): presumably declared `inline` in gvt.h (C99 extern-inline
// pattern) — confirm against the header
extern void gvt_remote_msg_send(struct lp_msg *msg, nid_t dest_nid);
extern void gvt_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid);
extern void gvt_remote_msg_receive(struct lp_msg *msg);
extern void gvt_remote_anti_msg_receive(struct lp_msg *msg);
|