/**
 * @file distributed/mpi.c
 *
 * @brief MPI Support Module
 *
 * This module implements all the basic MPI facilities needed to let the
 * distributed execution of a simulation model take place consistently.
 *
 * Several facilities are thread-safe, others are not. When relying on this
 * module, check carefully which of them can be used by worker threads
 * without coordination.
 *
 * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
 * SPDX-License-Identifier: GPL-3.0-only
 */
#include <distributed/mpi.h>

#ifdef ROOTSIM_MPI

#include <core/core.h>
#include <core/sync.h>
#include <datatypes/array.h>
#include <datatypes/msg_queue.h>
#include <datatypes/remote_msg_map.h>
#include <gvt/gvt.h>
#include <gvt/termination.h>
#include <mm/msg_allocator.h>

#include <mpi.h>

#define MSG_CTRL_RAW (MSG_CTRL_TERMINATION + 1)

#ifdef ROOTSIM_MPI_SERIALIZABLE

static bool mpi_serialize;
static spinlock_t mpi_spinlock;

#define mpi_lock() if (mpi_serialize) spin_lock(&mpi_spinlock)
#define mpi_unlock() if (mpi_serialize) spin_unlock(&mpi_spinlock)
#define mpi_trylock() (!mpi_serialize || spin_trylock(&mpi_spinlock))

#else

#define mpi_lock()
#define mpi_unlock()
#define mpi_trylock() (true)

#endif

/**
 * @brief Handles an MPI error
 * @param comm the MPI communicator involved in the error
 * @param err_code_p a pointer to the error code
 * @param ... an implementation-specific list of additional arguments in which
 *            we are not interested
 *
 * This handler is registered in mpi_global_init() to print meaningful MPI
 * errors.
 */
static void comm_error_handler(MPI_Comm *comm, int *err_code_p, ...)
{
	(void) comm;
	log_log(LOG_FATAL, "MPI error with code %d!", *err_code_p);

	int err_len;
	char err_str[MPI_MAX_ERROR_STRING];
	MPI_Error_string(*err_code_p, err_str, &err_len);
	log_log(LOG_FATAL, "MPI error msg is %s ", err_str);

	exit(-1);
}

/**
 * @brief Initializes the MPI environment
 * @param argc_p a pointer to the OS-supplied argc
 * @param argv_p a pointer to the OS-supplied argv
 */
void mpi_global_init(int *argc_p, char ***argv_p)
{
	int thread_lvl = MPI_THREAD_SINGLE;
	MPI_Init_thread(argc_p, argv_p, MPI_THREAD_MULTIPLE, &thread_lvl);

	if (thread_lvl < MPI_THREAD_MULTIPLE) {
		if (thread_lvl < MPI_THREAD_SERIALIZED) {
			log_log(LOG_FATAL,
				"This MPI implementation does not support threaded access");
			abort();
		} else {
#ifdef ROOTSIM_MPI_SERIALIZABLE
			mpi_serialize = true;
			spin_init(&mpi_spinlock);
#else
			log_log(LOG_FATAL,
				"This MPI implementation only supports serialized calls: you need to build ROOT-Sim with -Dserialized_mpi=true");
			abort();
#endif
		}
	}

	MPI_Errhandler err_handler;
	if (MPI_Comm_create_errhandler(comm_error_handler, &err_handler)) {
		log_log(LOG_FATAL, "Unable to create MPI error handler");
		abort();
	}

	MPI_Comm_set_errhandler(MPI_COMM_WORLD, err_handler);

	int helper;
	MPI_Comm_rank(MPI_COMM_WORLD, &helper);
	nid = helper;
	MPI_Comm_size(MPI_COMM_WORLD, &helper);
	n_nodes = helper;
}

/**
 * @brief Finalizes the MPI environment
 */
void mpi_global_fini(void)
{
	MPI_Errhandler err_handler;
	MPI_Comm_get_errhandler(MPI_COMM_WORLD, &err_handler);
	MPI_Errhandler_free(&err_handler);

	MPI_Finalize();
}

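/*
 * Typical lifecycle of this module, as a rough sketch. The surrounding
 * bootstrap steps (argument parsing, worker thread startup, the simulation
 * main loop) are hypothetical placeholders, not part of this file.
 *
 *	int main(int argc, char **argv)
 *	{
 *		mpi_global_init(&argc, &argv);	// sets nid and n_nodes
 *		// ... parse arguments, spawn worker threads, run the
 *		// simulation main loop ...
 *		mpi_global_fini();		// every node must call this
 *		return 0;
 *	}
 */
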
/**
 * @brief Sends a model message to an LP residing on another node
 * @param msg the message to send
 * @param dest_nid the id of the node where the targeted LP resides
 *
 * This function also calls the relevant handlers in order to keep, for
 * example, the non-blocking GVT algorithm running.
 * Note that when this function returns, the message may not have been sent
 * yet. There is no need to actively check for sending completion: during
 * fossil collection, the platform leverages the GVT to make sure the message
 * has indeed been sent and processed before freeing it.
 */
void mpi_remote_msg_send(struct lp_msg *msg, nid_t dest_nid)
{
	msg->msg_id = msg_id_get(msg, gvt_phase_get());
	gvt_on_remote_msg_send(dest_nid);

	mpi_lock();
	MPI_Request req;
	MPI_Isend(msg, msg_bare_size(msg), MPI_BYTE, dest_nid, 0,
	    MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}

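/*
 * Usage sketch for mpi_remote_msg_send(): a worker thread that produced a
 * message for an LP hosted elsewhere could hand it over like this.
 * lp_id_to_nid() and the surrounding variables are hypothetical names
 * standing in for whatever LP-to-node mapping the caller uses; they are not
 * defined in this module.
 *
 *	nid_t dest_nid = lp_id_to_nid(dest_lp);	// hypothetical mapping helper
 *	if (dest_nid != nid)
 *		mpi_remote_msg_send(msg, dest_nid);	// fire and forget
 *	else
 *		msg_queue_insert(msg);			// purely local delivery
 *
 * No completion check is needed afterwards: fossil collection frees the
 * message only once the GVT guarantees it has been received and processed.
 */
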
/**
 * @brief Sends a model anti-message to an LP residing on another node
 * @param msg the message to roll back
 * @param dest_nid the id of the node where the targeted LP resides
 *
 * This function also calls the relevant handlers in order to keep, for
 * example, the non-blocking GVT algorithm running.
 * Note that when this function returns, the anti-message may not have been
 * sent yet. There is no need to actively check for sending completion: during
 * fossil collection, the platform leverages the GVT to make sure the message
 * has indeed been sent and processed before freeing it.
 */
void mpi_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid)
{
	msg_id_anti_phase_set(msg->msg_id, gvt_phase_get());
	gvt_on_remote_msg_send(dest_nid);

	mpi_lock();
	MPI_Request req;
	MPI_Isend(&msg->msg_id, sizeof(msg->msg_id), MPI_BYTE, dest_nid, 0,
	    MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}

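/*
 * Usage sketch for mpi_remote_anti_msg_send(): during a rollback, a message
 * previously sent with mpi_remote_msg_send() can be retracted by sending the
 * matching anti-message. Note that only the message id travels over the wire
 * here, which is how the receiver tells it apart in mpi_remote_msg_handle().
 * The variables below are hypothetical placeholders.
 *
 *	struct lp_msg *sent_msg = ...;	// retained by the sender for undo
 *	nid_t dest_nid = ...;		// same node the original send targeted
 *	mpi_remote_anti_msg_send(sent_msg, dest_nid);
 */
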
/**
 * @brief Sends a platform control message to all the other nodes
 * @param ctrl the control message to send
 */
void mpi_control_msg_broadcast(enum msg_ctrl_tag ctrl)
{
	MPI_Request req;
	nid_t i = n_nodes;
	mpi_lock();
	while (i--) {
		if (i == nid)
			continue;
		MPI_Isend(NULL, 0, MPI_BYTE, i, ctrl, MPI_COMM_WORLD, &req);
		MPI_Request_free(&req);
	}
	mpi_unlock();
}

/**
 * @brief Sends a platform control message to a specific node
 * @param ctrl the control message to send
 * @param dest the id of the destination node
 */
void mpi_control_msg_send_to(enum msg_ctrl_tag ctrl, nid_t dest)
{
	MPI_Request req;
	mpi_lock();
	MPI_Isend(NULL, 0, MPI_BYTE, dest, ctrl, MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}

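/*
 * Usage sketch for the control message helpers. Which tag is broadcast and
 * which is sent to a single node is decided by the GVT and termination
 * modules; the pairing and dest_nid below are only illustrative.
 *
 *	// notify every other node, e.g. to start a GVT reduction round
 *	mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
 *
 *	// or notify a single node
 *	mpi_control_msg_send_to(MSG_CTRL_TERMINATION, dest_nid);
 */
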
/**
 * @brief Empties the queue of incoming MPI messages, handling each one
 *        appropriately.
 *
 * This routine uses the MPI probing mechanism to check for new remote
 * messages and handles them accordingly.
 * Control messages are handled by the respective platform handler.
 * Simulation messages are unpacked and put in the queue.
 * Anti-messages are matched and processed accordingly by the message map.
 */
void mpi_remote_msg_handle(void)
{
	int pending;
	MPI_Message mpi_msg;
	MPI_Status status;

	while (1) {
		if (!mpi_trylock())
			return;

		MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
		    &pending, &mpi_msg, &status);

		if (!pending) {
			mpi_unlock();
			return;
		}

		if (unlikely(status.MPI_TAG)) {
			MPI_Mrecv(NULL, 0, MPI_BYTE, &mpi_msg,
			    MPI_STATUS_IGNORE);
			mpi_unlock();
			switch (status.MPI_TAG) {
			case MSG_CTRL_GVT_START:
				gvt_on_start_ctrl_msg();
				break;
			case MSG_CTRL_GVT_DONE:
				gvt_on_done_ctrl_msg();
				break;
			case MSG_CTRL_TERMINATION:
				termination_on_ctrl_msg();
				break;
			}
		} else {
			int size;
			MPI_Get_count(&status, MPI_BYTE, &size);

			if (unlikely(size == sizeof(uintptr_t))) {
				uintptr_t anti_id;
				MPI_Mrecv(&anti_id, size, MPI_BYTE, &mpi_msg,
				    MPI_STATUS_IGNORE);
				mpi_unlock();

				remote_msg_map_match(anti_id,
				    status.MPI_SOURCE, NULL);
				gvt_on_remote_msg_receive(
				    msg_id_anti_phase_get(anti_id));
			} else {
				mpi_unlock();

				struct lp_msg *msg = msg_allocator_alloc(size -
				    offsetof(struct lp_msg, pl));

				mpi_lock();
				MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
				    MPI_STATUS_IGNORE);
				mpi_unlock();

				uintptr_t msg_id = msg->msg_id;
				atomic_store_explicit(&msg->flags, 0U,
				    memory_order_relaxed);
				remote_msg_map_match(msg_id,
				    status.MPI_SOURCE, msg);
				msg_queue_insert(msg);
				gvt_on_remote_msg_receive(
				    msg_id_phase_get(msg_id));
			}
		}
	}
}

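/*
 * Usage sketch for mpi_remote_msg_handle(): a worker thread's main loop can
 * simply poll it between processing steps, since it returns immediately when
 * nothing is pending (or when another thread already holds the MPI lock in
 * serialized mode). The loop condition and the processing call below are
 * hypothetical placeholders for the caller's own logic.
 *
 *	while (!simulation_is_done) {		// hypothetical flag
 *		mpi_remote_msg_handle();	// drain incoming MPI traffic
 *		process_next_queued_msg();	// hypothetical: pops msg_queue
 *	}
 */
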
static MPI_Request reduce_sum_scatter_req = MPI_REQUEST_NULL;

/**
 * @brief Computes the sum-reduction-scatter operation across all nodes.
 * @param node_vals a pointer to the addend vector from the calling node.
 * @param result a pointer where the nid-th component of the sum will be stored.
 *
 * Each node supplies an n_nodes-component vector. The sum of all these vectors
 * is computed and its nid-th component is stored in @a result.
 * It is expected that only a single thread calls this function at a time.
 * Each node has to call this function, otherwise the result can't be computed.
 * Only a single mpi_reduce_sum_scatter() operation can be pending at a time.
 * Both arguments must point to valid memory regions until
 * mpi_reduce_sum_scatter_done() returns true.
 */
void mpi_reduce_sum_scatter(const unsigned node_vals[n_nodes], unsigned *result)
{
	mpi_lock();
	MPI_Ireduce_scatter_block(node_vals, result, 1, MPI_UNSIGNED, MPI_SUM,
	    MPI_COMM_WORLD, &reduce_sum_scatter_req);
	mpi_unlock();
}

/**
 * @brief Checks if a previous mpi_reduce_sum_scatter() operation has completed.
 * @return true if the previous operation has been completed, false otherwise.
 */
bool mpi_reduce_sum_scatter_done(void)
{
	int flag = 0;
	mpi_lock();
	MPI_Test(&reduce_sum_scatter_req, &flag, MPI_STATUS_IGNORE);
	mpi_unlock();
	return flag;
}

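/*
 * Usage sketch for the sum-reduce-scatter pair. Both buffers must stay valid
 * until completion is observed, so static storage is a natural fit. How the
 * vector is filled (here, a hypothetical per-destination sent-message count
 * maintained by the caller) and MAX_NODES (any bound >= n_nodes) are
 * assumptions, not part of this module.
 *
 *	static unsigned sent_counts[MAX_NODES];	// one entry per node
 *	static unsigned remote_msg_balance;
 *
 *	mpi_reduce_sum_scatter(sent_counts, &remote_msg_balance);
 *	while (!mpi_reduce_sum_scatter_done())
 *		mpi_remote_msg_handle();	// keep draining MPI meanwhile
 *	// remote_msg_balance now holds the nid-th component of the sum
 */
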
static MPI_Request reduce_min_req = MPI_REQUEST_NULL;

/**
 * @brief Computes the min-reduction operation across all nodes.
 * @param node_min_p a pointer to the value from the calling node, which will
 *                   also be used to store the computed minimum.
 *
 * Each node supplies a single simtime_t value. The minimum of all these values
 * is computed and stored in @a node_min_p itself.
 * It is expected that only a single thread calls this function at a time.
 * Each node has to call this function, otherwise the result can't be computed.
 * Only a single mpi_reduce_min() operation can be pending at a time.
 * The argument must point to a valid memory region until mpi_reduce_min_done()
 * returns true.
 */
void mpi_reduce_min(simtime_t *node_min_p)
{
	static simtime_t min_buff;
	min_buff = *node_min_p;
	mpi_lock();
	MPI_Iallreduce(&min_buff, node_min_p, 1, MPI_DOUBLE, MPI_MIN,
	    MPI_COMM_WORLD, &reduce_min_req);
	mpi_unlock();
}

/**
 * @brief Checks if a previous mpi_reduce_min() operation has completed.
 * @return true if the previous operation has been completed, false otherwise.
 */
bool mpi_reduce_min_done(void)
{
	int flag = 0;
	mpi_lock();
	MPI_Test(&reduce_min_req, &flag, MPI_STATUS_IGNORE);
	mpi_unlock();
	return flag;
}

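/*
 * Usage sketch for the min reduction, e.g. to agree on a global minimum
 * timestamp while computing the GVT. The local minimum computation is a
 * hypothetical placeholder; the static variable keeps the argument valid
 * until completion is observed.
 *
 *	static simtime_t candidate_gvt;
 *
 *	candidate_gvt = local_minimum_timestamp();	// hypothetical helper
 *	mpi_reduce_min(&candidate_gvt);
 *	while (!mpi_reduce_min_done())
 *		mpi_remote_msg_handle();
 *	// candidate_gvt now holds the minimum across all nodes
 */
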
/**
 * @brief A node barrier
 */
void mpi_node_barrier(void)
{
	mpi_lock();
	MPI_Barrier(MPI_COMM_WORLD);
	mpi_unlock();
}

/**
 * @brief Sends a byte buffer to another node
 * @param data a pointer to the buffer to send
 * @param data_size the buffer size
 * @param dest the id of the destination node
 *
 * This operation blocks the execution flow until the destination node receives
 * the data with mpi_blocking_data_rcv().
 */
void mpi_blocking_data_send(const void *data, int data_size, nid_t dest)
{
	mpi_lock();
	MPI_Send(data, data_size, MPI_BYTE, dest, MSG_CTRL_RAW, MPI_COMM_WORLD);
	mpi_unlock();
}

/**
 * @brief Receives a byte buffer from another node
 * @param data_size_p where to write the size of the received data
 * @param src the id of the sender node
 * @return the buffer allocated with mm_alloc() containing the received data
 *
 * This operation blocks the execution flow until the sender node actually
 * sends the data with mpi_blocking_data_send().
 */
void *mpi_blocking_data_rcv(int *data_size_p, nid_t src)
{
	MPI_Status status;
	MPI_Message mpi_msg;
	mpi_lock();
	MPI_Mprobe(src, MSG_CTRL_RAW, MPI_COMM_WORLD, &mpi_msg, &status);
	int data_size;
	MPI_Get_count(&status, MPI_BYTE, &data_size);
	char *ret = mm_alloc(data_size);
	MPI_Mrecv(ret, data_size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
	if (data_size_p != NULL)
		*data_size_p = data_size;
	mpi_unlock();
	return ret;
}

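/*
 * Usage sketch for the blocking raw-data helpers, e.g. to ship an
 * initialization blob from node 0 to every other node. The blob variables
 * and the mm_free() call pairing the mm_alloc() made by the receiver are
 * assumptions about the surrounding code, not something this module mandates.
 *
 *	if (nid == 0) {
 *		for (nid_t i = 1; i < n_nodes; ++i)
 *			mpi_blocking_data_send(init_blob, init_blob_size, i);
 *	} else {
 *		int size;
 *		void *blob = mpi_blocking_data_rcv(&size, 0);
 *		// ... use the received blob ...
 *		mm_free(blob);	// assuming mm_free() pairs with mm_alloc()
 *	}
 */
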
#endif