LCOV - code coverage report
Current view: top level - core/src/gvt - gvt.c Hit Total Coverage
Test: ROOT-Sim develop Documentation Coverage Lines: 4 16 25.0 %
Date: 2021-03-02 11:24:52

          Line data    Source code
       1           1 : /**
       2             :  * @file gvt/gvt.c
       3             :  *
       4             :  * @brief Global Virtual Time
       5             :  *
       6             :  * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
       7             :  * SPDX-License-Identifier: GPL-3.0-only
       8             :  */
       9             : #include <gvt/gvt.h>
      10             : 
      11             : #include <arch/timer.h>
      12             : #include <core/init.h>
      13             : #include <datatypes/msg_queue.h>
      14             : #include <distributed/mpi.h>
      15             : #include <log/stats.h>
      16             : 
      17             : #include <memory.h>
      18             : #include <stdatomic.h>
      19             : 
      20             : /// A thread phase during the gvt algorithm computation
      21           0 : enum thread_phase_t {
      22             :         tphase_rdy = 0,
      23             :         tphase_A,
      24             :         tphase_B,
      25             : #ifdef ROOTSIM_MPI
      26             :         tphase_B_reduce,
      27             :         tphase_C,
      28             :         tphase_C_reduce,
      29             :         tphase_B_rdone,
      30             :         tphase_C_rdone,
      31             :         tphase_B_wait_msgs,
      32             : #endif
      33             :         tphase_wait
      34             : };
      35             : 
      36           0 : static __thread enum thread_phase_t thread_phase = tphase_rdy;
      37             : 
      38           0 : static timer_uint last_gvt;
      39           0 : static simtime_t reducing_p[MAX_THREADS];
      40           0 : static __thread simtime_t current_gvt;
      41             : 
      42             : #ifdef ROOTSIM_MPI
      43             : 
      44           0 : static atomic_uint sent_tot[MAX_NODES];
      45           0 : static _Atomic(nid_t) missing_nodes;
      46             : 
      47             : __thread bool gvt_phase_green = false;
      48             : __thread unsigned remote_msg_sent[MAX_NODES] = {0};
      49           0 : atomic_int remote_msg_received[2];
      50             : 
      51             : #endif
      52             : 
      53             : /**
      54             :  * @brief Initializes the gvt module in the node
      55             :  */
      56           1 : void gvt_global_init(void)
      57             : {
      58             :         last_gvt = timer_new();
      59             : }
      60             : 
      61           0 : static inline simtime_t gvt_node_reduce(void)
      62             : {
      63             :         unsigned i = n_threads - 1;
      64             :         simtime_t candidate = reducing_p[i];
      65             :         while(i--){
      66             :                 candidate = min(reducing_p[i], candidate);
      67             :         }
      68             :         return candidate;
      69             : }
      70             : 
      71             : #ifndef ROOTSIM_MPI
      72             : 
/**
 * @brief Advances the calling thread through the node-local gvt reduction
 *
 * Non blocking state machine, meant to be called repeatedly by every
 * thread. Thread 0 starts a reduction once global_config.gvt_period has
 * elapsed since last_gvt and every thread left the previous reduction (c_b
 * back to zero); the other threads join as soon as they observe a non-zero
 * c_a. Phases:
 *  - tphase_A: once all n_threads threads joined, publish
 *    min(current_gvt, msg_queue_time_peek()) in reducing_p[rid];
 *  - tphase_B: once all threads published, compute the node minimum with
 *    gvt_node_reduce() and return it;
 *  - tphase_wait: once every thread left tphase_B (c_a back to zero), go
 *    back to tphase_rdy.
 *
 * @return the new gvt in the call that completes tphase_B, 0 in every other
 *         call
 */
simtime_t gvt_phase_run(void)
{
	// threads that joined the current reduction; each one decrements it
	// again when leaving tphase_B
	static atomic_uint c_a = 0;
	// threads that published their candidate in reducing_p; each one
	// decrements it again when leaving tphase_wait
	static atomic_uint c_b = 0;

	if(likely(thread_phase == tphase_rdy)) {
		if (rid) {
			// non-zero threads only join a reduction already started
			if (likely(!atomic_load_explicit(&c_a,
				memory_order_relaxed)))
				return 0;
		} else {
			// thread 0 starts a new reduction only when the period
			// elapsed and no thread is still in the previous one
			if (likely(global_config.gvt_period >
				timer_value(last_gvt) || atomic_load_explicit(
				&c_b, memory_order_relaxed)))
				return 0;
		}
		stats_time_start(STATS_GVT);
		// lowered by gvt_on_msg_process() while the reduction goes on
		current_gvt = SIMTIME_MAX;
		thread_phase = tphase_A;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		return 0;
	}

	switch (thread_phase) {
	default:
	case tphase_rdy:
		__builtin_unreachable();
		/* fallthrough */
	case tphase_A:
		// wait for every thread to join before sampling the queue
		if (atomic_load_explicit(&c_a, memory_order_relaxed)
			== n_threads) {
			simtime_t this_t = msg_queue_time_peek();
			reducing_p[rid] = min(current_gvt, this_t);
			thread_phase = tphase_B;
			// release renders visible reducing_p[rid]
			atomic_fetch_add_explicit(&c_b, 1U,
				memory_order_release);
		}
		break;
	case tphase_B:
		// acquire syncs with the release increments done in tphase_A,
		// so that every entry of reducing_p is visible
		if (atomic_load_explicit(&c_b, memory_order_acquire)
			== n_threads) {
			thread_phase = tphase_wait;
			atomic_fetch_sub_explicit(&c_a, 1U,
				memory_order_relaxed);
			if(!rid){
				// restart the period of the next reduction
				last_gvt = timer_new();
			}
			stats_time_take(STATS_GVT);
			return gvt_node_reduce();
		}
		break;
	case tphase_wait:
		// c_a reaches zero once every thread left tphase_B
		if (!atomic_load_explicit(&c_a, memory_order_relaxed)) {
			atomic_fetch_sub_explicit(&c_b, 1U,
				memory_order_relaxed);
			thread_phase = tphase_rdy;
		}
		break;
	}

	return 0;
}
     135             : 
     136             : #else
     137             : 
     138           0 : static atomic_uint c_a = 0;
     139             : 
     140             : /**
     141             :  * @brief Handles a MSG_CTRL_GVT_START control message
     142             :  *
     143             :  * Called by the MPI layer in response to a MSG_CTRL_GVT_START control message
     144             :  */
     145           1 : void gvt_on_start_ctrl_msg(void)
     146             : {
     147             :         stats_time_start(STATS_GVT);
     148             :         current_gvt = SIMTIME_MAX;
     149             :         thread_phase = tphase_A;
     150             :         atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
     151             : }
     152             : 
     153             : /**
     154             :  * @brief Handles a MSG_CTRL_GVT_DONE control message
     155             :  *
     156             :  * Called by the MPI layer in response to a MSG_CTRL_GVT_DONE control message
     157             :  */
     158           1 : void gvt_on_done_ctrl_msg(void)
     159             : {
     160             :         atomic_fetch_sub_explicit(&missing_nodes, 1U, memory_order_relaxed);
     161             : }
     162             : 
     163           0 : simtime_t gvt_phase_run(void)
     164             : {
     165             :         static atomic_uint c_b = 0;
     166             :         static unsigned remote_msg_to_receive;
     167             :         static __thread bool red_round = false;
     168             : 
     169             :         if(likely(thread_phase == tphase_rdy)) {
     170             :                 if (nid || rid) {
     171             :                         if (likely(!atomic_load_explicit(&c_a,
     172             :                                 memory_order_relaxed)))
     173             :                                 return 0;
     174             :                 } else {
     175             :                         if (likely(global_config.gvt_period >
     176             :                                 timer_value(last_gvt) || atomic_load_explicit(
     177             :                                 &missing_nodes, memory_order_relaxed)))
     178             :                                 return 0;
     179             : 
     180             :                         atomic_store_explicit(&missing_nodes, n_nodes,
     181             :                                 memory_order_relaxed);
     182             :                         mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
     183             :                 }
     184             :                 stats_time_start(STATS_GVT);
     185             :                 current_gvt = SIMTIME_MAX;
     186             :                 thread_phase = tphase_A;
     187             :                 atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
     188             :                 return 0;
     189             :         }
     190             : 
     191             :         switch (thread_phase) {
     192             :         default:
     193             :         case tphase_rdy:
     194             :                 __builtin_unreachable();
     195             :                 /* fallthrough */
     196             :         case tphase_A:
     197             :                 if (atomic_load_explicit(&c_a, memory_order_relaxed)
     198             :                         == n_threads) {
     199             :                         simtime_t this_t = msg_queue_time_peek();
     200             :                         // this leverages the enum layout
     201             :                         thread_phase = tphase_B + (2 * red_round) + !rid;
     202             : 
     203             :                         red_round = !red_round;
     204             :                         if (red_round) {
     205             :                                 for(nid_t i = 0; i < n_nodes; ++i)
     206             :                                         atomic_fetch_add_explicit(&sent_tot[i],
     207             :                                                 remote_msg_sent[i],
     208             :                                                 memory_order_relaxed);
     209             :                                 // release renders visible the sum aggregation
     210             :                                 atomic_fetch_add_explicit(&c_b, 1U,
     211             :                                         memory_order_release);
     212             : 
     213             :                                 memset(remote_msg_sent, 0,
     214             :                                         sizeof(unsigned) * n_nodes);
     215             :                                 gvt_phase_green = !gvt_phase_green;
     216             :                                 current_gvt = min(current_gvt, this_t);
     217             :                         } else {
     218             :                                 reducing_p[rid] = min(current_gvt, this_t);
     219             :                                 // release renders visible the thread local gvt
     220             :                                 atomic_fetch_add_explicit(&c_b, 1U,
     221             :                                         memory_order_release);
     222             :                         }
     223             :                 }
     224             :                 break;
     225             :         case tphase_B:
     226             :                 // acquire is needed to sync with the remote_msg_received sum
     227             :                 // and the c_b counter
     228             :                 if (!atomic_load_explicit(&c_a, memory_order_acquire))
     229             :                         thread_phase = tphase_B_wait_msgs;
     230             :                 break;
     231             :         case tphase_B_reduce:
     232             :                 // acquire is needed to sync with all threads aggregated values
     233             :                 if (atomic_load_explicit(&c_b, memory_order_acquire) ==
     234             :                         n_threads) {
     235             :                         mpi_reduce_sum_scatter((unsigned *)sent_tot,
     236             :                                 &remote_msg_to_receive);
     237             :                         thread_phase = tphase_B_rdone;
     238             :                 }
     239             :                 break;
     240             :         case tphase_B_rdone:
     241             :                 if (mpi_reduce_sum_scatter_done()) {
     242             :                         atomic_fetch_sub_explicit(remote_msg_received +
     243             :                                 !gvt_phase_green, remote_msg_to_receive,
     244             :                                 memory_order_relaxed);
     245             :                         // release renders visible the remote_msg_received sum
     246             :                         // and the c_b counter
     247             :                         atomic_store_explicit(&c_a, 0, memory_order_release);
     248             :                         memset(sent_tot, 0, sizeof(atomic_uint) * n_nodes);
     249             :                         thread_phase = tphase_B_wait_msgs;
     250             :                 }
     251             :                 break;
     252             :         case tphase_C:
     253             :                 // no sync needed
     254             :                 if (!atomic_load_explicit(&c_a, memory_order_relaxed)) {
     255             :                         atomic_fetch_sub_explicit(&c_b, 1U,
     256             :                                 memory_order_relaxed);
     257             :                         thread_phase = tphase_wait;
     258             :                         return *reducing_p;
     259             :                 }
     260             :                 break;
     261             :         case tphase_C_reduce:
     262             :                 // acquire is needed to sync with all threads gvt local values
     263             :                 if (atomic_load_explicit(&c_b, memory_order_acquire) ==
     264             :                         n_threads) {
     265             :                         *reducing_p = gvt_node_reduce();
     266             :                         mpi_reduce_min(reducing_p);
     267             :                         thread_phase = tphase_C_rdone;
     268             :                 }
     269             :                 break;
     270             :         case tphase_C_rdone:
     271             :                 if (mpi_reduce_min_done()) {
     272             :                         atomic_fetch_sub_explicit(&c_b, 1U,
     273             :                                 memory_order_relaxed);
     274             :                         // no sync needed
     275             :                         atomic_store_explicit(&c_a, 0, memory_order_relaxed);
     276             :                         last_gvt = timer_new();
     277             :                         thread_phase = tphase_wait;
     278             :                         return *reducing_p;
     279             :                 }
     280             :                 break;
     281             :         case tphase_B_wait_msgs:
     282             :                 // don't need to sync: we already synced on c_f in tphase_D
     283             :                 if (!atomic_load_explicit(remote_msg_received +
     284             :                                 !gvt_phase_green, memory_order_relaxed)) {
     285             :                         thread_phase = tphase_wait;
     286             :                         atomic_fetch_sub_explicit(&c_b, 1U,
     287             :                                 memory_order_relaxed);
     288             :                 }
     289             :                 break;
     290             :         case tphase_wait:
     291             :                 if (!atomic_load_explicit(&c_b, memory_order_relaxed)) {
     292             :                 // this restarts the gvt phases for the second round of node
     293             :                 // local gvt reduction or, if we already did that, it simply
     294             :                 // ends the gvt reduction
     295             :                         thread_phase = red_round;
     296             :                         if (red_round) {
     297             :                                 atomic_fetch_add_explicit(&c_a, 1U,
     298             :                                         memory_order_relaxed);
     299             :                         } else {
     300             :                                 if(!rid)
     301             :                                         mpi_control_msg_send_to(
     302             :                                                 MSG_CTRL_GVT_DONE, 0);
     303             :                                 stats_time_take(STATS_GVT);
     304             :                         }
     305             :                 }
     306             :                 break;
     307             :         }
     308             :         return 0;
     309             : }
     310             : 
     311             : #endif
     312             : 
     313           0 : void gvt_on_msg_process(simtime_t msg_t)
     314             : {
     315             :         if (unlikely(thread_phase && current_gvt > msg_t))
     316             :                 current_gvt = msg_t;
     317             : }

Generated by: LCOV version 1.14