The ROme OpTimistic Simulator 3.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
gvt.c
/**
 * @file gvt.c
 * @brief Global Virtual Time
 */
#include <gvt/gvt.h>

#include <arch/timer.h>
#include <core/init.h>
#include <datatypes/msg_queue.h>
#include <distributed/mpi.h>
#include <log/stats.h>

#include <memory.h>
#include <stdatomic.h>

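/*
 * A GVT reduction is run cooperatively by all worker threads, each
 * advancing through the phases below by repeatedly calling
 * gvt_phase_run(). In phase A each thread computes its local minimum,
 * which phase B reduces to a node-wide value. When MPI is enabled the
 * machine runs two rounds: the tphase_B* round aggregates and
 * sum-scatters the counts of remotely sent messages, so that in-flight
 * messages can be accounted for, while the tphase_C* round min-reduces
 * the node-local candidates across all nodes.
 */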
/// A thread phase during the gvt algorithm computation
/// (gvt_phase_run() relies on the relative order of these values)
enum thread_phase_t {
	tphase_rdy = 0,
	tphase_A,
	tphase_B,
#ifdef ROOTSIM_MPI
	tphase_B_reduce,
	tphase_C,
	tphase_C_reduce,
	tphase_B_rdone,
	tphase_C_rdone,
	tphase_B_wait_msgs,
#endif
	tphase_wait
};

static __thread enum thread_phase_t thread_phase = tphase_rdy;

/// The time of the last completed GVT reduction
static timer_uint last_gvt;
/// The per-thread candidate GVT values, reduced by gvt_node_reduce()
static simtime_t reducing_p[MAX_THREADS];
/// This thread's current candidate GVT value
static __thread simtime_t current_gvt;

#ifdef ROOTSIM_MPI

/// The node-wide counts of messages sent to each remote node
static atomic_uint sent_tot[MAX_NODES];
/// The number of nodes which still have to complete the gvt protocol
static _Atomic(nid_t) missing_nodes;

/// The color of this thread, flipped at each message accounting round
__thread bool gvt_phase_green = false;
/// This thread's counts of messages sent to each remote node
__thread unsigned remote_msg_sent[MAX_NODES] = {0};
/// The counts of still unaccounted received remote messages, one per color
atomic_int remote_msg_received[2];

#endif

/**
 * @brief Initializes the gvt module in the node
 */
void gvt_global_init(void)
{
	last_gvt = timer_new();
}

/**
 * @brief Reduces the per-thread candidate values to the node-wide minimum
 * @return the minimum of all the entries in reducing_p
 */
static inline simtime_t gvt_node_reduce(void)
{
	unsigned i = n_threads - 1;
	simtime_t candidate = reducing_p[i];
	while(i--) {
		candidate = min(reducing_p[i], candidate);
	}
	return candidate;
}

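/*
 * gvt_phase_run() is meant to be called repeatedly from each worker
 * thread's main loop: it never blocks, returning 0 while no reduction
 * is in progress or while the current one is still incomplete, and it
 * returns the freshly computed GVT value exactly once per thread when
 * the reduction completes.
 */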
#ifndef ROOTSIM_MPI

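/*
 * Single-node GVT reduction. c_a counts the threads which have joined
 * the current reduction, c_b counts those which have published their
 * local minimum in reducing_p; together they implement a pair of
 * barriers. The thread with rid 0 starts a new reduction once the
 * configured gvt period has elapsed and c_b has returned to zero, the
 * other threads join as soon as they observe c_a != 0.
 */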
simtime_t gvt_phase_run(void)
{
	static atomic_uint c_a = 0;
	static atomic_uint c_b = 0;

	if(likely(thread_phase == tphase_rdy)) {
		if (rid) {
			if (likely(!atomic_load_explicit(&c_a,
					memory_order_relaxed)))
				return 0;
		} else {
			if (likely(global_config.gvt_period >
					timer_value(last_gvt) ||
					atomic_load_explicit(&c_b,
					memory_order_relaxed)))
				return 0;
		}
		stats_time_start(STATS_GVT);
		current_gvt = SIMTIME_MAX;
		thread_phase = tphase_A;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		return 0;
	}

	switch (thread_phase) {
	default:
	case tphase_rdy:
		__builtin_unreachable();
		/* fallthrough */
	case tphase_A:
		if (atomic_load_explicit(&c_a, memory_order_relaxed)
				== n_threads) {
			simtime_t this_t = msg_queue_time_peek();
			reducing_p[rid] = min(current_gvt, this_t);
			thread_phase = tphase_B;
			atomic_fetch_add_explicit(&c_b, 1U,
					memory_order_release);
		}
		break;
	case tphase_B:
		if (atomic_load_explicit(&c_b, memory_order_acquire)
				== n_threads) {
			thread_phase = tphase_wait;
			atomic_fetch_sub_explicit(&c_a, 1U,
					memory_order_relaxed);
			if(!rid) {
				last_gvt = timer_new();
			}
			stats_time_take(STATS_GVT);
			return gvt_node_reduce();
		}
		break;
	case tphase_wait:
		if (!atomic_load_explicit(&c_a, memory_order_relaxed)) {
			atomic_fetch_sub_explicit(&c_b, 1U,
					memory_order_relaxed);
			thread_phase = tphase_rdy;
		}
		break;
	}

	return 0;
}

#else

/// The counter of threads which have joined the current GVT reduction
static atomic_uint c_a = 0;

/**
 * @brief Handles a MSG_CTRL_GVT_START control message
 *
 * Marks the calling thread's entrance into the new GVT reduction.
 */
void gvt_on_start_ctrl_msg(void)
{
	stats_time_start(STATS_GVT);
	current_gvt = SIMTIME_MAX;
	thread_phase = tphase_A;
	atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
}

/**
 * @brief Handles a MSG_CTRL_GVT_DONE control message
 *
 * Records that one more node has completed the gvt protocol.
 */
void gvt_on_done_ctrl_msg(void)
{
	atomic_fetch_sub_explicit(&missing_nodes, 1U, memory_order_relaxed);
}

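/*
 * Distributed GVT reduction. The thread with rid 0 on node 0 starts a
 * reduction by broadcasting MSG_CTRL_GVT_START, once the gvt period
 * has elapsed and every node has acknowledged the previous reduction
 * (missing_nodes back to zero). Each node then runs two rounds of the
 * phase machine. In the first round the per-thread remote_msg_sent
 * counters are aggregated in sent_tot and sum-scattered across nodes,
 * telling each node how many remote messages it still has to receive:
 * the matching remote_msg_received entry, incremented by the message
 * receive path, returns to zero exactly when all in-flight messages
 * have been delivered. In the second round the node-local candidates
 * are min-reduced across all nodes.
 */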
simtime_t gvt_phase_run(void)
{
	static atomic_uint c_b = 0;
	static unsigned remote_msg_to_receive;
	static __thread bool red_round = false;

	if(likely(thread_phase == tphase_rdy)) {
		if (nid || rid) {
			if (likely(!atomic_load_explicit(&c_a,
					memory_order_relaxed)))
				return 0;
		} else {
			if (likely(global_config.gvt_period >
					timer_value(last_gvt) ||
					atomic_load_explicit(&missing_nodes,
					memory_order_relaxed)))
				return 0;

			atomic_store_explicit(&missing_nodes, n_nodes,
					memory_order_relaxed);
			mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
		}
		stats_time_start(STATS_GVT);
		current_gvt = SIMTIME_MAX;
		thread_phase = tphase_A;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		return 0;
	}

	switch (thread_phase) {
	default:
	case tphase_rdy:
		__builtin_unreachable();
		/* fallthrough */
	case tphase_A:
		if (atomic_load_explicit(&c_a, memory_order_relaxed)
				== n_threads) {
			simtime_t this_t = msg_queue_time_peek();
			// this leverages the enum layout: the round selects
			// the B or C group, !rid sends the thread with rid 0
			// to the reduce phase
			thread_phase = tphase_B + (2 * red_round) + !rid;

			red_round = !red_round;
			if (red_round) {
				for(nid_t i = 0; i < n_nodes; ++i)
					atomic_fetch_add_explicit(&sent_tot[i],
							remote_msg_sent[i],
							memory_order_relaxed);
				// release makes the sum aggregation visible
				atomic_fetch_add_explicit(&c_b, 1U,
						memory_order_release);

				memset(remote_msg_sent, 0,
						sizeof(unsigned) * n_nodes);
				gvt_phase_green = !gvt_phase_green;
				current_gvt = min(current_gvt, this_t);
			} else {
				reducing_p[rid] = min(current_gvt, this_t);
				// release makes the thread-local gvt visible
				atomic_fetch_add_explicit(&c_b, 1U,
						memory_order_release);
			}
		}
		break;
	case tphase_B:
		// acquire is needed to sync with the remote_msg_received sum
		// and the c_b counter
		if (!atomic_load_explicit(&c_a, memory_order_acquire))
			thread_phase = tphase_B_wait_msgs;
		break;
	case tphase_B_reduce:
		// acquire is needed to sync with all threads' aggregated values
		if (atomic_load_explicit(&c_b, memory_order_acquire) ==
				n_threads) {
			mpi_reduce_sum_scatter((unsigned *)sent_tot,
					&remote_msg_to_receive);
			thread_phase = tphase_B_rdone;
		}
		break;
	case tphase_B_rdone:
		if (mpi_reduce_sum_scatter_done()) {
			atomic_fetch_sub_explicit(remote_msg_received +
					!gvt_phase_green, remote_msg_to_receive,
					memory_order_relaxed);
			// release makes the remote_msg_received sum
			// and the c_b counter visible
			atomic_store_explicit(&c_a, 0, memory_order_release);
			memset(sent_tot, 0, sizeof(atomic_uint) * n_nodes);
			thread_phase = tphase_B_wait_msgs;
		}
		break;
	case tphase_C:
		// no sync needed
		if (!atomic_load_explicit(&c_a, memory_order_relaxed)) {
			atomic_fetch_sub_explicit(&c_b, 1U,
					memory_order_relaxed);
			thread_phase = tphase_wait;
			return *reducing_p;
		}
		break;
	case tphase_C_reduce:
		// acquire is needed to sync with all threads' local gvt values
		if (atomic_load_explicit(&c_b, memory_order_acquire) ==
				n_threads) {
			*reducing_p = gvt_node_reduce();
			mpi_reduce_min(reducing_p);
			thread_phase = tphase_C_rdone;
		}
		break;
	case tphase_C_rdone:
		if (mpi_reduce_min_done()) {
			atomic_fetch_sub_explicit(&c_b, 1U,
					memory_order_relaxed);
			// no sync needed
			atomic_store_explicit(&c_a, 0, memory_order_relaxed);
			last_gvt = timer_new();
			thread_phase = tphase_wait;
			return *reducing_p;
		}
		break;
	case tphase_B_wait_msgs:
		// no sync needed: we already synced on c_a in tphase_B
		// or tphase_B_rdone
		if (!atomic_load_explicit(remote_msg_received +
				!gvt_phase_green, memory_order_relaxed)) {
			thread_phase = tphase_wait;
			atomic_fetch_sub_explicit(&c_b, 1U,
					memory_order_relaxed);
		}
		break;
	case tphase_wait:
		if (!atomic_load_explicit(&c_b, memory_order_relaxed)) {
			// this restarts the gvt phases for the second round
			// of node-local gvt reduction or, if we already did
			// that, it simply ends the gvt reduction
			thread_phase = red_round;
			if (red_round) {
				atomic_fetch_add_explicit(&c_a, 1U,
						memory_order_relaxed);
			} else {
				if(!rid)
					mpi_control_msg_send_to(
							MSG_CTRL_GVT_DONE, 0);
				stats_time_take(STATS_GVT);
			}
		}
		break;
	}
	return 0;
}

#endif

/**
 * @brief Lets the gvt module know that a message is being processed
 * @param msg_t the timestamp of the message being processed
 */
void gvt_on_msg_process(simtime_t msg_t)
{
	if (unlikely(thread_phase && current_gvt > msg_t))
		current_gvt = msg_t;
}