/**
 * @file distributed/mpi.c
 *
 * @brief MPI Support Module
 *
 * This module implements all basic MPI facilities to let the distributed
 * execution of a simulation model take place consistently.
 *
 * Some of these facilities are thread-safe, others are not. Check carefully
 * which ones worker threads can use without additional coordination when
 * relying on this module.
 *
 * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
 * SPDX-License-Identifier: GPL-3.0-only
 */
#include <distributed/mpi.h>

#include <core/core.h>
#include <core/sync.h>
#include <datatypes/array.h>
#include <datatypes/msg_queue.h>
#include <gvt/gvt.h>
#include <gvt/termination.h>
#include <mm/msg_allocator.h>

#include <mpi.h>

#define MSG_CTRL_RAW (MSG_CTRL_TERMINATION + 1)

#ifdef ROOTSIM_MPI_SERIALIZABLE

static bool mpi_serialize;
static spinlock_t mpi_spinlock;

#define mpi_lock() if (mpi_serialize) spin_lock(&mpi_spinlock)
#define mpi_unlock() if (mpi_serialize) spin_unlock(&mpi_spinlock)
#define mpi_trylock() (!mpi_serialize || spin_trylock(&mpi_spinlock))

#else

#define mpi_lock()
#define mpi_unlock()
#define mpi_trylock() (true)

#endif

/**
 * @brief Handles an MPI error
 * @param comm the MPI communicator involved in the error
 * @param err_code_p a pointer to the error code
 * @param ... an implementation-specific list of additional arguments in which
 *            we are not interested
 *
 * This handler is registered in mpi_global_init() to print meaningful MPI
 * errors.
 */
static void comm_error_handler(MPI_Comm *comm, int *err_code_p, ...)
{
	(void) comm;
	log_log(LOG_FATAL, "MPI error with code %d!", *err_code_p);

	int err_len;
	char err_str[MPI_MAX_ERROR_STRING];
	MPI_Error_string(*err_code_p, err_str, &err_len);
	log_log(LOG_FATAL, "MPI error msg is %s", err_str);

	exit(-1);
}

/**
 * @brief Initializes the MPI environment
 * @param argc_p a pointer to the OS supplied argc
 * @param argv_p a pointer to the OS supplied argv
 */
void mpi_global_init(int *argc_p, char ***argv_p)
{
	int thread_lvl = MPI_THREAD_SINGLE;
	MPI_Init_thread(argc_p, argv_p, MPI_THREAD_MULTIPLE, &thread_lvl);

	if (thread_lvl < MPI_THREAD_MULTIPLE) {
		if (thread_lvl < MPI_THREAD_SERIALIZED) {
			log_log(LOG_FATAL,
			    "This MPI implementation does not support threaded access");
			abort();
		} else {
#ifdef ROOTSIM_MPI_SERIALIZABLE
			mpi_serialize = true;
			spin_init(&mpi_spinlock);
#else
			log_log(LOG_FATAL,
			    "This MPI implementation only supports serialized calls: "
			    "you need to build ROOT-Sim with -Dserialized_mpi=true");
			abort();
#endif
		}
	}

	MPI_Errhandler err_handler;
	if (MPI_Comm_create_errhandler(comm_error_handler, &err_handler)) {
		log_log(LOG_FATAL, "Unable to create MPI error handler");
		abort();
	}

	MPI_Comm_set_errhandler(MPI_COMM_WORLD, err_handler);

	/* MPI_Comm_rank() and MPI_Comm_size() want plain int output arguments */
	int helper;
	MPI_Comm_rank(MPI_COMM_WORLD, &helper);
	nid = helper;
	MPI_Comm_size(MPI_COMM_WORLD, &helper);
	n_nodes = helper;
}

/**
 * @brief Finalizes the MPI environment
 */
void mpi_global_fini(void)
{
	MPI_Errhandler err_handler;
	MPI_Comm_get_errhandler(MPI_COMM_WORLD, &err_handler);
	MPI_Errhandler_free(&err_handler);

	MPI_Finalize();
}
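
/*
 * Minimal usage sketch (illustrative only, not compiled): how a hypothetical
 * entry point could drive this module. simulation_run() is a placeholder and
 * not an actual ROOT-Sim API.
 */
#if 0
int main(int argc, char **argv)
{
	mpi_global_init(&argc, &argv); /* sets nid and n_nodes */
	simulation_run();              /* hypothetical: run the actual model */
	mpi_global_fini();             /* tears down the MPI environment */
	return 0;
}
#endif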

/**
 * @brief Sends a model message to an LP residing on another node
 * @param msg the message to send
 * @param dest_nid the id of the node where the targeted LP resides
 *
 * This function also calls the relevant handlers in order to keep, for
 * example, the non-blocking GVT algorithm running.
 * Note that when this function returns, the message may not have been sent
 * yet. We don't need to actively check for sending completion: the platform,
 * during fossil collection, leverages the GVT to make sure the message has
 * indeed been sent and processed before freeing it.
 */
void mpi_remote_msg_send(struct lp_msg *msg, nid_t dest_nid)
{
	gvt_remote_msg_send(msg, dest_nid);

	mpi_lock();
	MPI_Request req;
	MPI_Isend(msg, msg_bare_size(msg), MPI_BYTE, dest_nid, 0,
	    MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}
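
/*
 * Usage sketch (illustrative only, not compiled): routing an event either to
 * the local queue or to a remote node. lp_id_to_nid() is a hypothetical
 * placeholder for whatever mapping the caller uses to locate the destination
 * LP; it is not part of this module.
 */
#if 0
void send_event_sketch(struct lp_msg *msg, uint64_t dest_lp)
{
	nid_t dest_nid = lp_id_to_nid(dest_lp); /* hypothetical mapping */
	if (dest_nid != nid)
		mpi_remote_msg_send(msg, dest_nid); /* may return before the actual send completes */
	else
		msg_queue_insert(msg); /* purely local event */
}
#endif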

/**
 * @brief Sends a model anti-message to an LP residing on another node
 * @param msg the message to roll back
 * @param dest_nid the id of the node where the targeted LP resides
 *
 * This function also calls the relevant handlers in order to keep, for
 * example, the non-blocking GVT algorithm running.
 * Note that when this function returns, the anti-message may not have been
 * sent yet. We don't need to actively check for sending completion: the
 * platform, during fossil collection, leverages the GVT to make sure the
 * message has indeed been sent and processed before freeing it.
 */
void mpi_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid)
{
	gvt_remote_anti_msg_send(msg, dest_nid);

	mpi_lock();
	MPI_Request req;
	MPI_Isend(msg, msg_anti_size(), MPI_BYTE, dest_nid, 0, MPI_COMM_WORLD,
	    &req);
	MPI_Request_free(&req);
	mpi_unlock();
}

/**
 * @brief Sends a platform control message to all the nodes, including this one
 * @param ctrl the control message to send
 */
void mpi_control_msg_broadcast(enum msg_ctrl_tag ctrl)
{
	MPI_Request req;
	nid_t i = n_nodes;
	mpi_lock();
	while (i--) {
		MPI_Isend(NULL, 0, MPI_BYTE, i, ctrl, MPI_COMM_WORLD, &req);
		MPI_Request_free(&req);
	}
	mpi_unlock();
}
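
/*
 * Usage sketch (illustrative only, not compiled): a platform subsystem
 * notifying every node, this one included, of a control event. Each receiver
 * dispatches the message from within mpi_remote_msg_handle().
 */
#if 0
void announce_termination_sketch(void)
{
	mpi_control_msg_broadcast(MSG_CTRL_TERMINATION);
}
#endif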

/**
 * @brief Sends a platform control message to a specific node
 * @param ctrl the control message to send
 * @param dest the id of the destination node
 */
void mpi_control_msg_send_to(enum msg_ctrl_tag ctrl, nid_t dest)
{
	MPI_Request req;
	mpi_lock();
	MPI_Isend(NULL, 0, MPI_BYTE, dest, ctrl, MPI_COMM_WORLD, &req);
	MPI_Request_free(&req);
	mpi_unlock();
}

/**
 * @brief Empties the queue of incoming MPI messages, handling each of them
 *        appropriately.
 *
 * This routine uses the MPI probing mechanism to check for new remote messages
 * and handles them accordingly.
 * Control messages are handled by the respective platform handler.
 * Simulation messages are unpacked and put in the queue.
 * Anti-messages are matched and processed accordingly by the message map.
 */
void mpi_remote_msg_handle(void)
{
	int pending;
	MPI_Message mpi_msg;
	MPI_Status status;

	while (1) {
		if (!mpi_trylock())
			return;

		MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
		    &pending, &mpi_msg, &status);

		if (!pending) {
			mpi_unlock();
			return;
		}

		if (unlikely(status.MPI_TAG)) {
			/* a non-zero tag identifies a platform control message */
			MPI_Mrecv(NULL, 0, MPI_BYTE, &mpi_msg,
			    MPI_STATUS_IGNORE);
			mpi_unlock();
			switch (status.MPI_TAG) {
			case MSG_CTRL_GVT_START:
				gvt_start_processing();
				break;
			case MSG_CTRL_GVT_DONE:
				gvt_on_done_ctrl_msg();
				break;
			case MSG_CTRL_TERMINATION:
				termination_on_ctrl_msg();
				break;
			default:
				__builtin_unreachable();
			}
			continue;
		}

		int size;
		MPI_Get_count(&status, MPI_BYTE, &size);
		struct lp_msg *msg;
		if (unlikely(size == msg_anti_size())) {
			/* a message of exactly msg_anti_size() bytes is an anti-message */
			msg = msg_allocator_alloc(0);
			MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
			    MPI_STATUS_IGNORE);
			mpi_unlock();

			gvt_remote_anti_msg_receive(msg);
		} else {
			msg = msg_allocator_alloc(size -
			    offsetof(struct lp_msg, pl));
			MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
			    MPI_STATUS_IGNORE);
			mpi_unlock();

			gvt_remote_msg_receive(msg);
		}
		msg_queue_insert(msg);
	}
}
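
/*
 * Usage sketch (illustrative only, not compiled): a worker thread is expected
 * to poll this function periodically so that remote and control messages keep
 * flowing. simulation_done() and process_next_event() are hypothetical
 * placeholders for the surrounding processing loop.
 */
#if 0
void worker_loop_sketch(void)
{
	while (!simulation_done()) {
		mpi_remote_msg_handle(); /* drain pending MPI traffic, if any */
		process_next_event();    /* hypothetical: local event processing */
	}
}
#endif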

static MPI_Request reduce_sum_scatter_req = MPI_REQUEST_NULL;

/**
 * @brief Computes the sum-reduction-scatter operation across all nodes.
 * @param values a pointer to the addend vector supplied by the calling node.
 * @param result a pointer where the nid-th component of the sum will be stored.
 *
 * Each node supplies a vector of n_nodes components. The sum of all these
 * vectors is computed, and the nid-th component of the resulting vector is
 * stored in @a result.
 * It is expected that only a single thread calls this function at a time.
 * Every node has to call this function, otherwise the result can't be
 * computed. Only a single mpi_reduce_sum_scatter() operation can be pending at
 * a time. Both arguments must point to valid memory regions until
 * mpi_reduce_sum_scatter_done() returns true.
 */
void mpi_reduce_sum_scatter(const uint32_t values[n_nodes], unsigned *result)
{
	mpi_lock();
	MPI_Ireduce_scatter_block(values, result, 1, MPI_UINT32_T, MPI_SUM,
	    MPI_COMM_WORLD, &reduce_sum_scatter_req);
	mpi_unlock();
}

/**
 * @brief Checks if a previous mpi_reduce_sum_scatter() operation has completed.
 * @return true if the previous operation has been completed, false otherwise.
 */
bool mpi_reduce_sum_scatter_done(void)
{
	int flag = 0;
	mpi_lock();
	MPI_Test(&reduce_sum_scatter_req, &flag, MPI_STATUS_IGNORE);
	mpi_unlock();
	return flag;
}
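
/*
 * Usage sketch (illustrative only, not compiled): starting a sum-reduce-scatter
 * and polling for completion without blocking message handling. In a GVT-like
 * use, values[i] could count the messages this node sent to node i, so the
 * result would be how many messages were addressed to this node overall.
 */
#if 0
unsigned reduce_sum_scatter_sketch(const uint32_t values[/* n_nodes */])
{
	unsigned result;

	mpi_reduce_sum_scatter(values, &result);
	while (!mpi_reduce_sum_scatter_done())
		mpi_remote_msg_handle(); /* keep draining MPI while waiting */

	return result; /* the nid-th component of the global sum */
}
#endif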

static MPI_Request reduce_min_req = MPI_REQUEST_NULL;

/**
 * @brief Computes the min-reduction operation across all nodes.
 * @param node_min_p a pointer to the value from the calling node, which will
 *                   also be used to store the computed minimum.
 *
 * Each node supplies a single simtime_t value. The minimum of all these values
 * is computed and stored in @a node_min_p itself.
 * It is expected that only a single thread calls this function at a time.
 * Every node has to call this function, otherwise the result can't be
 * computed. Only a single mpi_reduce_min() operation can be pending at a time.
 * The argument must point to a valid memory region until mpi_reduce_min_done()
 * returns true.
 */
void mpi_reduce_min(double *node_min_p)
{
	static double min_buff;
	min_buff = *node_min_p;
	mpi_lock();
	MPI_Iallreduce(&min_buff, node_min_p, 1, MPI_DOUBLE, MPI_MIN,
	    MPI_COMM_WORLD, &reduce_min_req);
	mpi_unlock();
}

/**
 * @brief Checks if a previous mpi_reduce_min() operation has completed.
 * @return true if the previous operation has been completed, false otherwise.
 */
bool mpi_reduce_min_done(void)
{
	int flag = 0;
	mpi_lock();
	MPI_Test(&reduce_min_req, &flag, MPI_STATUS_IGNORE);
	mpi_unlock();
	return flag;
}
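
/*
 * Usage sketch (illustrative only, not compiled): reducing a node-local
 * minimum (e.g. a GVT candidate) to the global minimum across all nodes,
 * polling for completion instead of blocking. The local variable stays valid
 * because we only return once mpi_reduce_min_done() reports completion.
 */
#if 0
double reduce_min_sketch(double local_min)
{
	mpi_reduce_min(&local_min);
	while (!mpi_reduce_min_done())
		mpi_remote_msg_handle(); /* keep draining MPI while waiting */

	return local_min; /* now holds the minimum across all nodes */
}
#endif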

/**
 * @brief A node barrier
 */
void mpi_node_barrier(void)
{
	mpi_lock();
	MPI_Barrier(MPI_COMM_WORLD);
	mpi_unlock();
}

/**
 * @brief Sends a byte buffer to another node
 * @param data a pointer to the buffer to send
 * @param data_size the buffer size
 * @param dest the id of the destination node
 *
 * This operation blocks the execution flow until the destination node receives
 * the data with mpi_blocking_data_rcv().
 */
void mpi_blocking_data_send(const void *data, int data_size, nid_t dest)
{
	mpi_lock();
	MPI_Send(data, data_size, MPI_BYTE, dest, MSG_CTRL_RAW, MPI_COMM_WORLD);
	mpi_unlock();
}

/**
 * @brief Receives a byte buffer from another node
 * @param data_size_p where to write the size of the received data
 * @param src the id of the sender node
 * @return the buffer allocated with mm_alloc() containing the received data
 *
 * This operation blocks the execution flow until the sender node actually
 * sends the data with mpi_blocking_data_send().
 */
void *mpi_blocking_data_rcv(int *data_size_p, nid_t src)
{
	MPI_Status status;
	MPI_Message mpi_msg;
	mpi_lock();
	MPI_Mprobe(src, MSG_CTRL_RAW, MPI_COMM_WORLD, &mpi_msg, &status);
	int data_size;
	MPI_Get_count(&status, MPI_BYTE, &data_size);
	char *ret = mm_alloc(data_size);
	MPI_Mrecv(ret, data_size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
	if (data_size_p != NULL)
		*data_size_p = data_size;
	mpi_unlock();
	return ret;
}
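
/*
 * Usage sketch (illustrative only, not compiled): a blocking point-to-point
 * exchange, e.g. node 0 shipping a raw blob to node 1. The two calls must be
 * matched, otherwise both nodes block indefinitely. mm_free() is assumed here
 * to be the deallocator matching mm_alloc().
 */
#if 0
void blocking_exchange_sketch(void)
{
	if (nid == 0) {
		const char blob[] = "hypothetical payload";
		mpi_blocking_data_send(blob, sizeof(blob), 1);
	} else if (nid == 1) {
		int size;
		char *blob = mpi_blocking_data_rcv(&size, 0);
		/* ... use the size bytes pointed to by blob ... */
		mm_free(blob); /* assumed matching deallocator for mm_alloc() */
	}
}
#endif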