LCOV - code coverage report
Current view: top level - core/src/distributed - mpi.c
Test: ROOT-Sim develop Documentation
Coverage (Lines): 16 hit / 22 total = 72.7 %
Date: 2021-03-02 11:24:52

          Line data    Source code
       1           1 : /**
       2             :  * @file distributed/mpi.c
       3             :  *
       4             :  * @brief MPI Support Module
       5             :  *
       6             :  * This module implements all basic MPI facilities to let the distributed
       7             :  * execution of a simulation model take place consistently.
       8             :  *
       9             :  * Several facilities are thread-safe, others are not. Check carefully which
      10             :  * of these can be used by worker threads without coordination when relying
      11             :  * on this module.
      12             :  *
      13             :  * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
      14             :  * SPDX-License-Identifier: GPL-3.0-only
      15             :  */
      16             : #include <distributed/mpi.h>
      17             : 
      18             : #ifdef ROOTSIM_MPI
      19             : 
      20             : #include <core/core.h>
      21             : #include <core/sync.h>
      22             : #include <datatypes/array.h>
      23             : #include <datatypes/msg_queue.h>
      24             : #include <datatypes/remote_msg_map.h>
      25             : #include <gvt/gvt.h>
      26             : #include <gvt/termination.h>
      27             : #include <mm/msg_allocator.h>
      28             : 
      29             : #include <mpi.h>
      30             : 
      31           0 : #define MSG_CTRL_RAW (MSG_CTRL_TERMINATION + 1)
      32             : 
      33             : #ifdef ROOTSIM_MPI_SERIALIZABLE
      34             : 
      35             : static bool mpi_serialize;
      36             : static spinlock_t mpi_spinlock;
      37             : 
      38             : #define mpi_lock()      if (mpi_serialize) spin_lock(&mpi_spinlock)
      39             : #define mpi_unlock()    if (mpi_serialize) spin_unlock(&mpi_spinlock)
      40             : #define mpi_trylock()   (!mpi_serialize || spin_trylock(&mpi_spinlock))
      41             : 
      42             : #else
      43             : 
      44           0 : #define mpi_lock()
      45           0 : #define mpi_unlock()
      46           0 : #define mpi_trylock()   (true)
      47             : 
      48             : #endif
      49             : 
      50             : /**
      51             :  * @brief Handles an MPI error
      52             :  * @param comm the MPI communicator involved in the error
      53             :  * @param err_code_p a pointer to the error code
      54             :  * @param ... an implementation-specific list of additional arguments in which
      55             :  *            we are not interested
      56             :  *
      57             :  * This handler is registered in mpi_global_init() to print meaningful MPI errors
      58             :  */
      59           1 : static void comm_error_handler(MPI_Comm *comm, int *err_code_p, ...)
      60             : {
      61             :         (void) comm;
      62             :         log_log(LOG_FATAL, "MPI error with code %d!", *err_code_p);
      63             : 
      64             :         int err_len;
      65             :         char err_str[MPI_MAX_ERROR_STRING];
      66             :         MPI_Error_string(*err_code_p, err_str, &err_len);
      67             :         log_log(LOG_FATAL, "MPI error msg is %s ", err_str);
      68             : 
      69             :         exit(-1);
      70             : }
      71             : 
      72             : /**
      73             :  * @brief Initializes the MPI environment
      74             :  * @param argc_p a pointer to the OS-supplied argc
      75             :  * @param argv_p a pointer to the OS-supplied argv
      76             :  */
      77           1 : void mpi_global_init(int *argc_p, char ***argv_p)
      78             : {
      79             :         int thread_lvl = MPI_THREAD_SINGLE;
      80             :         MPI_Init_thread(argc_p, argv_p, MPI_THREAD_MULTIPLE, &thread_lvl);
      81             : 
      82             :         if (thread_lvl < MPI_THREAD_MULTIPLE) {
      83             :                 if (thread_lvl < MPI_THREAD_SERIALIZED) {
      84             :                         log_log(LOG_FATAL,
      85             :                                 "This MPI implementation does not support threaded access");
      86             :                         abort();
      87             :                 } else {
      88             : #ifdef ROOTSIM_MPI_SERIALIZABLE
      89             :                         mpi_serialize = true;
      90             :                         spin_init(&mpi_spinlock);
      91             : #else
      92             :                         log_log(LOG_FATAL,
      93             :                                 "This MPI implementation only supports serialized calls: you need to build ROOT-Sim with -Dserialized_mpi=true");
      94             :                         abort();
      95             : #endif
      96             :                 }
      97             :         }
      98             : 
      99             :         MPI_Errhandler err_handler;
     100             :         if (MPI_Comm_create_errhandler(comm_error_handler, &err_handler)) {
     101             :                 log_log(LOG_FATAL, "Unable to create MPI error handler");
     102             :                 abort();
     103             :         }
     104             : 
     105             :         MPI_Comm_set_errhandler(MPI_COMM_WORLD, err_handler);
     106             : 
     107             :         int helper;
     108             :         MPI_Comm_rank(MPI_COMM_WORLD, &helper);
     109             :         nid = helper;
     110             :         MPI_Comm_size(MPI_COMM_WORLD, &helper);
     111             :         n_nodes = helper;
     112             : }
     113             : 
     114             : /**
     115             :  * @brief Finalizes the MPI environment
     116             :  */
     117           1 : void mpi_global_fini(void)
     118             : {
     119             :         MPI_Errhandler err_handler;
     120             :         MPI_Comm_get_errhandler(MPI_COMM_WORLD, &err_handler);
     121             :         MPI_Errhandler_free(&err_handler);
     122             : 
     123             :         MPI_Finalize();
     124             : }
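
A minimal sketch of how these two calls are expected to bracket a distributed
run; the surrounding main() is illustrative and not part of this module:

        #include <distributed/mpi.h>

        int main(int argc, char **argv)
        {
                /* Bring up MPI first, so that nid and n_nodes are valid and
                 * the other facilities of this module become usable. */
                mpi_global_init(&argc, &argv);

                /* ... run the distributed simulation ... */

                /* Tear down MPI only once all communication has completed. */
                mpi_global_fini();
                return 0;
        }
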
     125             : 
     126             : /**
     127             :  * @brief Sends a model message to an LP residing on another node
     128             :  * @param msg the message to send
     129             :  * @param dest_nid the id of the node where the targeted LP resides
     130             :  *
     131             :  * This function also calls the relevant handlers in order to keep, for
     132             :  * example, the non-blocking GVT algorithm running.
     133             :  * Note that when this function returns, the message may not have been sent
     134             :  * yet. We don't need to actively check for sending completion: during fossil
     135             :  * collection, the platform leverages the GVT to make sure the message has
     136             :  * indeed been sent and processed before freeing it.
     137             :  */
     138           1 : void mpi_remote_msg_send(struct lp_msg *msg, nid_t dest_nid)
     139             : {
     140             :         msg->msg_id = msg_id_get(msg, gvt_phase_get());
     141             :         gvt_on_remote_msg_send(dest_nid);
     142             : 
     143             :         mpi_lock();
     144             :         MPI_Request req;
     145             :         MPI_Isend(msg, msg_bare_size(msg), MPI_BYTE, dest_nid, 0,
     146             :                 MPI_COMM_WORLD, &req);
     147             :         MPI_Request_free(&req);
     148             :         mpi_unlock();
     149             : }
     150             : 
     151             : /**
     152             :  * @brief Sends a model anti-message to an LP residing on another node
     153             :  * @param msg the message to rollback
     154             :  * @param dest_nid the id of the node where the targeted LP resides
     155             :  *
     156             :  * This function also calls the relevant handlers in order to keep, for
     157             :  * example, the non-blocking GVT algorithm running.
     158             :  * Note that when this function returns, the anti-message may not have been
     159             :  * sent yet. We don't need to actively check for sending completion: during
     160             :  * fossil collection, the platform leverages the GVT to make sure the message
     161             :  * has indeed been sent and processed before freeing it.
     162             :  */
     163           1 : void mpi_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid)
     164             : {
     165             :         msg_id_anti_phase_set(msg->msg_id, gvt_phase_get());
     166             :         gvt_on_remote_msg_send(dest_nid);
     167             : 
     168             :         mpi_lock();
     169             :         MPI_Request req;
     170             :         MPI_Isend(&msg->msg_id, sizeof(msg->msg_id), MPI_BYTE, dest_nid, 0,
     171             :                 MPI_COMM_WORLD, &req);
     172             :         MPI_Request_free(&req);
     173             :         mpi_unlock();
     174             : }
     175             : 
     176             : /**
     177             :  * @brief Sends a platform control message to all the other nodes
     178             :  * @param ctrl the control message to send
     179             :  */
     180           1 : void mpi_control_msg_broadcast(enum msg_ctrl_tag ctrl)
     181             : {
     182             :         MPI_Request req;
     183             :         nid_t i = n_nodes;
     184             :         mpi_lock();
     185             :         while (i--) {
     186             :                 if(i == nid)
     187             :                         continue;
     188             :                 MPI_Isend(NULL, 0, MPI_BYTE, i, ctrl, MPI_COMM_WORLD, &req);
     189             :                 MPI_Request_free(&req);
     190             :         }
     191             :         mpi_unlock();
     192             : }
     193             : 
     194             : /**
     195             :  * @brief Sends a platform control message to a specific node
     196             :  * @param ctrl the control message to send
     197             :  * @param dest the id of the destination node
     198             :  */
     199           1 : void mpi_control_msg_send_to(enum msg_ctrl_tag ctrl, nid_t dest)
     200             : {
     201             :         MPI_Request req;
     202             :         mpi_lock();
     203             :         MPI_Isend(NULL, 0, MPI_BYTE, dest, ctrl, MPI_COMM_WORLD, &req);
     204             :         MPI_Request_free(&req);
     205             :         mpi_unlock();
     206             : }
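
A usage sketch for the two control-message primitives above, assuming
MSG_CTRL_TERMINATION is one of the msg_ctrl_tag values handled by the receive
path below; the wrapper function names are hypothetical:

        /* Tell every other node that termination has been detected locally. */
        static void notify_termination_all(void)
        {
                mpi_control_msg_broadcast(MSG_CTRL_TERMINATION);
        }

        /* Tell a single node instead. */
        static void notify_termination_one(nid_t dest)
        {
                mpi_control_msg_send_to(MSG_CTRL_TERMINATION, dest);
        }
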
     207             : 
     208             : /**
     209             :  * @brief Empties the queue of incoming MPI messages, handling each of them
     210             :  *        appropriately.
     211             :  *
     212             :  * This routine checks for new remote messages using the MPI probing mechanism
     213             :  * and handles them accordingly.
     214             :  * Control messages are dispatched to the respective platform handler.
     215             :  * Simulation messages are unpacked and inserted into the message queue.
     216             :  * Anti-messages are matched and processed by the remote message map.
     217             :  */
     218           1 : void mpi_remote_msg_handle(void)
     219             : {
     220             :         int pending;
     221             :         MPI_Message mpi_msg;
     222             :         MPI_Status status;
     223             : 
     224             :         while (1) {
     225             :                 if (!mpi_trylock())
     226             :                         return;
     227             : 
     228             :                 MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
     229             :                         &pending, &mpi_msg, &status);
     230             : 
     231             :                 if (!pending) {
     232             :                         mpi_unlock();
     233             :                         return;
     234             :                 }
     235             : 
     236             :                 if (unlikely(status.MPI_TAG)) {
     237             :                         MPI_Mrecv(NULL, 0, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
     238             :                         mpi_unlock();
     239             :                         switch(status.MPI_TAG){
     240             :                         case MSG_CTRL_GVT_START:
     241             :                                 gvt_on_start_ctrl_msg();
     242             :                                 break;
     243             :                         case MSG_CTRL_GVT_DONE:
     244             :                                 gvt_on_done_ctrl_msg();
     245             :                                 break;
     246             :                         case MSG_CTRL_TERMINATION:
     247             :                                 termination_on_ctrl_msg();
     248             :                                 break;
     249             :                         }
     250             :                 } else {
     251             :                         int size;
     252             :                         MPI_Get_count(&status, MPI_BYTE, &size);
     253             : 
     254             :                         if (unlikely(size == sizeof(uintptr_t))) {
     255             :                                 uintptr_t anti_id;
     256             :                                 MPI_Mrecv(&anti_id, size, MPI_BYTE, &mpi_msg,
     257             :                                         MPI_STATUS_IGNORE);
     258             :                                 mpi_unlock();
     259             : 
     260             :                                 remote_msg_map_match(anti_id,
     261             :                                         status.MPI_SOURCE, NULL);
     262             :                                 gvt_on_remote_msg_receive(
     263             :                                         msg_id_anti_phase_get(anti_id));
     264             :                         } else {
     265             :                                 mpi_unlock();
     266             : 
     267             :                                 struct lp_msg *msg = msg_allocator_alloc(size -
     268             :                                         offsetof(struct lp_msg, pl));
     269             : 
     270             :                                 mpi_lock();
     271             :                                 MPI_Mrecv(msg, size, MPI_BYTE, &mpi_msg,
     272             :                                         MPI_STATUS_IGNORE);
     273             :                                 mpi_unlock();
     274             : 
     275             :                                 uintptr_t msg_id = msg->msg_id;
     276             :                                 atomic_store_explicit(&msg->flags, 0U,
     277             :                                         memory_order_relaxed);
     278             :                                 remote_msg_map_match(msg_id,
     279             :                                         status.MPI_SOURCE, msg);
     280             :                                 msg_queue_insert(msg);
     281             :                                 gvt_on_remote_msg_receive(
     282             :                                         msg_id_phase_get(msg_id));
     283             :                         }
     284             :                 }
     285             :         }
     286             : }
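
This handler is meant to be polled. A rough sketch of how a worker thread's
main loop might interleave it with local processing; the loop condition and
process_next_local_msg() are hypothetical placeholders:

        while (!termination_detected) {
                /* Drain pending MPI traffic into the local data structures;
                 * returns immediately if another thread already holds the
                 * (optional) MPI serialization lock. */
                mpi_remote_msg_handle();

                /* ... then work on the next message from the local queue ... */
                process_next_local_msg();
        }
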
     287             : 
     288           0 : static MPI_Request reduce_sum_scatter_req = MPI_REQUEST_NULL;
     289             : 
     290             : /**
     291             :  * @brief Computes a sum-reduction-scatter operation across all nodes.
     292             :  * @param node_vals a pointer to the addend vector from the calling node.
     293             :  * @param result a pointer where the nid-th component of the sum will be stored.
     294             :  *
     295             :  * Each node supplies a vector of n_nodes components. The sum of all these
     296             :  * vectors is computed, and its nid-th component is stored in @a result.
     297             :  * It is expected that only a single thread calls this function at a time.
     298             :  * Every node has to call this function, otherwise the result can't be computed.
     299             :  * Only a single mpi_reduce_sum_scatter() operation may be pending at a time.
     300             :  * Both arguments must point to valid memory regions until
     301             :  * mpi_reduce_sum_scatter_done() returns true.
     302             :  */
     303           1 : void mpi_reduce_sum_scatter(const unsigned node_vals[n_nodes], unsigned *result)
     304             : {
     305             :         mpi_lock();
     306             :         MPI_Ireduce_scatter_block(node_vals, result, 1, MPI_UNSIGNED, MPI_SUM,
     307             :                 MPI_COMM_WORLD, &reduce_sum_scatter_req);
     308             :         mpi_unlock();
     309             : }
     310             : 
     311             : /**
     312             :  * @brief Checks if a previous mpi_reduce_sum_scatter() operation has completed.
     313             :  * @return true if the previous operation has been completed, false otherwise.
     314             :  */
     315           1 : bool mpi_reduce_sum_scatter_done(void)
     316             : {
     317             :         int flag = 0;
     318             :         mpi_lock();
     319             :         MPI_Test(&reduce_sum_scatter_req, &flag, MPI_STATUS_IGNORE);
     320             :         mpi_unlock();
     321             :         return flag;
     322             : }
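
A hedged sketch of the start-then-poll pattern for this reduction, e.g. to sum
per-destination counters; the counter values are illustrative and both buffers
remain in scope until the poll loop completes:

        unsigned node_vals[n_nodes];  /* one addend slot per node             */
        unsigned my_component;        /* receives the sum of this node's slot */

        for (nid_t i = 0; i < n_nodes; ++i)
                node_vals[i] = 0;     /* fill with per-destination counters   */

        mpi_reduce_sum_scatter(node_vals, &my_component); /* start            */
        while (!mpi_reduce_sum_scatter_done())            /* poll completion  */
                ;                                         /* or do other work */
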
     323             : 
     324           0 : static MPI_Request reduce_min_req = MPI_REQUEST_NULL;
     325             : 
     326             : /**
     327             :  * @brief Computes a min-reduction operation across all nodes.
     328             :  * @param node_min_p a pointer to the value from the calling node, which will
     329             :  *                   also be used to store the computed minimum.
     330             :  *
     331             :  * Each node supplies a single simtime_t value. The minimum of all these values
     332             :  * is computed and stored in @a node_min_p itself.
     333             :  * It is expected that only a single thread calls this function at a time.
     334             :  * Every node has to call this function, otherwise the result can't be computed.
     335             :  * Only a single mpi_reduce_min() operation may be pending at a time.
     336             :  * The argument must point to a valid memory region until mpi_reduce_min_done()
     337             :  * returns true.
     338             :  */
     339           1 : void mpi_reduce_min(simtime_t *node_min_p)
     340             : {
     341             :         static simtime_t min_buff;
     342             :         min_buff = *node_min_p;
     343             :         mpi_lock();
     344             :         MPI_Iallreduce(&min_buff, node_min_p, 1, MPI_DOUBLE, MPI_MIN,
     345             :                 MPI_COMM_WORLD, &reduce_min_req);
     346             :         mpi_unlock();
     347             : }
     348             : 
     349             : /**
     350             :  * @brief Checks if a previous mpi_reduce_min() operation has completed.
     351             :  * @return true if the previous operation has been completed, false otherwise.
     352             :  */
     353           1 : bool mpi_reduce_min_done(void)
     354             : {
     355             :         int flag = 0;
     356             :         mpi_lock();
     357             :         MPI_Test(&reduce_min_req, &flag, MPI_STATUS_IGNORE);
     358             :         mpi_unlock();
     359             :         return flag;
     360             : }
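
The min reduction follows the same start-then-poll pattern, typically fed with
the node-local minimum time; local_minimum_time() and do_other_work() are
hypothetical placeholders:

        simtime_t node_min = local_minimum_time(); /* node-local candidate */

        mpi_reduce_min(&node_min);                 /* start the reduction  */
        while (!mpi_reduce_min_done())             /* poll for completion  */
                do_other_work();                   /* placeholder          */

        /* node_min now holds the global minimum across all nodes. */
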
     361             : 
     362             : /**
     363             :  * @brief A node barrier
     364             :  */
     365           1 : void mpi_node_barrier(void)
     366             : {
     367             :         mpi_lock();
     368             :         MPI_Barrier(MPI_COMM_WORLD);
     369             :         mpi_unlock();
     370             : }
     371             : 
     372             : /**
     373             :  * @brief Sends a byte buffer to another node
     374             :  * @param data a pointer to the buffer to send
     375             :  * @param data_size the buffer size
     376             :  * @param dest the id of the destination node
     377             :  *
     378             :  * This operation blocks the execution flow until the destination node receives
     379             :  * the data with mpi_blocking_data_rcv().
     380             :  */
     381           1 : void mpi_blocking_data_send(const void *data, int data_size, nid_t dest)
     382             : {
     383             :         mpi_lock();
     384             :         MPI_Send(data, data_size, MPI_BYTE, dest, MSG_CTRL_RAW, MPI_COMM_WORLD);
     385             :         mpi_unlock();
     386             : }
     387             : 
     388             : /**
     389             :  * @brief Receives a byte buffer from another node
     390             :  * @param data_size_p where to write the size of the received data
     391             :  * @param src the id of the sender node
     392             :  * @return the buffer allocated with mm_alloc() containing the received data
     393             :  *
     394             :  * This operation blocks the execution flow until the sender node actually sends
     395             :  * the data with mpi_blocking_data_send().
     396             :  */
     397           1 : void *mpi_blocking_data_rcv(int *data_size_p, nid_t src)
     398             : {
     399             :         MPI_Status status;
     400             :         MPI_Message mpi_msg;
     401             :         mpi_lock();
     402             :         MPI_Mprobe(src, MSG_CTRL_RAW, MPI_COMM_WORLD, &mpi_msg, &status);
     403             :         int data_size;
     404             :         MPI_Get_count(&status, MPI_BYTE, &data_size);
     405             :         char *ret = mm_alloc(data_size);
     406             :         MPI_Mrecv(ret, data_size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
     407             :         if (data_size_p != NULL)
     408             :                 *data_size_p = data_size;
     409             :         mpi_unlock();
     410             :         return ret;
     411             : }
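
The two blocking primitives above are meant to be paired across nodes. A
minimal sketch of a one-way transfer from node 0 to node 1; the payload is
illustrative and mm_free() is assumed to be the deallocator matching
mm_alloc():

        if (nid == 0) {
                const char payload[] = "hello from node 0";
                mpi_blocking_data_send(payload, sizeof(payload), 1);
        } else if (nid == 1) {
                int size;
                char *buf = mpi_blocking_data_rcv(&size, 0);
                /* ... consume the size bytes stored in buf ... */
                mm_free(buf); /* assumed counterpart of mm_alloc() */
        }
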
     412             : 
     413             : #endif

Generated by: LCOV version 1.14