Line data Source code
1 1 : /**
2 : * @file lp/process.c
3 : *
4 : * @brief LP state management functions
5 : *
6 : * LP state management functions
7 : *
8 : * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
9 : * SPDX-License-Identifier: GPL-3.0-only
10 : */
11 : #include <lp/process.h>
12 :
13 : #include <datatypes/msg_queue.h>
14 : #include <distributed/mpi.h>
15 : #include <gvt/gvt.h>
16 : #include <log/stats.h>
17 : #include <lp/lp.h>
18 : #include <mm/msg_allocator.h>
19 : #include <serial/serial.h>
20 :
/* Per-thread flag: while true, ScheduleNewEvent_pr() discards every send.
 * Objects with thread storage duration are zero-initialized, so it starts
 * out false. */
static _Thread_local bool silent_processing;
22 :
23 0 : void ScheduleNewEvent_pr(lp_id_t receiver, simtime_t timestamp,
24 : unsigned event_type, const void *payload, unsigned payload_size)
25 : {
26 : if (unlikely(silent_processing))
27 : return;
28 :
29 : struct process_data *proc_p = ¤t_lp->p;
30 : struct lp_msg *msg = msg_allocator_pack(receiver, timestamp, event_type,
31 : payload, payload_size);
32 :
33 : #if LOG_LEVEL <= LOG_DEBUG
34 : msg->send = current_lp - lps;
35 : msg->send_t = array_peek(proc_p->past_msgs)->dest_t;
36 : #endif
37 :
38 : #ifdef ROOTSIM_MPI
39 : nid_t dest_nid = lid_to_nid(receiver);
40 : if (dest_nid != nid) {
41 : mpi_remote_msg_send(msg, dest_nid);
42 : msg_allocator_free_at_gvt(msg);
43 : } else {
44 : #else
45 : {
46 : #endif
47 : atomic_store_explicit(&msg->flags, 0U, memory_order_relaxed);
48 : msg_queue_insert(msg);
49 : }
50 : array_push(proc_p->sent_msgs, msg);
51 : }
52 :
/**
 * @brief Performs the global initialization of the processing module
 *
 * The only work done here is delegated to serial_model_init().
 */
void process_global_init(void)
{
	serial_model_init();
	return;
}
57 :
58 0 : void process_global_fini(void)
59 : {
60 : ProcessEvent(0, 0, MODEL_FINI, NULL, 0, NULL);
61 : }
62 :
63 : /**
64 : * @brief Initializes the processing module in the current LP
65 : */
66 1 : void process_lp_init(void)
67 : {
68 : struct lp_ctx *this_lp = current_lp;
69 : struct process_data *proc_p = ¤t_lp->p;
70 :
71 : array_init(proc_p->past_msgs);
72 : array_init(proc_p->sent_msgs);
73 :
74 : struct lp_msg *msg = msg_allocator_pack(this_lp - lps, 0, LP_INIT,
75 : NULL, 0U);
76 :
77 : array_push(proc_p->past_msgs, msg);
78 : array_push(proc_p->sent_msgs, NULL);
79 : ProcessEvent_pr(this_lp - lps, 0, LP_INIT, NULL, 0, NULL);
80 :
81 : model_allocator_checkpoint_next_force_full();
82 : model_allocator_checkpoint_take(0);
83 : }
84 :
85 : /**
86 : * @brief Deinitializes the LP by calling the model's DEINIT handler
87 : */
88 1 : void process_lp_deinit(void)
89 : {
90 : struct lp_ctx *this_lp = current_lp;
91 : ProcessEvent_pr(this_lp - lps, 0, LP_FINI, NULL, 0,
92 : this_lp->lib_ctx_p->state_s);
93 : }
94 :
95 : /**
96 : * @brief Finalizes the processing module in the current LP
97 : */
98 1 : void process_lp_fini(void)
99 : {
100 : struct process_data *proc_p = ¤t_lp->p;
101 :
102 : array_fini(proc_p->sent_msgs);
103 :
104 : for (array_count_t i = 0; i < array_count(proc_p->past_msgs); ++i) {
105 : msg_allocator_free(array_get_at(proc_p->past_msgs, i));
106 : }
107 : array_fini(proc_p->past_msgs);
108 : }
109 :
110 0 : static inline void silent_execution(struct process_data *proc_p,
111 : array_count_t last_i, array_count_t past_i)
112 : {
113 : silent_processing = true;
114 :
115 : void *state_p = current_lp->lib_ctx_p->state_s;
116 : for (array_count_t k = last_i + 1; k <= past_i; ++k) {
117 : const struct lp_msg *msg = array_get_at(proc_p->past_msgs, k);
118 : stats_time_start(STATS_MSG_SILENT);
119 : ProcessEvent_pr(
120 : msg->dest,
121 : msg->dest_t,
122 : msg->m_type,
123 : msg->pl,
124 : msg->pl_size,
125 : state_p
126 : );
127 : stats_time_take(STATS_MSG_SILENT);
128 : }
129 :
130 : silent_processing = false;
131 : }
132 :
133 0 : static inline void send_anti_messages(struct process_data *proc_p,
134 : array_count_t past_i)
135 : {
136 : array_count_t sent_i = array_count(proc_p->sent_msgs) - 1;
137 : array_count_t b = array_count(proc_p->past_msgs) - 1 - past_i;
138 : do {
139 : struct lp_msg *msg = array_get_at(proc_p->sent_msgs, sent_i);
140 : b -= msg == NULL;
141 : --sent_i;
142 : } while(b);
143 :
144 : for (array_count_t i = sent_i + 1; i < array_count(proc_p->sent_msgs);
145 : ++i) {
146 : struct lp_msg *msg = array_get_at(proc_p->sent_msgs, i);
147 : if (!msg)
148 : continue;
149 :
150 : #ifdef ROOTSIM_MPI
151 : nid_t dest_nid = lid_to_nid(msg->dest);
152 : if (dest_nid != nid) {
153 : mpi_remote_anti_msg_send(msg, dest_nid);
154 : } else {
155 : #else
156 : {
157 : #endif
158 : int msg_status = atomic_fetch_add_explicit(&msg->flags,
159 : MSG_FLAG_ANTI, memory_order_relaxed);
160 : if (msg_status & MSG_FLAG_PROCESSED)
161 : msg_queue_insert(msg);
162 : }
163 : }
164 : array_count(proc_p->sent_msgs) = sent_i + 1;
165 : }
166 :
167 0 : static inline void reinsert_invalid_past_messages(struct process_data *proc_p,
168 : array_count_t past_i)
169 : {
170 : for (array_count_t i = past_i + 1; i < array_count(proc_p->past_msgs);
171 : ++i) {
172 : struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
173 : int msg_status = atomic_fetch_add_explicit(&msg->flags,
174 : -MSG_FLAG_PROCESSED, memory_order_relaxed);
175 : if (!(msg_status & MSG_FLAG_ANTI))
176 : msg_queue_insert(msg);
177 : }
178 :
179 : array_count(proc_p->past_msgs) = past_i + 1;
180 : }
181 :
182 0 : static void handle_rollback(struct process_data *proc_p, array_count_t past_i)
183 : {
184 : stats_time_start(STATS_ROLLBACK);
185 :
186 : array_count_t last_i = model_allocator_checkpoint_restore(past_i);
187 : silent_execution(proc_p, last_i, past_i);
188 : send_anti_messages(proc_p, past_i);
189 : reinsert_invalid_past_messages(proc_p, past_i);
190 :
191 : stats_time_take(STATS_ROLLBACK);
192 : }
193 :
194 0 : static inline array_count_t match_anti_msg(
195 : const struct process_data *proc_p, const struct lp_msg *a_msg)
196 : {
197 : array_count_t past_i = array_count(proc_p->past_msgs) - 1;
198 : while (array_get_at(proc_p->past_msgs, past_i) != a_msg) {
199 : --past_i;
200 : }
201 : return past_i - 1;
202 : }
203 :
204 0 : static inline array_count_t match_straggler_msg(
205 : const struct process_data *proc_p, const struct lp_msg *s_msg)
206 : {
207 : array_count_t past_i = array_count(proc_p->past_msgs) - 2;
208 : while (!msg_is_before(array_get_at(proc_p->past_msgs, past_i), s_msg)) {
209 : --past_i;
210 : }
211 : return past_i;
212 : }
213 :
214 0 : void process_msg(void)
215 : {
216 : struct lp_msg *msg = msg_queue_extract();
217 : if (unlikely(!msg)) {
218 : current_lp = NULL;
219 : return;
220 : }
221 :
222 : struct lp_ctx *this_lp = &lps[msg->dest];
223 : struct process_data *proc_p = &this_lp->p;
224 : current_lp = this_lp;
225 :
226 : int msg_status = atomic_fetch_add_explicit(&msg->flags,
227 : MSG_FLAG_PROCESSED, memory_order_relaxed);
228 : if (unlikely(msg_status & MSG_FLAG_ANTI)) {
229 : if (msg_status == (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
230 : array_count_t past_i = match_anti_msg(proc_p, msg);
231 : handle_rollback(proc_p, past_i);
232 : termination_on_lp_rollback(msg->dest_t);
233 : }
234 : msg_allocator_free(msg);
235 : return;
236 : }
237 :
238 : if (unlikely(!msg_is_before(array_peek(proc_p->past_msgs), msg))) {
239 : array_count_t past_i = match_straggler_msg(proc_p, msg);
240 : handle_rollback(proc_p, past_i);
241 : termination_on_lp_rollback(msg->dest_t);
242 : }
243 :
244 : array_push(proc_p->sent_msgs, NULL);
245 : array_push(proc_p->past_msgs, msg);
246 :
247 : stats_time_start(STATS_MSG_PROCESSED);
248 : ProcessEvent_pr(
249 : msg->dest,
250 : msg->dest_t,
251 : msg->m_type,
252 : msg->pl,
253 : msg->pl_size,
254 : this_lp->lib_ctx_p->state_s
255 : );
256 : stats_time_take(STATS_MSG_PROCESSED);
257 :
258 : model_allocator_checkpoint_take(array_count(proc_p->past_msgs) - 1);
259 : termination_on_msg_process(msg->dest_t);
260 : gvt_on_msg_process(msg->dest_t);
261 : }
|