LCOV - code coverage report
Current view: top level - core/src/gvt - gvt.c (columns: Hit | Total | Coverage)
Test: ROOT-Sim master Documentation Coverage — Lines: 4 of 19 (21.1 %)
Date: 2021-03-25 15:11:55

          Line data    Source code
       1           1 : /**
       2             :  * @file gvt/gvt.c
       3             :  *
       4             :  * @brief Global Virtual Time
       5             :  *
       6             :  * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
       7             :  * SPDX-License-Identifier: GPL-3.0-only
       8             :  */
       9             : #include <gvt/gvt.h>
      10             : 
      11             : #include <arch/timer.h>
      12             : #include <core/init.h>
      13             : #include <datatypes/msg_queue.h>
      14             : #include <distributed/mpi.h>
      15             : #include <log/stats.h>
      16             : 
      17             : #include <memory.h>
      18             : #include <stdatomic.h>
      19             : 
/// A thread phase during the gvt algorithm computation
enum thread_phase {
	thread_phase_idle = 0, ///< no reduction round in progress on this thread
	thread_phase_A,        ///< waiting for the previous round to drain (c_a == 0)
	thread_phase_B,        ///< local minimum taken, waiting for all threads on c_b
	thread_phase_C,        ///< about to publish the per-thread minimum into reducing_p
	thread_phase_D         ///< waiting for every thread to publish (c_b back to 0)
};

/// A node phase during the distributed gvt algorithm computation
enum node_phase {
	node_phase_redux_first = 0, ///< first intra-node thread reduction round
	node_sent_reduce,           ///< accumulate per-destination sent-message counts
	node_sent_reduce_wait,      ///< wait for the MPI sum-scatter of sent counts
	node_sent_wait,             ///< wait until every in-flight message is accounted for
	node_phase_redux_second,    ///< second intra-node thread reduction round
	node_min_reduce,            ///< start the MPI min reduction of the candidate gvt
	node_min_reduce_wait,       ///< wait for the MPI min reduction to complete
	node_min_wait,              ///< non-leader threads wait for the reduction result
	node_done                   ///< reduction finished, notify the master node
};

/// This thread's position in the intra-node reduction protocol
static _Thread_local enum thread_phase thread_phase = thread_phase_idle;

/// Timer used by the master thread to decide when to trigger a new reduction
static timer_uint gvt_timer;
/// Per-thread gvt candidates, later folded into the node-wide minimum
static simtime_t reducing_p[MAX_THREADS];
/// This thread's running minimum timestamp during the current round
static _Thread_local simtime_t current_gvt;
/// Counters implementing the two-barrier synchronization of the thread phases
static _Atomic(rid_t) c_a = 0;
static _Atomic(rid_t) c_b = 0;

/// Number of nodes still busy with the current distributed reduction
static _Atomic(nid_t) gvt_nodes;

/// This thread's current message colour; flipped once per reduction round
_Thread_local _Bool gvt_phase;
/// Per-colour, per-destination running count of remote messages sent by this
/// thread (diffed against a snapshot at each reduction)
_Thread_local uint32_t remote_msg_seq[2][MAX_NODES];
/// Per-colour count of remote messages received by this thread
_Thread_local uint32_t remote_msg_received[2];
      54             : 
/**
 * @brief Initializes the gvt module in the node
 *
 * Simply arms the gvt timer, so that the master thread can later decide when
 * the first reduction should be started (see gvt_phase_run()).
 */
void gvt_global_init(void)
{
	gvt_timer = timer_new();
}
      62             : 
/**
 * @brief Handles a MSG_CTRL_GVT_START control message
 *
 * Called by the MPI layer in response to a MSG_CTRL_GVT_START control message,
 * but also internally to start a new reduction
 */
void gvt_start_processing(void)
{
	stats_time_start(STATS_GVT);
	// current_gvt must be reset before thread_phase becomes non-idle:
	// gvt_on_msg_extraction() starts folding timestamps into it as soon
	// as thread_phase is set
	current_gvt = SIMTIME_MAX;
	thread_phase = thread_phase_A;
}
      75             : 
/**
 * @brief Handles a MSG_CTRL_GVT_DONE control message
 *
 * Called by the MPI layer in response to a MSG_CTRL_GVT_DONE control message.
 * Decrements the count of nodes still busy with the current reduction; once it
 * reaches zero the master thread may start a new one (see gvt_phase_run()).
 */
void gvt_on_done_ctrl_msg(void)
{
	atomic_fetch_sub_explicit(&gvt_nodes, 1U, memory_order_relaxed);
}
      85             : 
      86           0 : void gvt_on_msg_extraction(simtime_t msg_t)
      87             : {
      88             :         if (unlikely(thread_phase && current_gvt > msg_t))
      89             :                 current_gvt = msg_t;
      90             : }
      91             : 
      92           0 : static inline simtime_t gvt_node_reduce(void)
      93             : {
      94             :         unsigned i = n_threads - 1;
      95             :         simtime_t candidate = reducing_p[i];
      96             :         while (i--)
      97             :                 candidate = min(reducing_p[i], candidate);
      98             : 
      99             :         return candidate;
     100             : }
     101             : 
/**
 * @brief Executes a step of the intra-node gvt reduction protocol
 * @return true if the last phase completed, i.e. this thread's contribution
 *         has been published in reducing_p[rid], false otherwise
 *
 * Implements a two-barrier scheme on the counters c_a and c_b: phases A/B
 * ensure every thread has observed the start of the round before any of them
 * publishes its minimum; phases C/D ensure every thread has published before
 * the round is declared complete. Non-blocking: each call performs at most
 * one phase transition and returns immediately.
 */
static bool gvt_thread_phase_run(void)
{
	switch (thread_phase) {
	case thread_phase_A:
		// wait for the previous round to be fully torn down
		if (atomic_load_explicit(&c_a, memory_order_relaxed))
			break;
		current_gvt = min(current_gvt, msg_queue_time_peek());
		thread_phase = thread_phase_B;
		atomic_fetch_add_explicit(&c_b, 1U, memory_order_relaxed);
		break;
	case thread_phase_B:
		// wait until every thread has entered phase B
		if (atomic_load_explicit(&c_b, memory_order_relaxed) !=
				n_threads)
			break;
		thread_phase = thread_phase_C;
		atomic_fetch_add_explicit(&c_a, 1U, memory_order_relaxed);
		break;
	case thread_phase_C:
		// wait until every thread has entered phase C
		if (atomic_load_explicit(&c_a, memory_order_relaxed) !=
				n_threads)
			break;
		// publish this thread's candidate; the release decrement of
		// c_b pairs with the acquire load in phase D, making the
		// reducing_p write visible to the other threads
		reducing_p[rid] = min(current_gvt, msg_queue_time_peek());
		thread_phase = thread_phase_D;
		atomic_fetch_sub_explicit(&c_b, 1U, memory_order_release);
		break;
	case thread_phase_D:
		// wait until every thread has published its candidate
		if (atomic_load_explicit(&c_b, memory_order_acquire))
			break;
		thread_phase = thread_phase_idle;
		atomic_fetch_sub_explicit(&c_a, 1U, memory_order_relaxed);
		return true;
	default:
		__builtin_unreachable();
	}
	return false;
}
     138             : 
     139             : #ifdef ROOTSIM_MPI
     140             : 
/**
 * @brief Executes a step of the inter-node gvt reduction protocol
 * @return true if the reduction completed on this thread, i.e. *reducing_p
 *         holds the freshly computed gvt value, false otherwise
 *
 * Runs two intra-node thread reductions interleaved with two MPI collectives:
 * a sum-scatter which verifies that every remote message sent in the previous
 * colour phase has been received, and a min reduction which computes the
 * global minimum. Non-blocking: each call performs at most one transition.
 */
static bool gvt_node_phase_run(void)
{
	// this thread's position in the node-wide protocol
	static _Thread_local enum node_phase node_phase = node_phase_redux_first;
	// snapshot of remote_msg_seq taken at the previous reduction
	static _Thread_local uint32_t last_seq[2][MAX_NODES];
	// node-wide per-destination count of remote messages sent
	static _Atomic(uint32_t) total_sent[MAX_NODES];
	// running balance of received vs. expected remote messages
	static _Atomic(int32_t) total_msg_received;
	// messages this node still has to receive, filled by the sum-scatter
	static uint32_t remote_msg_to_receive;
	// synchronization counters for the node phases
	static _Atomic(rid_t) c_c;
	static _Atomic(rid_t) c_d;

	switch (node_phase) {
	case node_phase_redux_first:
	case node_phase_redux_second:
		if (!gvt_thread_phase_run())
			break;

		// flip the message colour only after the first round
		// (!node_phase is 1 only for node_phase_redux_first)
		gvt_phase = gvt_phase ^ !node_phase;
		thread_phase = thread_phase_A;
		++node_phase;
		break;
	case node_sent_reduce:
		if (atomic_load_explicit(&c_a, memory_order_relaxed))
			break;

		// add this thread's sent counts accumulated since last round
		for (nid_t i = n_nodes - 1; i >= 0; --i)
			atomic_fetch_add_explicit(&total_sent[i],
					remote_msg_seq[!gvt_phase][i] -
					last_seq[!gvt_phase][i],
					memory_order_relaxed);
		memcpy(last_seq[!gvt_phase], remote_msg_seq[!gvt_phase],
				sizeof(uint32_t) * n_nodes);

		atomic_fetch_add_explicit(&total_msg_received, 1U,
				memory_order_relaxed);
		// synchronizes total_sent and sent values zeroing
		if (atomic_fetch_add_explicit(&c_c, 1U, memory_order_acq_rel) !=
				n_threads - 1) {
			node_phase = node_sent_wait;
			break;
		}
		// the last thread to arrive fires the MPI collective
		mpi_reduce_sum_scatter((uint32_t *)total_sent,
				&remote_msg_to_receive);
		node_phase = node_sent_reduce_wait;
		break;
	case node_sent_reduce_wait:
		if (!mpi_reduce_sum_scatter_done())
			break;
		// subtract the expected messages; the n_threads term undoes
		// the +1 each thread contributed in node_sent_reduce
		atomic_fetch_sub_explicit(&total_msg_received,
				remote_msg_to_receive + n_threads,
				memory_order_relaxed);
		node_phase = node_sent_wait;
		break;
	case node_sent_wait: {
		// fold in the messages received meanwhile; a non-zero previous
		// balance means some in-flight messages are still unaccounted
		int32_t r = atomic_fetch_add_explicit(&total_msg_received,
				remote_msg_received[!gvt_phase],
				memory_order_relaxed);
		remote_msg_received[!gvt_phase] = 0;
		if (r)
			break;
		// clear this thread's slice of total_sent for the next round
		uint32_t q = n_nodes / n_threads + 1;
		memset(total_sent + rid * q, 0, q * sizeof(*total_sent));
		node_phase = node_phase_redux_second;
		break;
	}
	case node_min_reduce:
		// the first thread to arrive performs the global min reduction
		if (atomic_fetch_add_explicit(&c_d, 1U, memory_order_relaxed)) {
			node_phase = node_min_wait;
			break;
		}
		*reducing_p = gvt_node_reduce();
		mpi_reduce_min(reducing_p);
		node_phase = node_min_reduce_wait;
		break;
	case node_min_reduce_wait:
		if (atomic_load_explicit(&c_d, memory_order_relaxed) !=
				n_threads || !mpi_reduce_min_done())
			break;
		// release the threads spinning in node_min_wait; the release
		// decrement pairs with their acquire load of c_c
		atomic_fetch_sub_explicit(&c_c, n_threads, memory_order_release);
		node_phase = node_done;
		return true;
	case node_min_wait:
		if (atomic_load_explicit(&c_c, memory_order_acquire))
			break;
		node_phase = node_done;
		return true;
	case node_done:
		node_phase = node_phase_redux_first;
		thread_phase = thread_phase_idle;
		// the last thread to leave notifies the master node
		if (atomic_fetch_sub_explicit(&c_d, 1U, memory_order_relaxed) ==
				1)
			mpi_control_msg_send_to(MSG_CTRL_GVT_DONE, 0);
		break;
	default:
		__builtin_unreachable();
	}
	return false;
}
     238             : 
/**
 * @brief Executes a step of the gvt algorithm (distributed build)
 * @return the new gvt value if the reduction just completed, 0 otherwise
 *
 * Meant to be called regularly from the processing loop; drives the
 * non-blocking reduction forward and, on the master thread of the master
 * node, decides when a new reduction should be started.
 */
simtime_t gvt_phase_run(void)
{
	if (unlikely(thread_phase)) {
		if (!gvt_node_phase_run())
			return 0.0;
		// reduction complete: re-arm the period timer on the master
		if (!rid && !nid)
			gvt_timer = timer_new();
		stats_time_take(STATS_GVT);
		return *reducing_p;
	}

	// another thread already entered a round (it bumped c_b): join it
	if (unlikely(atomic_load_explicit(&c_b, memory_order_relaxed)))
		gvt_start_processing();

	// the master thread of the master node starts a new reduction when the
	// gvt period elapsed and no previous reduction is still in flight
	if (unlikely(!rid && !nid &&
			global_config.gvt_period < timer_value(gvt_timer) &&
			!atomic_load_explicit(&gvt_nodes, memory_order_relaxed))) {
		atomic_fetch_add_explicit(&gvt_nodes, n_nodes, memory_order_relaxed);
		mpi_control_msg_broadcast(MSG_CTRL_GVT_START);
	}

	return 0;
}
     262             : 
     263             : #else
     264             : 
/**
 * @brief Executes a step of the gvt algorithm (single-node build)
 * @return the new gvt value if the reduction just completed, 0 otherwise
 *
 * Meant to be called regularly from the processing loop; drives the
 * non-blocking intra-node reduction forward and, on the master thread,
 * decides when a new reduction should be started.
 */
simtime_t gvt_phase_run(void)
{
	if (unlikely(thread_phase)) {
		if (!gvt_thread_phase_run())
			return 0.0;
		// reduction complete: re-arm the period timer on the master
		if (!rid)
			gvt_timer = timer_new();
		stats_time_take(STATS_GVT);
		return gvt_node_reduce();
	}

	// another thread already entered a round (it bumped c_b): join it
	if (unlikely(atomic_load_explicit(&c_b, memory_order_relaxed)))
		gvt_start_processing();

	// NOTE(review): mpi_control_msg_broadcast() is called even in this
	// non-MPI branch — presumably a stub that delivers the control message
	// locally; confirm against distributed/mpi.h
	if (unlikely(!rid && global_config.gvt_period < timer_value(gvt_timer)))
		mpi_control_msg_broadcast(MSG_CTRL_GVT_START);

	return 0;
}
     284             : 
     285             : #endif
     286             : 
// Emit out-of-line definitions of the remote messaging helpers, which are
// presumably defined inline in gvt/gvt.h (C99 extern-inline instantiation
// pattern — TODO confirm against the header)
extern void gvt_remote_msg_send(struct lp_msg *msg, nid_t dest_nid);
extern void gvt_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid);
extern void gvt_remote_msg_receive(struct lp_msg *msg);
extern void gvt_remote_anti_msg_receive(struct lp_msg *msg);

Generated by: LCOV version 1.14