LCOV - code coverage report
Current view: top level - core/src/distributed - mpi.c
Test: ROOT-Sim master Documentation Coverage
Lines: 16 hit of 22 total (72.7 % coverage)
Date: 2021-03-25 15:11:55

          Line data    Source code
       1           1 : /**
       2             :  * @file distributed/mpi.c
       3             :  *
       4             :  * @brief MPI Support Module
       5             :  *
       6             :  * This module implements all basic MPI facilities to let the distributed
       7             :  * execution of a simulation model take place consistently.
       8             :  *
       9             :  * Several facilities are thread-safe, others are not. When relying on this
      10             :  * module, check carefully which of them can be used by worker threads
      11             :  * without coordination.
      12             :  *
      13             :  * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
      14             :  * SPDX-License-Identifier: GPL-3.0-only
      15             :  */
      16             : #include <distributed/mpi.h>
      17             : 
      18             : #include <core/core.h>
      19             : #include <core/sync.h>
      20             : #include <datatypes/array.h>
      21             : #include <datatypes/msg_queue.h>
      22             : #include <gvt/gvt.h>
      23             : #include <gvt/termination.h>
      24             : #include <mm/msg_allocator.h>
      25             : 
      26             : #include <mpi.h>
      27             : 
      28           0 : #define MSG_CTRL_RAW (MSG_CTRL_TERMINATION + 1)
      29             : 
      30             : #ifdef ROOTSIM_MPI_SERIALIZABLE
      31             : 
      32             : static bool mpi_serialize;
      33             : static spinlock_t mpi_spinlock;
      34             : 
      35             : #define mpi_lock()      if (mpi_serialize) spin_lock(&mpi_spinlock)
      36             : #define mpi_unlock()    if (mpi_serialize) spin_unlock(&mpi_spinlock)
      37             : #define mpi_trylock()   (!mpi_serialize || spin_trylock(&mpi_spinlock))
      38             : 
      39             : #else
      40             : 
      41           0 : #define mpi_lock()
      42           0 : #define mpi_unlock()
      43           0 : #define mpi_trylock()   (true)
      44             : 
      45             : #endif
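
/*
 * Usage sketch (illustrative, not part of the measured source): every MPI
 * call in this module is bracketed by these macros, so builds that fall back
 * to MPI_THREAD_SERIALIZED serialize library access through the spinlock,
 * while MPI_THREAD_MULTIPLE builds compile the locking away entirely.
 */
static inline int example_mpi_flag_query(void)
{
	int flag;
	mpi_lock();
	MPI_Initialized(&flag); /* any MPI call would go here */
	mpi_unlock();
	return flag;
}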
      46             : 
      47             : /**
      48             :  * @brief Handles an MPI error
      49             :  * @param comm the MPI communicator involved in the error
      50             :  * @param err_code_p a pointer to the error code
      51             :  * @param ... an implementation-specific list of additional arguments in which
      52             :  *            we are not interested
      53             :  *
      54             :  * This is registered in mpi_global_init() to print meaningful MPI errors
      55             :  */
      56           1 : static void comm_error_handler(MPI_Comm *comm, int *err_code_p, ...)
      57             : {
      58             :         (void) comm;
      59             :         log_log(LOG_FATAL, "MPI error with code %d!", *err_code_p);
      60             : 
      61             :         int err_len;
      62             :         char err_str[MPI_MAX_ERROR_STRING];
      63             :         MPI_Error_string(*err_code_p, err_str, &err_len);
      64             :         log_log(LOG_FATAL, "MPI error msg is %s", err_str);
      65             : 
      66             :         exit(-1);
      67             : }
      68             : 
      69             : /**
      70             :  * @brief Initializes the MPI environment
      71             :  * @param argc_p a pointer to the OS supplied argc
      72             :  * @param argv_p a pointer to the OS supplied argv
      73             :  */
      74           1 : void mpi_global_init(int *argc_p, char ***argv_p)
      75             : {
      76             :         int thread_lvl = MPI_THREAD_SINGLE;
      77             :         MPI_Init_thread(argc_p, argv_p, MPI_THREAD_MULTIPLE, &thread_lvl);
      78             : 
      79             :         if (thread_lvl < MPI_THREAD_MULTIPLE) {
      80             :                 if (thread_lvl < MPI_THREAD_SERIALIZED) {
      81             :                         log_log(LOG_FATAL,
      82             :                                 "This MPI implementation does not support threaded access");
      83             :                         abort();
      84             :                 } else {
      85             : #ifdef ROOTSIM_MPI_SERIALIZABLE
      86             :                         mpi_serialize = true;
      87             :                         spin_init(&mpi_spinlock);
      88             : #else
      89             :                         log_log(LOG_FATAL,
      90             :                                 "This MPI implementation only supports serialized calls: "
      91             :                                 "you need to build ROOT-Sim with -Dserialized_mpi=true");
      92             :                         abort();
      93             : #endif
      94             :                 }
      95             :         }
      96             : 
      97             :         MPI_Errhandler err_handler;
      98             :         if (MPI_Comm_create_errhandler(comm_error_handler, &err_handler)) {
      99             :                 log_log(LOG_FATAL, "Unable to create MPI error handler");
     100             :                 abort();
     101             :         }
     102             : 
     103             :         MPI_Comm_set_errhandler(MPI_COMM_WORLD, err_handler);
     104             : 
     105             :         int helper;
     106             :         MPI_Comm_rank(MPI_COMM_WORLD, &helper);
     107             :         nid = helper;
     108             :         MPI_Comm_size(MPI_COMM_WORLD, &helper);
     109             :         n_nodes = helper;
     110             : }
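
/*
 * Usage sketch (illustrative): a minimal startup/shutdown sequence, assuming
 * a hypothetical model_run() entry point that is not part of this module.
 */
int example_main(int argc, char **argv)
{
	mpi_global_init(&argc, &argv);
	model_run();       /* hypothetical: run the distributed simulation */
	mpi_global_fini(); /* release the error handler and finalize MPI */
	return 0;
}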
     111             : 
     112             : /**
     113             :  * @brief Finalizes the MPI environment
     114             :  */
     115           1 : void mpi_global_fini(void)
     116             : {
     117             :         MPI_Errhandler err_handler;
     118             :         MPI_Comm_get_errhandler(MPI_COMM_WORLD, &err_handler);
     119             :         MPI_Errhandler_free(&err_handler);
     120             : 
     121             :         MPI_Finalize();
     122             : }
     123             : 
     124             : /**
      125             :  * @brief Sends a model message to an LP residing on another node
     126             :  * @param msg the message to send
     127             :  * @param dest_nid the id of the node where the targeted LP resides
     128             :  *
      129             :  * This function also calls the relevant handlers in order to keep, for
      130             :  * example, the non-blocking GVT algorithm running.
      131             :  * Note that when this function returns, the message may not have actually
      132             :  * been sent yet. We don't need to actively check for sending completion: the
      133             :  * platform, during fossil collection, leverages the GVT to make sure the
      134             :  * message has indeed been sent and processed before freeing it.
     135             :  */
     136           1 : void mpi_remote_msg_send(struct lp_msg *msg, nid_t dest_nid)
     137             : {
     138             :         gvt_remote_msg_send(msg, dest_nid);
     139             : 
     140             :         mpi_lock();
     141             :         MPI_Request req;
     142             :         MPI_Isend(msg, msg_bare_size(msg), MPI_BYTE, dest_nid, 0,
     143             :                         MPI_COMM_WORLD, &req);
     144             :         MPI_Request_free(&req);
     145             :         mpi_unlock();
     146             : }
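
/*
 * Caller sketch (illustrative): a sender would route a message to the local
 * queue or through MPI depending on where the destination LP resides. Both
 * lp_id_to_nid() and the msg->dest field are assumptions for illustration,
 * not part of this module's API.
 */
void example_msg_route(struct lp_msg *msg)
{
	nid_t dest_nid = lp_id_to_nid(msg->dest); /* hypothetical lookup */
	if (dest_nid == nid)
		msg_queue_insert(msg); /* destination LP is on this node */
	else
		mpi_remote_msg_send(msg, dest_nid);
}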
     147             : 
     148             : /**
      149             :  * @brief Sends a model anti-message to an LP residing on another node
     150             :  * @param msg the message to rollback
     151             :  * @param dest_nid the id of the node where the targeted LP resides
     152             :  *
      153             :  * This function also calls the relevant handlers in order to keep, for
      154             :  * example, the non-blocking GVT algorithm running.
      155             :  * Note that when this function returns, the anti-message may not have been
      156             :  * sent yet. We don't need to actively check for sending completion: the
      157             :  * platform, during fossil collection, leverages the GVT to make sure the
      158             :  * message has indeed been sent and processed before freeing it.
     159             :  */
     160           1 : void mpi_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid)
     161             : {
     162             :         gvt_remote_anti_msg_send(msg, dest_nid);
     163             : 
     164             :         mpi_lock();
     165             :         MPI_Request req;
     166             :         MPI_Isend(msg, msg_anti_size(), MPI_BYTE, dest_nid, 0, MPI_COMM_WORLD,
     167             :                         &req);
     168             :         MPI_Request_free(&req);
     169             :         mpi_unlock();
     170             : }
     171             : 
     172             : /**
     173             :  * @brief Sends a platform control message to all the nodes, including self
     174             :  * @param ctrl the control message to send
     175             :  */
     176           1 : void mpi_control_msg_broadcast(enum msg_ctrl_tag ctrl)
     177             : {
     178             :         MPI_Request req;
     179             :         nid_t i = n_nodes;
     180             :         mpi_lock();
     181             :         while (i--) {
     182             :                 MPI_Isend(NULL, 0, MPI_BYTE, i, ctrl, MPI_COMM_WORLD, &req);
     183             :                 MPI_Request_free(&req);
     184             :         }
     185             :         mpi_unlock();
     186             : }
     187             : 
     188             : /**
      189             :  * @brief Sends a platform control message to a specific node
     190             :  * @param ctrl the control message to send
     191             :  * @param dest the id of the destination node
     192             :  */
     193           1 : void mpi_control_msg_send_to(enum msg_ctrl_tag ctrl, nid_t dest)
     194             : {
     195             :         MPI_Request req;
     196             :         mpi_lock();
     197             :         MPI_Isend(NULL, 0, MPI_BYTE, dest, ctrl, MPI_COMM_WORLD, &req);
     198             :         MPI_Request_free(&req);
     199             :         mpi_unlock();
     200             : }
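
/*
 * Usage sketch (illustrative): control messages carry no payload, only their
 * tag; a sender picks the broadcast or the point-to-point variant.
 */
void example_ctrl_usage(void)
{
	/* tell every node, this one included, that termination was detected */
	mpi_control_msg_broadcast(MSG_CTRL_TERMINATION);
	/* or nudge a single node, e.g. node 0, to start a GVT reduction */
	mpi_control_msg_send_to(MSG_CTRL_GVT_START, 0);
}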
     201             : 
     202             : /**
     203             :  * @brief Empties the queue of incoming MPI messages, doing the right thing for
     204             :  *        each one of them.
     205             :  *
      206             :  * This routine checks, using the MPI probing mechanism, for new remote
      207             :  * messages and handles them accordingly.
     208             :  * Control messages are handled by the respective platform handler.
     209             :  * Simulation messages are unpacked and put in the queue.
     210             :  * Anti-messages are matched and accordingly processed by the message map.
     211             :  */
     212           1 : void mpi_remote_msg_handle(void)
     213             : {
     214             :         int pending;
     215             :         MPI_Message mpi_msg;
     216             :         MPI_Status status;
     217             : 
     218             :         while (1) {
     219             :                 if (!mpi_trylock())
     220             :                         return;
     221             : 
     222             :                 MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
     223             :                         &pending, &mpi_msg, &status);
     224             : 
     225             :                 if (!pending) {
     226             :                         mpi_unlock();
     227             :                         return;
     228             :                 }
     229             : 
     230             :                 if (unlikely(status.MPI_TAG)) {
     231             :                         MPI_Mrecv(NULL, 0, MPI_BYTE, &mpi_msg,
     232             :                                         MPI_STATUS_IGNORE);
     233             :                         mpi_unlock();
     234             :                         switch (status.MPI_TAG) {
     235             :                         case MSG_CTRL_GVT_START:
     236             :                                 gvt_start_processing();
     237             :                                 break;
     238             :                         case MSG_CTRL_GVT_DONE:
     239             :                                 gvt_on_done_ctrl_msg();
     240             :                                 break;
     241             :                         case MSG_CTRL_TERMINATION:
     242             :                                 termination_on_ctrl_msg();
     243             :                                 break;
     244             :                         default:
     245             :                                 __builtin_unreachable();
     246             :                         }
     247             :                         continue;
     248             :                 }
     249             : 
     250             :                 int size;
     251             :                 MPI_Get_count(&status, MPI_BYTE, &size);
     252             :                 struct lp_msg *msg;
     253             :                 if (unlikely(size == msg_anti_size())) {
     254             :                         msg = msg_allocator_alloc(0);
     255             :                         MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
     256             :                                         MPI_STATUS_IGNORE);
     257             :                         mpi_unlock();
     258             : 
     259             :                         gvt_remote_anti_msg_receive(msg);
     260             :                 } else {
     261             :                         msg = msg_allocator_alloc(size -
     262             :                                         offsetof(struct lp_msg, pl));
     263             :                         MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
     264             :                                         MPI_STATUS_IGNORE);
     265             :                         mpi_unlock();
     266             : 
     267             :                         gvt_remote_msg_receive(msg);
     268             :                 }
     269             :                 msg_queue_insert(msg);
     270             :         }
     271             : }
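
/*
 * Usage sketch (illustrative): worker threads would poll this routine from
 * their main loop, interleaved with local processing. Both
 * termination_detected() and process_next_msg() are hypothetical names used
 * only for illustration.
 */
void example_worker_loop(void)
{
	while (!termination_detected()) {   /* hypothetical predicate */
		mpi_remote_msg_handle();    /* drain incoming MPI traffic */
		process_next_msg();         /* hypothetical local step */
	}
}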
     272             : 
     273           0 : static MPI_Request reduce_sum_scatter_req = MPI_REQUEST_NULL;
     274             : 
     275             : /**
     276             :  * @brief Computes the sum-reduction-scatter operation across all nodes.
      277             :  * @param values a pointer to the addend vector from the calling node.
     278             :  * @param result a pointer where the nid-th component of the sum will be stored.
     279             :  *
      280             :  * Each node supplies a vector of n_nodes components. The sum of all these
      281             :  * vectors is computed and its nid-th component is stored in @a result.
      282             :  * It is expected that only a single thread calls this function at a time.
      283             :  * Every node has to call this function, otherwise the result can't be computed.
     284             :  * It is possible to have a single mpi_reduce_sum_scatter() operation pending at
     285             :  * a time. Both arguments must point to valid memory regions until
     286             :  * mpi_reduce_sum_scatter_done() returns true.
     287             :  */
     288           1 : void mpi_reduce_sum_scatter(const uint32_t values[n_nodes], unsigned *result)
     289             : {
     290             :         mpi_lock();
     291             :         MPI_Ireduce_scatter_block(values, result, 1, MPI_UINT32_T, MPI_SUM,
     292             :                         MPI_COMM_WORLD, &reduce_sum_scatter_req);
     293             :         mpi_unlock();
     294             : }
     295             : 
     296             : /**
     297             :  * @brief Checks if a previous mpi_reduce_sum_scatter() operation has completed.
     298             :  * @return true if the previous operation has been completed, false otherwise.
     299             :  */
     300           1 : bool mpi_reduce_sum_scatter_done(void)
     301             : {
     302             :         int flag = 0;
     303             :         mpi_lock();
     304             :         MPI_Test(&reduce_sum_scatter_req, &flag, MPI_STATUS_IGNORE);
     305             :         mpi_unlock();
     306             :         return flag;
     307             : }
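
/*
 * Usage sketch (illustrative): the reduction is started once and then polled
 * until completion. On a two-node run where node 0 supplies {1, 2} and
 * node 1 supplies {3, 4}, the element-wise sum is {4, 6}, so result holds 4
 * on node 0 and 6 on node 1. Both buffers must stay valid until the poll
 * succeeds, as documented above.
 */
static uint32_t example_values[2] = {1, 2}; /* assumes n_nodes == 2 */
static unsigned example_result;

void example_sum_scatter(void)
{
	mpi_reduce_sum_scatter(example_values, &example_result);
	while (!mpi_reduce_sum_scatter_done())
		; /* real code would do useful work instead of spinning */
}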
     308             : 
     309           0 : static MPI_Request reduce_min_req = MPI_REQUEST_NULL;
     310             : 
     311             : /**
     312             :  * @brief Computes the min-reduction operation across all nodes.
     313             :  * @param node_min_p a pointer to the value from the calling node which will
     314             :  *                   also be used to store the computed minimum.
     315             :  *
     316             :  * Each node supplies a single simtime_t value. The minimum of all these values
     317             :  * is computed and stored in @a node_min_p itself.
     318             :  * It is expected that only a single thread calls this function at a time.
      319             :  * Every node has to call this function, otherwise the result can't be computed.
      320             :  * It is possible to have a single mpi_reduce_min() operation pending at a time.
      321             :  * The argument must point to a valid memory region until mpi_reduce_min_done()
      322             :  * returns true.
     323             :  */
     324           1 : void mpi_reduce_min(double *node_min_p)
     325             : {
     326             :         static double min_buff;
     327             :         min_buff = *node_min_p;
     328             :         mpi_lock();
     329             :         MPI_Iallreduce(&min_buff, node_min_p, 1, MPI_DOUBLE, MPI_MIN,
     330             :                 MPI_COMM_WORLD, &reduce_min_req);
     331             :         mpi_unlock();
     332             : }
     333             : 
     334             : /**
     335             :  * @brief Checks if a previous mpi_reduce_min() operation has completed.
     336             :  * @return true if the previous operation has been completed, false otherwise.
     337             :  */
     338           1 : bool mpi_reduce_min_done(void)
     339             : {
     340             :         int flag = 0;
     341             :         mpi_lock();
     342             :         MPI_Test(&reduce_min_req, &flag, MPI_STATUS_IGNORE);
     343             :         mpi_unlock();
     344             :         return flag;
     345             : }
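
/*
 * Usage sketch (illustrative): the GVT layer can reduce the node-local
 * minimum virtual time with the same start-then-poll pattern;
 * example_local_min stands in for a hypothetical per-node value.
 */
static double example_local_min;

void example_min_reduce(void)
{
	mpi_reduce_min(&example_local_min);
	while (!mpi_reduce_min_done())
		; /* once done, example_local_min holds the global minimum */
}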
     346             : 
     347             : /**
     348             :  * @brief A node barrier
     349             :  */
     350           1 : void mpi_node_barrier(void)
     351             : {
     352             :         mpi_lock();
     353             :         MPI_Barrier(MPI_COMM_WORLD);
     354             :         mpi_unlock();
     355             : }
     356             : 
     357             : /**
     358             :  * @brief Sends a byte buffer to another node
     359             :  * @param data a pointer to the buffer to send
     360             :  * @param data_size the buffer size
     361             :  * @param dest the id of the destination node
     362             :  *
     363             :  * This operation blocks the execution flow until the destination node receives
      364             :  * the data with mpi_blocking_data_rcv().
     365             :  */
     366           1 : void mpi_blocking_data_send(const void *data, int data_size, nid_t dest)
     367             : {
     368             :         mpi_lock();
     369             :         MPI_Send(data, data_size, MPI_BYTE, dest, MSG_CTRL_RAW, MPI_COMM_WORLD);
     370             :         mpi_unlock();
     371             : }
     372             : 
     373             : /**
     374             :  * @brief Receives a byte buffer from another node
     375             :  * @param data_size_p where to write the size of the received data
     376             :  * @param src the id of the sender node
     377             :  * @return the buffer allocated with mm_alloc() containing the received data
     378             :  *
     379             :  * This operation blocks the execution flow until the sender node actually sends
      380             :  * the data with mpi_blocking_data_send().
     381             :  */
     382           1 : void *mpi_blocking_data_rcv(int *data_size_p, nid_t src)
     383             : {
     384             :         MPI_Status status;
     385             :         MPI_Message mpi_msg;
     386             :         mpi_lock();
     387             :         MPI_Mprobe(src, MSG_CTRL_RAW, MPI_COMM_WORLD, &mpi_msg, &status);
     388             :         int data_size;
     389             :         MPI_Get_count(&status, MPI_BYTE, &data_size);
     390             :         char *ret = mm_alloc(data_size);
     391             :         MPI_Mrecv(ret, data_size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
     392             :         if (data_size_p != NULL)
     393             :                 *data_size_p = data_size;
     394             :         mpi_unlock();
     395             :         return ret;
     396             : }
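
/*
 * Usage sketch (illustrative): a matched blocking exchange between two nodes.
 * The receive allocates the buffer on behalf of the caller; mm_free() is
 * assumed to be the counterpart of mm_alloc().
 */
void example_blocking_exchange(void)
{
	if (nid == 0) {
		const char payload[] = "hello";
		mpi_blocking_data_send(payload, sizeof(payload), 1);
	} else if (nid == 1) {
		int size;
		char *data = mpi_blocking_data_rcv(&size, 0);
		/* ... consume data[0 .. size - 1] ... */
		mm_free(data); /* assumed counterpart of mm_alloc() */
	}
}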

Generated by: LCOV version 1.14