LCOV - code coverage report
Current view: top level - core/src/lp - process.c Hit Total Coverage
Test: ROOT-Sim master Documentation Coverage Lines: 4 20 20.0 %
Date: 2021-03-25 15:11:55

          Line data    Source code
       1           1 : /**
       2             :  * @file lp/process.c
       3             :  *
       4             :  * @brief LP state management functions
       5             :  *
       6             :  * LP state management functions
       7             :  *
       8             :  * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
       9             :  * SPDX-License-Identifier: GPL-3.0-only
      10             :  */
      11             : #include <lp/process.h>
      12             : 
      13             : #include <datatypes/msg_queue.h>
      14             : #include <distributed/mpi.h>
      15             : #include <gvt/gvt.h>
      16             : #include <log/stats.h>
      17             : #include <lp/lp.h>
      18             : #include <mm/msg_allocator.h>
      19             : #include <serial/serial.h>
      20             : 
      21           0 : static _Thread_local bool silent_processing = false;
      22           0 : static _Thread_local dyn_array(struct lp_msg *) early_antis;
      23             : 
      24             : void ScheduleNewEvent_pr(lp_id_t receiver, simtime_t timestamp,
      25             :                 unsigned event_type, const void *payload, unsigned payload_size)
      26             : {
      27             :         if (unlikely(silent_processing))
      28             :                 return;
      29             : 
      30             :         struct process_data *proc_p = &current_lp->p;
      31             :         struct lp_msg *msg = msg_allocator_pack(receiver, timestamp, event_type,
      32             :                 payload, payload_size);
      33             : 
      34             : #if LOG_LEVEL <= LOG_DEBUG
      35             :         msg->send = current_lp - lps;
      36             :         msg->send_t = array_peek(proc_p->past_msgs)->dest_t;
      37             : #endif
      38             : 
      39             :         nid_t dest_nid = lid_to_nid(receiver);
      40             :         if (dest_nid != nid) {
      41             :                 mpi_remote_msg_send(msg, dest_nid);
      42             :                 msg_allocator_free_at_gvt(msg);
      43             :         } else {
      44             :                 atomic_store_explicit(&msg->flags, 0U, memory_order_relaxed);
      45             :                 msg_queue_insert(msg);
      46             :         }
      47             :         array_push(proc_p->sent_msgs, msg);
      48             : }
      49             : 
      50           0 : void process_global_init(void)
      51             : {
      52             :         serial_model_init();
      53             : }
      54             : 
      55           0 : void process_global_fini(void)
      56             : {
      57             :         ProcessEvent(0, 0, MODEL_FINI, NULL, 0, NULL);
      58             : }
      59             : 
      60           0 : void process_init(void)
      61             : {
      62             :         array_init(early_antis);
      63             : }
      64             : 
      65           0 : void process_fini(void)
      66             : {
      67             :         array_fini(early_antis);
      68             : }
      69             : 
      70             : /**
      71             :  * @brief Initializes the processing module in the current LP
      72             :  */
      73           1 : void process_lp_init(void)
      74             : {
      75             :         struct lp_ctx *this_lp = current_lp;
      76             :         struct process_data *proc_p = &current_lp->p;
      77             : 
      78             :         array_init(proc_p->past_msgs);
      79             :         array_init(proc_p->sent_msgs);
      80             : 
      81             :         struct lp_msg *msg = msg_allocator_pack(this_lp - lps, 0, LP_INIT,
      82             :                         NULL, 0U);
      83             : 
      84             :         array_push(proc_p->past_msgs, msg);
      85             :         array_push(proc_p->sent_msgs, NULL);
      86             :         ProcessEvent_pr(this_lp - lps, 0, LP_INIT, NULL, 0, NULL);
      87             : 
      88             :         model_allocator_checkpoint_next_force_full();
      89             :         model_allocator_checkpoint_take(0);
      90             : }
      91             : 
      92             : /**
      93             :  * @brief Deinitializes the LP by calling the model's DEINIT handler
      94             :  */
      95           1 : void process_lp_deinit(void)
      96             : {
      97             :         struct lp_ctx *this_lp = current_lp;
      98             :         ProcessEvent_pr(this_lp - lps, 0, LP_FINI, NULL, 0,
      99             :                            this_lp->lib_ctx_p->state_s);
     100             : }
     101             : 
     102             : /**
     103             :  * @brief Finalizes the processing module in the current LP
     104             :  */
     105           1 : void process_lp_fini(void)
     106             : {
     107             :         struct process_data *proc_p = &current_lp->p;
     108             : 
     109             :         array_fini(proc_p->sent_msgs);
     110             : 
     111             :         for (array_count_t i = 0; i < array_count(proc_p->past_msgs); ++i) {
     112             :                 struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
     113             :                 uint32_t flags = atomic_load_explicit(&msg->flags,
     114             :                                 memory_order_relaxed);
     115             :                 if (!(flags & MSG_FLAG_ANTI))
     116             :                         msg_allocator_free(msg);
     117             :         }
     118             :         array_fini(proc_p->past_msgs);
     119             : }
     120             : 
     121           0 : static inline void silent_execution(struct process_data *proc_p,
     122             :                 array_count_t last_i, array_count_t past_i)
     123             : {
     124             :         silent_processing = true;
     125             : 
     126             :         void *state_p = current_lp->lib_ctx_p->state_s;
     127             :         for (array_count_t k = last_i + 1; k < past_i; ++k) {
     128             :                 const struct lp_msg *msg = array_get_at(proc_p->past_msgs, k);
     129             :                 stats_time_start(STATS_MSG_SILENT);
     130             :                 ProcessEvent_pr(
     131             :                         msg->dest,
     132             :                         msg->dest_t,
     133             :                         msg->m_type,
     134             :                         msg->pl,
     135             :                         msg->pl_size,
     136             :                         state_p
     137             :                 );
     138             :                 stats_time_take(STATS_MSG_SILENT);
     139             :         }
     140             : 
     141             :         silent_processing = false;
     142             : }
     143             : 
     144           0 : static inline void send_anti_messages(struct process_data *proc_p,
     145             :                 array_count_t past_i)
     146             : {
     147             :         array_count_t sent_i = array_count(proc_p->sent_msgs) - 1;
     148             :         array_count_t b = array_count(proc_p->past_msgs) - past_i;
     149             :         do {
     150             :                 struct lp_msg *msg = array_get_at(proc_p->sent_msgs, sent_i);
     151             :                 b -= msg == NULL;
     152             :                 --sent_i;
     153             :         } while(b);
     154             : 
     155             :         for (array_count_t i = sent_i + 1; i < array_count(proc_p->sent_msgs);
     156             :                         ++i) {
     157             :                 struct lp_msg *msg = array_get_at(proc_p->sent_msgs, i);
     158             :                 if (!msg)
     159             :                         continue;
     160             : 
     161             :                 nid_t dest_nid = lid_to_nid(msg->dest);
     162             :                 if (dest_nid != nid) {
     163             :                         mpi_remote_anti_msg_send(msg, dest_nid);
     164             :                 } else {
     165             :                         int msg_status = atomic_fetch_add_explicit(&msg->flags,
     166             :                                 MSG_FLAG_ANTI, memory_order_relaxed);
     167             :                         if (msg_status & MSG_FLAG_PROCESSED)
     168             :                                 msg_queue_insert(msg);
     169             :                 }
     170             :         }
     171             :         array_count(proc_p->sent_msgs) = sent_i + 1;
     172             : }
     173             : 
     174           0 : static inline void reinsert_invalid_past_messages(struct process_data *proc_p,
     175             :                 array_count_t past_i)
     176             : {
     177             :         for (array_count_t i = past_i; i < array_count(proc_p->past_msgs);
     178             :                         ++i) {
     179             :                 struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
     180             :                 int msg_status = atomic_fetch_add_explicit(&msg->flags,
     181             :                         -MSG_FLAG_PROCESSED, memory_order_relaxed);
     182             :                 if (!(msg_status & MSG_FLAG_ANTI))
     183             :                         msg_queue_insert(msg);
     184             :         }
     185             : 
     186             :         array_count(proc_p->past_msgs) = past_i;
     187             : }
     188             : 
     189           0 : static void do_rollback(struct process_data *proc_p, array_count_t past_i)
     190             : {
     191             :         stats_time_start(STATS_ROLLBACK);
     192             : 
     193             :         array_count_t last_i = model_allocator_checkpoint_restore(past_i - 1);
     194             :         silent_execution(proc_p, last_i, past_i);
     195             :         send_anti_messages(proc_p, past_i);
     196             :         reinsert_invalid_past_messages(proc_p, past_i);
     197             : 
     198             :         stats_time_take(STATS_ROLLBACK);
     199             : }
     200             : 
     201           0 : static inline array_count_t match_anti_msg(const struct process_data *proc_p,
     202             :                 const struct lp_msg *a_msg)
     203             : {
     204             :         array_count_t past_i = array_count(proc_p->past_msgs) - 1;
     205             :         while (array_get_at(proc_p->past_msgs, past_i) != a_msg)
     206             :                 --past_i;
     207             :         return past_i;
     208             : }
     209             : 
     210           0 : static inline array_count_t match_straggler_msg(
     211             :                 const struct process_data *proc_p, const struct lp_msg *s_msg)
     212             : {
     213             :         array_count_t past_i = array_count(proc_p->past_msgs) - 2;
     214             :         while (!msg_is_before(array_get_at(proc_p->past_msgs, past_i), s_msg))
     215             :                 --past_i;
     216             :         return past_i + 1;
     217             : }
     218             : 
     219           0 : static inline array_count_t match_remote_msg(const struct process_data *proc_p,
     220             :                 const struct lp_msg *r_msg)
     221             : {
     222             :         uint32_t m_id = r_msg->raw_flags >> 2, m_seq = r_msg->m_seq;
     223             :         array_count_t past_i = array_count(proc_p->past_msgs) - 1;
     224             :         while (past_i) {
     225             :                 struct lp_msg *msg = array_get_at(proc_p->past_msgs, past_i);
     226             :                 if (msg->raw_flags >> 2 == m_id && msg->m_seq == m_seq) {
     227             :                         msg->raw_flags |= MSG_FLAG_ANTI;
     228             :                         break;
     229             :                 }
     230             :                 --past_i;
     231             :         }
     232             : 
     233             :         return past_i;
     234             : }
     235             : 
     236           0 : static inline void handle_remote_anti_msg(struct process_data *proc_p,
     237             :                 struct lp_msg *msg)
     238             : {
     239             :         array_count_t past_i = match_remote_msg(proc_p, msg);
     240             :         if (unlikely(!past_i)) {
     241             :                 msg->raw_flags >>= 2;
     242             :                 array_push(early_antis, msg);
     243             :                 return;
     244             :         }
     245             :         struct lp_msg *old_msg = array_get_at (proc_p->past_msgs, past_i);
     246             :         do_rollback(proc_p, past_i);
     247             :         termination_on_lp_rollback(msg->dest_t);
     248             :         msg_allocator_free(old_msg);
     249             :         msg_allocator_free(msg);
     250             : }
     251             : 
     252           0 : static inline bool check_early_anti_messages(struct lp_msg *msg)
     253             : {
     254             :         uint32_t m_id;
     255             :         if (likely(!array_count(early_antis) || !(m_id = msg->raw_flags >> 2)))
     256             :                 return false;
     257             :         uint32_t m_seq = msg->m_seq;
     258             :         if (!m_id)
     259             :                 return false;
     260             :         for (array_count_t i = 0; i < array_count(early_antis); ++i) {
     261             :                 struct lp_msg *a_msg = array_get_at(early_antis, i);
     262             :                 if (a_msg->raw_flags == m_id && a_msg->m_seq == m_seq) {
     263             :                         msg_allocator_free(msg);
     264             :                         msg_allocator_free(a_msg);
     265             :                         array_get_at(early_antis, i) = array_peek(early_antis);
     266             :                         --array_count(early_antis);
     267             :                         return true;
     268             :                 }
     269             :         }
     270             :         return false;
     271             : }
     272             : 
     273           0 : void process_msg(void)
     274             : {
     275             :         struct lp_msg *msg = msg_queue_extract();
     276             :         if (unlikely(!msg)) {
     277             :                 current_lp = NULL;
     278             :                 return;
     279             :         }
     280             : 
     281             :         gvt_on_msg_extraction(msg->dest_t);
     282             : 
     283             :         struct lp_ctx *this_lp = &lps[msg->dest];
     284             :         struct process_data *proc_p = &this_lp->p;
     285             :         current_lp = this_lp;
     286             : 
     287             :         uint32_t flags = atomic_fetch_add_explicit(&msg->flags,
     288             :                         MSG_FLAG_PROCESSED, memory_order_relaxed);
     289             : 
     290             :         if (unlikely(flags & MSG_FLAG_ANTI)) {
     291             :                 if (flags > (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
     292             :                         handle_remote_anti_msg(proc_p, msg);
     293             :                  } else if (flags == (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
     294             :                         array_count_t past_i = match_anti_msg(proc_p, msg);
     295             :                         do_rollback(proc_p, past_i);
     296             :                         termination_on_lp_rollback(msg->dest_t);
     297             :                         msg_allocator_free(msg);
     298             :                 }
     299             :                 return;
     300             :         }
     301             : 
     302             :         if (unlikely(check_early_anti_messages(msg)))
     303             :                 return;
     304             : 
     305             :         if (unlikely(!msg_is_before(array_peek(proc_p->past_msgs), msg))) {
     306             :                 array_count_t past_i = match_straggler_msg(proc_p, msg);
     307             :                 do_rollback(proc_p, past_i);
     308             :                 termination_on_lp_rollback(msg->dest_t);
     309             :         }
     310             : 
     311             :         array_push(proc_p->sent_msgs, NULL);
     312             :         array_push(proc_p->past_msgs, msg);
     313             : 
     314             :         stats_time_start(STATS_MSG_PROCESSED);
     315             :         ProcessEvent_pr(
     316             :                 msg->dest,
     317             :                 msg->dest_t,
     318             :                 msg->m_type,
     319             :                 msg->pl,
     320             :                 msg->pl_size,
     321             :                 this_lp->lib_ctx_p->state_s
     322             :         );
     323             :         stats_time_take(STATS_MSG_PROCESSED);
     324             : 
     325             :         model_allocator_checkpoint_take(array_count(proc_p->past_msgs) - 1);
     326             :         termination_on_msg_process(msg->dest_t);
     327             : }

Generated by: LCOV version 1.14