LCOV - code coverage report
Current view: top level - core/src/lp - process.c Hit Total Coverage
Test: ROOT-Sim develop Documentation Coverage Lines: 4 15 26.7 %
Date: 2021-03-02 11:24:52

          Line data    Source code
       1           1 : /**
       2             :  * @file lp/process.c
       3             :  *
       4             :  * @brief LP state management functions
       5             :  *
       6             :  * LP state management functions
       7             :  *
       8             :  * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
       9             :  * SPDX-License-Identifier: GPL-3.0-only
      10             :  */
      11             : #include <lp/process.h>
      12             : 
      13             : #include <datatypes/msg_queue.h>
      14             : #include <distributed/mpi.h>
      15             : #include <gvt/gvt.h>
      16             : #include <log/stats.h>
      17             : #include <lp/lp.h>
      18             : #include <mm/msg_allocator.h>
      19             : #include <serial/serial.h>
      20             : 
      21           0 : static _Thread_local bool silent_processing = false;
      22             : 
      23           0 : void ScheduleNewEvent_pr(lp_id_t receiver, simtime_t timestamp,
      24             :         unsigned event_type, const void *payload, unsigned payload_size)
      25             : {
      26             :         if (unlikely(silent_processing))
      27             :                 return;
      28             : 
      29             :         struct process_data *proc_p = &current_lp->p;
      30             :         struct lp_msg *msg = msg_allocator_pack(receiver, timestamp, event_type,
      31             :                 payload, payload_size);
      32             : 
      33             : #if LOG_LEVEL <= LOG_DEBUG
      34             :         msg->send = current_lp - lps;
      35             :         msg->send_t = array_peek(proc_p->past_msgs)->dest_t;
      36             : #endif
      37             : 
      38             : #ifdef ROOTSIM_MPI
      39             :         nid_t dest_nid = lid_to_nid(receiver);
      40             :         if (dest_nid != nid) {
      41             :                 mpi_remote_msg_send(msg, dest_nid);
      42             :                 msg_allocator_free_at_gvt(msg);
      43             :         } else {
      44             : #else
      45             :         {
      46             : #endif
      47             :                 atomic_store_explicit(&msg->flags, 0U, memory_order_relaxed);
      48             :                 msg_queue_insert(msg);
      49             :         }
      50             :         array_push(proc_p->sent_msgs, msg);
      51             : }
      52             : 
      53           0 : void process_global_init(void)
      54             : {
      55             :         serial_model_init();
      56             : }
      57             : 
      58           0 : void process_global_fini(void)
      59             : {
      60             :         ProcessEvent(0, 0, MODEL_FINI, NULL, 0, NULL);
      61             : }
      62             : 
      63             : /**
      64             :  * @brief Initializes the processing module in the current LP
      65             :  */
      66           1 : void process_lp_init(void)
      67             : {
      68             :         struct lp_ctx *this_lp = current_lp;
      69             :         struct process_data *proc_p = &current_lp->p;
      70             : 
      71             :         array_init(proc_p->past_msgs);
      72             :         array_init(proc_p->sent_msgs);
      73             : 
      74             :         struct lp_msg *msg = msg_allocator_pack(this_lp - lps, 0, LP_INIT,
      75             :                 NULL, 0U);
      76             : 
      77             :         array_push(proc_p->past_msgs, msg);
      78             :         array_push(proc_p->sent_msgs, NULL);
      79             :         ProcessEvent_pr(this_lp - lps, 0, LP_INIT, NULL, 0, NULL);
      80             : 
      81             :         model_allocator_checkpoint_next_force_full();
      82             :         model_allocator_checkpoint_take(0);
      83             : }
      84             : 
      85             : /**
      86             :  * @brief Deinitializes the LP by calling the model's DEINIT handler
      87             :  */
      88           1 : void process_lp_deinit(void)
      89             : {
      90             :         struct lp_ctx *this_lp = current_lp;
      91             :         ProcessEvent_pr(this_lp - lps, 0, LP_FINI, NULL, 0,
      92             :                            this_lp->lib_ctx_p->state_s);
      93             : }
      94             : 
      95             : /**
      96             :  * @brief Finalizes the processing module in the current LP
      97             :  */
      98           1 : void process_lp_fini(void)
      99             : {
     100             :         struct process_data *proc_p = &current_lp->p;
     101             : 
     102             :         array_fini(proc_p->sent_msgs);
     103             : 
     104             :         for (array_count_t i = 0; i < array_count(proc_p->past_msgs); ++i) {
     105             :                 msg_allocator_free(array_get_at(proc_p->past_msgs, i));
     106             :         }
     107             :         array_fini(proc_p->past_msgs);
     108             : }
     109             : 
     110           0 : static inline void silent_execution(struct process_data *proc_p,
     111             :                                     array_count_t last_i, array_count_t past_i)
     112             : {
     113             :         silent_processing = true;
     114             : 
     115             :         void *state_p = current_lp->lib_ctx_p->state_s;
     116             :         for (array_count_t k = last_i + 1; k <= past_i; ++k) {
     117             :                 const struct lp_msg *msg = array_get_at(proc_p->past_msgs, k);
     118             :                 stats_time_start(STATS_MSG_SILENT);
     119             :                 ProcessEvent_pr(
     120             :                         msg->dest,
     121             :                         msg->dest_t,
     122             :                         msg->m_type,
     123             :                         msg->pl,
     124             :                         msg->pl_size,
     125             :                         state_p
     126             :                 );
     127             :                 stats_time_take(STATS_MSG_SILENT);
     128             :         }
     129             : 
     130             :         silent_processing = false;
     131             : }
     132             : 
     133           0 : static inline void send_anti_messages(struct process_data *proc_p,
     134             :                                       array_count_t past_i)
     135             : {
     136             :         array_count_t sent_i = array_count(proc_p->sent_msgs) - 1;
     137             :         array_count_t b = array_count(proc_p->past_msgs) - 1 - past_i;
     138             :         do {
     139             :                 struct lp_msg *msg = array_get_at(proc_p->sent_msgs, sent_i);
     140             :                 b -= msg == NULL;
     141             :                 --sent_i;
     142             :         } while(b);
     143             : 
     144             :         for (array_count_t i = sent_i + 1; i < array_count(proc_p->sent_msgs);
     145             :                 ++i) {
     146             :                 struct lp_msg *msg = array_get_at(proc_p->sent_msgs, i);
     147             :                 if (!msg)
     148             :                         continue;
     149             : 
     150             : #ifdef ROOTSIM_MPI
     151             :                 nid_t dest_nid = lid_to_nid(msg->dest);
     152             :                 if (dest_nid != nid) {
     153             :                         mpi_remote_anti_msg_send(msg, dest_nid);
     154             :                 } else {
     155             : #else
     156             :                 {
     157             : #endif
     158             :                         int msg_status = atomic_fetch_add_explicit(&msg->flags,
     159             :                                 MSG_FLAG_ANTI, memory_order_relaxed);
     160             :                         if (msg_status & MSG_FLAG_PROCESSED)
     161             :                                 msg_queue_insert(msg);
     162             :                 }
     163             :         }
     164             :         array_count(proc_p->sent_msgs) = sent_i + 1;
     165             : }
     166             : 
     167           0 : static inline void reinsert_invalid_past_messages(struct process_data *proc_p,
     168             :                                                   array_count_t past_i)
     169             : {
     170             :         for (array_count_t i = past_i + 1; i < array_count(proc_p->past_msgs);
     171             :                 ++i) {
     172             :                 struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
     173             :                 int msg_status = atomic_fetch_add_explicit(&msg->flags,
     174             :                         -MSG_FLAG_PROCESSED, memory_order_relaxed);
     175             :                 if (!(msg_status & MSG_FLAG_ANTI))
     176             :                         msg_queue_insert(msg);
     177             :         }
     178             : 
     179             :         array_count(proc_p->past_msgs) = past_i + 1;
     180             : }
     181             : 
     182           0 : static void handle_rollback(struct process_data *proc_p, array_count_t past_i)
     183             : {
     184             :         stats_time_start(STATS_ROLLBACK);
     185             : 
     186             :         array_count_t last_i = model_allocator_checkpoint_restore(past_i);
     187             :         silent_execution(proc_p, last_i, past_i);
     188             :         send_anti_messages(proc_p, past_i);
     189             :         reinsert_invalid_past_messages(proc_p, past_i);
     190             : 
     191             :         stats_time_take(STATS_ROLLBACK);
     192             : }
     193             : 
     194           0 : static inline array_count_t match_anti_msg(
     195             :         const struct process_data *proc_p, const struct lp_msg *a_msg)
     196             : {
     197             :         array_count_t past_i = array_count(proc_p->past_msgs) - 1;
     198             :         while (array_get_at(proc_p->past_msgs, past_i) != a_msg) {
     199             :                 --past_i;
     200             :         }
     201             :         return past_i - 1;
     202             : }
     203             : 
     204           0 : static inline array_count_t match_straggler_msg(
     205             :         const struct process_data *proc_p, const struct lp_msg *s_msg)
     206             : {
     207             :         array_count_t past_i = array_count(proc_p->past_msgs) - 2;
     208             :         while (!msg_is_before(array_get_at(proc_p->past_msgs, past_i), s_msg)) {
     209             :                 --past_i;
     210             :         }
     211             :         return past_i;
     212             : }
     213             : 
     214           0 : void process_msg(void)
     215             : {
     216             :         struct lp_msg *msg = msg_queue_extract();
     217             :         if (unlikely(!msg)) {
     218             :                 current_lp = NULL;
     219             :                 return;
     220             :         }
     221             : 
     222             :         struct lp_ctx *this_lp = &lps[msg->dest];
     223             :         struct process_data *proc_p = &this_lp->p;
     224             :         current_lp = this_lp;
     225             : 
     226             :         int msg_status = atomic_fetch_add_explicit(&msg->flags,
     227             :                 MSG_FLAG_PROCESSED, memory_order_relaxed);
     228             :         if (unlikely(msg_status & MSG_FLAG_ANTI)) {
     229             :                 if (msg_status == (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
     230             :                         array_count_t past_i = match_anti_msg(proc_p, msg);
     231             :                         handle_rollback(proc_p, past_i);
     232             :                         termination_on_lp_rollback(msg->dest_t);
     233             :                 }
     234             :                 msg_allocator_free(msg);
     235             :                 return;
     236             :         }
     237             : 
     238             :         if (unlikely(!msg_is_before(array_peek(proc_p->past_msgs), msg))) {
     239             :                 array_count_t past_i = match_straggler_msg(proc_p, msg);
     240             :                 handle_rollback(proc_p, past_i);
     241             :                 termination_on_lp_rollback(msg->dest_t);
     242             :         }
     243             : 
     244             :         array_push(proc_p->sent_msgs, NULL);
     245             :         array_push(proc_p->past_msgs, msg);
     246             : 
     247             :         stats_time_start(STATS_MSG_PROCESSED);
     248             :         ProcessEvent_pr(
     249             :                 msg->dest,
     250             :                 msg->dest_t,
     251             :                 msg->m_type,
     252             :                 msg->pl,
     253             :                 msg->pl_size,
     254             :                 this_lp->lib_ctx_p->state_s
     255             :         );
     256             :         stats_time_take(STATS_MSG_PROCESSED);
     257             : 
     258             :         model_allocator_checkpoint_take(array_count(proc_p->past_msgs) - 1);
     259             :         termination_on_msg_process(msg->dest_t);
     260             :         gvt_on_msg_process(msg->dest_t);
     261             : }

Generated by: LCOV version 1.14