Line data Source code
1 1 : /**
2 : * @file lp/process.c
3 : *
4 : * @brief LP state management functions
5 : *
6 : * LP state management functions
7 : *
8 : * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
9 : * SPDX-License-Identifier: GPL-3.0-only
10 : */
11 : #include <lp/process.h>
12 :
13 : #include <datatypes/msg_queue.h>
14 : #include <distributed/mpi.h>
15 : #include <gvt/gvt.h>
16 : #include <log/stats.h>
17 : #include <lp/lp.h>
18 : #include <mm/msg_allocator.h>
19 : #include <serial/serial.h>
20 :
21 0 : static _Thread_local bool silent_processing = false;
22 0 : static _Thread_local dyn_array(struct lp_msg *) early_antis;
23 :
24 : void ScheduleNewEvent_pr(lp_id_t receiver, simtime_t timestamp,
25 : unsigned event_type, const void *payload, unsigned payload_size)
26 : {
27 : if (unlikely(silent_processing))
28 : return;
29 :
30 : struct process_data *proc_p = ¤t_lp->p;
31 : struct lp_msg *msg = msg_allocator_pack(receiver, timestamp, event_type,
32 : payload, payload_size);
33 :
34 : #if LOG_LEVEL <= LOG_DEBUG
35 : msg->send = current_lp - lps;
36 : msg->send_t = array_peek(proc_p->past_msgs)->dest_t;
37 : #endif
38 :
39 : nid_t dest_nid = lid_to_nid(receiver);
40 : if (dest_nid != nid) {
41 : mpi_remote_msg_send(msg, dest_nid);
42 : msg_allocator_free_at_gvt(msg);
43 : } else {
44 : atomic_store_explicit(&msg->flags, 0U, memory_order_relaxed);
45 : msg_queue_insert(msg);
46 : }
47 : array_push(proc_p->sent_msgs, msg);
48 : }
49 :
/**
 * @brief Process-wide initialization of the processing module
 *
 * Only delegates to serial_model_init().
 */
void process_global_init(void)
{
	serial_model_init();
}
54 :
55 0 : void process_global_fini(void)
56 : {
57 : ProcessEvent(0, 0, MODEL_FINI, NULL, 0, NULL);
58 : }
59 :
60 0 : void process_init(void)
61 : {
62 : array_init(early_antis);
63 : }
64 :
65 0 : void process_fini(void)
66 : {
67 : array_fini(early_antis);
68 : }
69 :
70 : /**
71 : * @brief Initializes the processing module in the current LP
72 : */
73 1 : void process_lp_init(void)
74 : {
75 : struct lp_ctx *this_lp = current_lp;
76 : struct process_data *proc_p = ¤t_lp->p;
77 :
78 : array_init(proc_p->past_msgs);
79 : array_init(proc_p->sent_msgs);
80 :
81 : struct lp_msg *msg = msg_allocator_pack(this_lp - lps, 0, LP_INIT,
82 : NULL, 0U);
83 :
84 : array_push(proc_p->past_msgs, msg);
85 : array_push(proc_p->sent_msgs, NULL);
86 : ProcessEvent_pr(this_lp - lps, 0, LP_INIT, NULL, 0, NULL);
87 :
88 : model_allocator_checkpoint_next_force_full();
89 : model_allocator_checkpoint_take(0);
90 : }
91 :
92 : /**
93 : * @brief Deinitializes the LP by calling the model's DEINIT handler
94 : */
95 1 : void process_lp_deinit(void)
96 : {
97 : struct lp_ctx *this_lp = current_lp;
98 : ProcessEvent_pr(this_lp - lps, 0, LP_FINI, NULL, 0,
99 : this_lp->lib_ctx_p->state_s);
100 : }
101 :
102 : /**
103 : * @brief Finalizes the processing module in the current LP
104 : */
105 1 : void process_lp_fini(void)
106 : {
107 : struct process_data *proc_p = ¤t_lp->p;
108 :
109 : array_fini(proc_p->sent_msgs);
110 :
111 : for (array_count_t i = 0; i < array_count(proc_p->past_msgs); ++i) {
112 : struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
113 : uint32_t flags = atomic_load_explicit(&msg->flags,
114 : memory_order_relaxed);
115 : if (!(flags & MSG_FLAG_ANTI))
116 : msg_allocator_free(msg);
117 : }
118 : array_fini(proc_p->past_msgs);
119 : }
120 :
121 0 : static inline void silent_execution(struct process_data *proc_p,
122 : array_count_t last_i, array_count_t past_i)
123 : {
124 : silent_processing = true;
125 :
126 : void *state_p = current_lp->lib_ctx_p->state_s;
127 : for (array_count_t k = last_i + 1; k < past_i; ++k) {
128 : const struct lp_msg *msg = array_get_at(proc_p->past_msgs, k);
129 : stats_time_start(STATS_MSG_SILENT);
130 : ProcessEvent_pr(
131 : msg->dest,
132 : msg->dest_t,
133 : msg->m_type,
134 : msg->pl,
135 : msg->pl_size,
136 : state_p
137 : );
138 : stats_time_take(STATS_MSG_SILENT);
139 : }
140 :
141 : silent_processing = false;
142 : }
143 :
144 0 : static inline void send_anti_messages(struct process_data *proc_p,
145 : array_count_t past_i)
146 : {
147 : array_count_t sent_i = array_count(proc_p->sent_msgs) - 1;
148 : array_count_t b = array_count(proc_p->past_msgs) - past_i;
149 : do {
150 : struct lp_msg *msg = array_get_at(proc_p->sent_msgs, sent_i);
151 : b -= msg == NULL;
152 : --sent_i;
153 : } while(b);
154 :
155 : for (array_count_t i = sent_i + 1; i < array_count(proc_p->sent_msgs);
156 : ++i) {
157 : struct lp_msg *msg = array_get_at(proc_p->sent_msgs, i);
158 : if (!msg)
159 : continue;
160 :
161 : nid_t dest_nid = lid_to_nid(msg->dest);
162 : if (dest_nid != nid) {
163 : mpi_remote_anti_msg_send(msg, dest_nid);
164 : } else {
165 : int msg_status = atomic_fetch_add_explicit(&msg->flags,
166 : MSG_FLAG_ANTI, memory_order_relaxed);
167 : if (msg_status & MSG_FLAG_PROCESSED)
168 : msg_queue_insert(msg);
169 : }
170 : }
171 : array_count(proc_p->sent_msgs) = sent_i + 1;
172 : }
173 :
174 0 : static inline void reinsert_invalid_past_messages(struct process_data *proc_p,
175 : array_count_t past_i)
176 : {
177 : for (array_count_t i = past_i; i < array_count(proc_p->past_msgs);
178 : ++i) {
179 : struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
180 : int msg_status = atomic_fetch_add_explicit(&msg->flags,
181 : -MSG_FLAG_PROCESSED, memory_order_relaxed);
182 : if (!(msg_status & MSG_FLAG_ANTI))
183 : msg_queue_insert(msg);
184 : }
185 :
186 : array_count(proc_p->past_msgs) = past_i;
187 : }
188 :
189 0 : static void do_rollback(struct process_data *proc_p, array_count_t past_i)
190 : {
191 : stats_time_start(STATS_ROLLBACK);
192 :
193 : array_count_t last_i = model_allocator_checkpoint_restore(past_i - 1);
194 : silent_execution(proc_p, last_i, past_i);
195 : send_anti_messages(proc_p, past_i);
196 : reinsert_invalid_past_messages(proc_p, past_i);
197 :
198 : stats_time_take(STATS_ROLLBACK);
199 : }
200 :
201 0 : static inline array_count_t match_anti_msg(const struct process_data *proc_p,
202 : const struct lp_msg *a_msg)
203 : {
204 : array_count_t past_i = array_count(proc_p->past_msgs) - 1;
205 : while (array_get_at(proc_p->past_msgs, past_i) != a_msg)
206 : --past_i;
207 : return past_i;
208 : }
209 :
210 0 : static inline array_count_t match_straggler_msg(
211 : const struct process_data *proc_p, const struct lp_msg *s_msg)
212 : {
213 : array_count_t past_i = array_count(proc_p->past_msgs) - 2;
214 : while (!msg_is_before(array_get_at(proc_p->past_msgs, past_i), s_msg))
215 : --past_i;
216 : return past_i + 1;
217 : }
218 :
219 0 : static inline array_count_t match_remote_msg(const struct process_data *proc_p,
220 : const struct lp_msg *r_msg)
221 : {
222 : uint32_t m_id = r_msg->raw_flags >> 2, m_seq = r_msg->m_seq;
223 : array_count_t past_i = array_count(proc_p->past_msgs) - 1;
224 : while (past_i) {
225 : struct lp_msg *msg = array_get_at(proc_p->past_msgs, past_i);
226 : if (msg->raw_flags >> 2 == m_id && msg->m_seq == m_seq) {
227 : msg->raw_flags |= MSG_FLAG_ANTI;
228 : break;
229 : }
230 : --past_i;
231 : }
232 :
233 : return past_i;
234 : }
235 :
236 0 : static inline void handle_remote_anti_msg(struct process_data *proc_p,
237 : struct lp_msg *msg)
238 : {
239 : array_count_t past_i = match_remote_msg(proc_p, msg);
240 : if (unlikely(!past_i)) {
241 : msg->raw_flags >>= 2;
242 : array_push(early_antis, msg);
243 : return;
244 : }
245 : struct lp_msg *old_msg = array_get_at (proc_p->past_msgs, past_i);
246 : do_rollback(proc_p, past_i);
247 : termination_on_lp_rollback(msg->dest_t);
248 : msg_allocator_free(old_msg);
249 : msg_allocator_free(msg);
250 : }
251 :
252 0 : static inline bool check_early_anti_messages(struct lp_msg *msg)
253 : {
254 : uint32_t m_id;
255 : if (likely(!array_count(early_antis) || !(m_id = msg->raw_flags >> 2)))
256 : return false;
257 : uint32_t m_seq = msg->m_seq;
258 : if (!m_id)
259 : return false;
260 : for (array_count_t i = 0; i < array_count(early_antis); ++i) {
261 : struct lp_msg *a_msg = array_get_at(early_antis, i);
262 : if (a_msg->raw_flags == m_id && a_msg->m_seq == m_seq) {
263 : msg_allocator_free(msg);
264 : msg_allocator_free(a_msg);
265 : array_get_at(early_antis, i) = array_peek(early_antis);
266 : --array_count(early_antis);
267 : return true;
268 : }
269 : }
270 : return false;
271 : }
272 :
273 0 : void process_msg(void)
274 : {
275 : struct lp_msg *msg = msg_queue_extract();
276 : if (unlikely(!msg)) {
277 : current_lp = NULL;
278 : return;
279 : }
280 :
281 : gvt_on_msg_extraction(msg->dest_t);
282 :
283 : struct lp_ctx *this_lp = &lps[msg->dest];
284 : struct process_data *proc_p = &this_lp->p;
285 : current_lp = this_lp;
286 :
287 : uint32_t flags = atomic_fetch_add_explicit(&msg->flags,
288 : MSG_FLAG_PROCESSED, memory_order_relaxed);
289 :
290 : if (unlikely(flags & MSG_FLAG_ANTI)) {
291 : if (flags > (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
292 : handle_remote_anti_msg(proc_p, msg);
293 : } else if (flags == (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
294 : array_count_t past_i = match_anti_msg(proc_p, msg);
295 : do_rollback(proc_p, past_i);
296 : termination_on_lp_rollback(msg->dest_t);
297 : msg_allocator_free(msg);
298 : }
299 : return;
300 : }
301 :
302 : if (unlikely(check_early_anti_messages(msg)))
303 : return;
304 :
305 : if (unlikely(!msg_is_before(array_peek(proc_p->past_msgs), msg))) {
306 : array_count_t past_i = match_straggler_msg(proc_p, msg);
307 : do_rollback(proc_p, past_i);
308 : termination_on_lp_rollback(msg->dest_t);
309 : }
310 :
311 : array_push(proc_p->sent_msgs, NULL);
312 : array_push(proc_p->past_msgs, msg);
313 :
314 : stats_time_start(STATS_MSG_PROCESSED);
315 : ProcessEvent_pr(
316 : msg->dest,
317 : msg->dest_t,
318 : msg->m_type,
319 : msg->pl,
320 : msg->pl_size,
321 : this_lp->lib_ctx_p->state_s
322 : );
323 : stats_time_take(STATS_MSG_PROCESSED);
324 :
325 : model_allocator_checkpoint_take(array_count(proc_p->past_msgs) - 1);
326 : termination_on_msg_process(msg->dest_t);
327 : }
|