The ROme OpTimistic Simulator 3.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
mpi.c
/**
 * @file distributed/mpi.c
 *
 * @brief MPI Support Module
 */
#include <distributed/mpi.h>

#ifdef ROOTSIM_MPI

#include <core/core.h>
#include <core/sync.h>
#include <datatypes/array.h>
#include <datatypes/msg_queue.h>
#include <datatypes/remote_msg_map.h>
#include <gvt/gvt.h>
#include <gvt/termination.h>
#include <mm/msg_allocator.h>

#include <mpi.h>
#define MSG_CTRL_RAW (MSG_CTRL_TERMINATION + 1)

#ifdef ROOTSIM_MPI_SERIALIZABLE

static bool mpi_serialize;
static spinlock_t mpi_spinlock;

#define mpi_lock() if (mpi_serialize) spin_lock(&mpi_spinlock)
#define mpi_unlock() if (mpi_serialize) spin_unlock(&mpi_spinlock)
#define mpi_trylock() (!mpi_serialize || spin_trylock(&mpi_spinlock))

#else

#define mpi_lock()
#define mpi_unlock()
#define mpi_trylock() (true)

#endif
/**
 * @brief Handles a MPI error
 * @param comm the MPI communicator involved in the error
 * @param err_code_p a pointer to the error code
 */
static void comm_error_handler(MPI_Comm *comm, int *err_code_p, ...)
{
	(void) comm;
	log_log(LOG_FATAL, "MPI error with code %d!", *err_code_p);

	int err_len;
	char err_str[MPI_MAX_ERROR_STRING];
	MPI_Error_string(*err_code_p, err_str, &err_len);
	log_log(LOG_FATAL, "MPI error msg is %s", err_str);

	exit(-1);
}
/**
 * @brief Initializes the MPI environment
 * @param argc_p a pointer to the OS supplied argc
 * @param argv_p a pointer to the OS supplied argv
 */
void mpi_global_init(int *argc_p, char ***argv_p)
{
	int thread_lvl = MPI_THREAD_SINGLE;
	MPI_Init_thread(argc_p, argv_p, MPI_THREAD_MULTIPLE, &thread_lvl);

	if (thread_lvl < MPI_THREAD_MULTIPLE) {
		if (thread_lvl < MPI_THREAD_SERIALIZED) {
			log_log(LOG_FATAL,
				"This MPI implementation does not support threaded access");
			abort();
		} else {
#ifdef ROOTSIM_MPI_SERIALIZABLE
			mpi_serialize = true;
			spin_init(&mpi_spinlock);
#else
			log_log(LOG_FATAL,
				"This MPI implementation only supports serialized calls: you need to build ROOT-Sim with -Dserialized_mpi=true");
			abort();
#endif
		}
	}

	MPI_Errhandler err_handler;
	if (MPI_Comm_create_errhandler(comm_error_handler, &err_handler)) {
		log_log(LOG_FATAL, "Unable to create MPI error handler");
		abort();
	}

	MPI_Comm_set_errhandler(MPI_COMM_WORLD, err_handler);

	int helper;
	MPI_Comm_rank(MPI_COMM_WORLD, &helper);
	nid = helper;
	MPI_Comm_size(MPI_COMM_WORLD, &helper);
	n_nodes = helper;
}
/**
 * @brief Finalizes the MPI environment
 */
void mpi_global_fini(void)
{
	MPI_Errhandler err_handler;
	MPI_Comm_get_errhandler(MPI_COMM_WORLD, &err_handler);
	MPI_Errhandler_free(&err_handler);

	MPI_Finalize();
}
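/*
 * Usage sketch (not part of this module): a distributed run brackets the
 * whole simulation between mpi_global_init() and mpi_global_fini(). The
 * startup sequence in between is a hypothetical placeholder.
 *
 * \code
 * int main(int argc, char **argv)
 * {
 * 	mpi_global_init(&argc, &argv);
 * 	// ... parse options, spawn worker threads, run the simulation ...
 * 	mpi_global_fini();
 * 	return 0;
 * }
 * \endcode
 */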
/**
 * @brief Sends a model message to a LP residing on another node
 * @param msg the message to send
 * @param dest_nid the id of the node where the recipient LP resides
 */
void mpi_remote_msg_send(struct lp_msg *msg, nid_t dest_nid)
{
	msg->msg_id = msg_id_get(msg, gvt_phase_get());
	gvt_on_remote_msg_send(dest_nid);

	mpi_lock();
	MPI_Request req;
	MPI_Isend(msg, msg_bare_size(msg), MPI_BYTE, dest_nid, 0,
		MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}
/**
 * @brief Sends a model anti-message to a LP residing on another node
 * @param msg the message to retract
 * @param dest_nid the id of the node where the recipient LP resides
 *
 * Only the message id travels on the wire; the receiver matches it against
 * its remote message map.
 */
void mpi_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid)
{
	msg_id_anti_phase_set(msg->msg_id, gvt_phase_get());
	gvt_on_remote_msg_send(dest_nid);

	mpi_lock();
	MPI_Request req;
	MPI_Isend(&msg->msg_id, sizeof(msg->msg_id), MPI_BYTE, dest_nid, 0,
		MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}
/**
 * @brief Sends a platform control message to all the other nodes
 * @param ctrl the control message to broadcast
 */
void mpi_control_msg_broadcast(enum msg_ctrl_tag ctrl)
{
	MPI_Request req;
	nid_t i = n_nodes;
	mpi_lock();
	while (i--) {
		if (i == nid)
			continue;
		MPI_Isend(NULL, 0, MPI_BYTE, i, ctrl, MPI_COMM_WORLD, &req);
		MPI_Request_free(&req);
	}
	mpi_unlock();
}
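/*
 * Illustrative use (assumed, not taken from this file): the GVT master
 * could kick off a new reduction round by broadcasting MSG_CTRL_GVT_START,
 * which every other node later picks up in mpi_remote_msg_handle(). The
 * "rank 0 is the master" check is a hypothetical convention.
 *
 * \code
 * if (nid == 0)
 * 	mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
 * \endcode
 */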
/**
 * @brief Sends a platform control message to a specific node
 * @param ctrl the control message to send
 * @param dest the id of the destination node
 */
void mpi_control_msg_send_to(enum msg_ctrl_tag ctrl, nid_t dest)
{
	MPI_Request req;
	mpi_lock();
	MPI_Isend(NULL, 0, MPI_BYTE, dest, ctrl, MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}
/**
 * @brief Empties the queue of incoming MPI messages, doing the right thing
 *        for each one of them
 */
void mpi_remote_msg_handle(void)
{
	int pending;
	MPI_Message mpi_msg;
	MPI_Status status;

	while (1) {
		if (!mpi_trylock())
			return;

		MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
			&pending, &mpi_msg, &status);

		if (!pending) {
			mpi_unlock();
			return;
		}

		if (unlikely(status.MPI_TAG)) {
			// A non-zero tag identifies a zero-payload control message
			MPI_Mrecv(NULL, 0, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
			mpi_unlock();
			switch (status.MPI_TAG) {
			case MSG_CTRL_GVT_START:
				gvt_on_start_ctrl_msg();
				break;
			case MSG_CTRL_GVT_DONE:
				gvt_on_done_ctrl_msg();
				break;
			case MSG_CTRL_TERMINATION:
				termination_on_ctrl_msg();
				break;
			}
		} else {
			int size;
			MPI_Get_count(&status, MPI_BYTE, &size);

			if (unlikely(size == sizeof(uintptr_t))) {
				// An anti-message: only the message id is on the wire
				uintptr_t anti_id;
				MPI_Mrecv(&anti_id, size, MPI_BYTE, &mpi_msg,
					MPI_STATUS_IGNORE);
				mpi_unlock();

				remote_msg_map_match(anti_id,
					status.MPI_SOURCE, NULL);
				gvt_on_remote_msg_receive(
					msg_id_anti_phase_get(anti_id));
			} else {
				mpi_unlock();

				struct lp_msg *msg = msg_allocator_alloc(size -
					offsetof(struct lp_msg, pl));

				mpi_lock();
				MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
					MPI_STATUS_IGNORE);
				mpi_unlock();

				uintptr_t msg_id = msg->msg_id;
				atomic_store_explicit(&msg->flags, 0U,
					memory_order_relaxed);
				remote_msg_map_match(msg_id,
					status.MPI_SOURCE, msg);
				msg_queue_insert(msg);
				gvt_on_remote_msg_receive(
					msg_id_phase_get(msg_id));
			}
		}
	}
}
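/*
 * Usage sketch: worker threads are expected to drain incoming MPI traffic
 * periodically, interleaved with local work. Both sim_done and
 * process_next_event() are hypothetical placeholders, not ROOT-Sim symbols.
 *
 * \code
 * while (!sim_done) {
 * 	mpi_remote_msg_handle(); // drain the network queue
 * 	process_next_event();    // placeholder for local event processing
 * }
 * \endcode
 */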
static MPI_Request reduce_sum_scatter_req = MPI_REQUEST_NULL;
/**
 * @brief Computes the sum-reduction-scatter operation across all nodes
 * @param node_vals a vector of n_nodes values, the i-th one destined to node i
 * @param result where the sum of the values destined to this node is written
 *
 * The operation is asynchronous: completion has to be checked with
 * mpi_reduce_sum_scatter_done().
 */
void mpi_reduce_sum_scatter(const unsigned node_vals[n_nodes], unsigned *result)
{
	mpi_lock();
	MPI_Ireduce_scatter_block(node_vals, result, 1, MPI_UNSIGNED, MPI_SUM,
		MPI_COMM_WORLD, &reduce_sum_scatter_req);
	mpi_unlock();
}
/**
 * @brief Checks if a previous mpi_reduce_sum_scatter() operation has completed
 * @return true if the operation has completed, false otherwise
 */
bool mpi_reduce_sum_scatter_done(void)
{
	int flag = 0;
	mpi_lock();
	MPI_Test(&reduce_sum_scatter_req, &flag, MPI_STATUS_IGNORE);
	mpi_unlock();
	return flag;
}
static MPI_Request reduce_min_req = MPI_REQUEST_NULL;
/**
 * @brief Computes the min-reduction operation across all nodes
 * @param node_min_p a pointer to the node-local minimum, overwritten with
 *                   the global minimum when the operation completes
 *
 * The operation is asynchronous: completion has to be checked with
 * mpi_reduce_min_done().
 */
void mpi_reduce_min(simtime_t *node_min_p)
{
	// MPI requires the send buffer to stay valid until the request
	// completes; static storage guarantees that across the async operation
	static simtime_t min_buff;
	min_buff = *node_min_p;
	mpi_lock();
	MPI_Iallreduce(&min_buff, node_min_p, 1, MPI_DOUBLE, MPI_MIN,
		MPI_COMM_WORLD, &reduce_min_req);
	mpi_unlock();
}
/**
 * @brief Checks if a previous mpi_reduce_min() operation has completed
 * @return true if the operation has completed, false otherwise
 */
bool mpi_reduce_min_done(void)
{
	int flag = 0;
	mpi_lock();
	MPI_Test(&reduce_min_req, &flag, MPI_STATUS_IGNORE);
	mpi_unlock();
	return flag;
}
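/*
 * Usage sketch for the asynchronous reduction pair (illustrative): start the
 * reduction with the node-local value, then poll for completion while doing
 * other work. local_minimum_time() is a hypothetical helper.
 *
 * \code
 * simtime_t node_min = local_minimum_time();
 * mpi_reduce_min(&node_min);
 * while (!mpi_reduce_min_done())
 * 	mpi_remote_msg_handle(); // keep draining messages meanwhile
 * // node_min now holds the minimum across all nodes
 * \endcode
 */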
/**
 * @brief A node barrier
 */
void mpi_node_barrier(void)
{
	mpi_lock();
	MPI_Barrier(MPI_COMM_WORLD);
	mpi_unlock();
}
/**
 * @brief Sends a byte buffer to another node, blocking until completion
 * @param data a pointer to the buffer to send
 * @param data_size the size of the buffer
 * @param dest the id of the destination node
 */
void mpi_blocking_data_send(const void *data, int data_size, nid_t dest)
{
	mpi_lock();
	MPI_Send(data, data_size, MPI_BYTE, dest, MSG_CTRL_RAW, MPI_COMM_WORLD);
	mpi_unlock();
}
/**
 * @brief Receives a byte buffer from another node, blocking until completion
 * @param data_size_p where to write the size of the received data (may be NULL)
 * @param src the id of the sender node
 * @return a mm_alloc()-ed buffer holding the received data
 */
void *mpi_blocking_data_rcv(int *data_size_p, nid_t src)
{
	MPI_Status status;
	MPI_Message mpi_msg;
	mpi_lock();
	MPI_Mprobe(src, MSG_CTRL_RAW, MPI_COMM_WORLD, &mpi_msg, &status);
	int data_size;
	MPI_Get_count(&status, MPI_BYTE, &data_size);
	char *ret = mm_alloc(data_size);
	MPI_Mrecv(ret, data_size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
	if (data_size_p != NULL)
		*data_size_p = data_size;
	mpi_unlock();
	return ret;
}
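/*
 * Usage sketch (illustrative): these two calls form a blocking rendezvous
 * pair between two nodes. The peer ranks and payload are made up, and
 * mm_free() is assumed to be the deallocator matching mm_alloc().
 *
 * \code
 * if (nid == 0) {
 * 	const char greeting[] = "hello";
 * 	mpi_blocking_data_send(greeting, sizeof(greeting), 1);
 * } else if (nid == 1) {
 * 	int size;
 * 	char *buf = mpi_blocking_data_rcv(&size, 0);
 * 	// ... use buf[0 .. size - 1] ...
 * 	mm_free(buf);
 * }
 * \endcode
 */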
#endif