21 static _Thread_local
bool silent_processing =
false;
24 unsigned event_type,
const void *payload,
unsigned payload_size)
30 struct lp_msg *msg = msg_allocator_pack(receiver, timestamp, event_type,
31 payload, payload_size);
33 #if LOG_LEVEL <= LOG_DEBUG
34 msg->send = current_lp - lps;
35 msg->send_t =
array_peek(proc_p->past_msgs)->dest_t;
39 nid_t dest_nid = lid_to_nid(receiver);
40 if (dest_nid !=
nid) {
42 msg_allocator_free_at_gvt(msg);
47 atomic_store_explicit(&msg->
flags, 0U, memory_order_relaxed);
50 array_push(proc_p->sent_msgs, msg);
53 void process_global_init(
void)
58 void process_global_fini(
void)
68 struct lp_ctx *this_lp = current_lp;
71 array_init(proc_p->past_msgs);
72 array_init(proc_p->sent_msgs);
74 struct lp_msg *msg = msg_allocator_pack(this_lp - lps, 0, LP_INIT,
77 array_push(proc_p->past_msgs, msg);
78 array_push(proc_p->sent_msgs, NULL);
79 ProcessEvent_pr(this_lp - lps, 0, LP_INIT, NULL, 0, NULL);
81 model_allocator_checkpoint_next_force_full();
82 model_allocator_checkpoint_take(0);
90 struct lp_ctx *this_lp = current_lp;
91 ProcessEvent_pr(this_lp - lps, 0, LP_FINI, NULL, 0,
102 array_fini(proc_p->sent_msgs);
105 msg_allocator_free(array_get_at(proc_p->past_msgs, i));
107 array_fini(proc_p->past_msgs);
110 static inline void silent_execution(
struct process_data *proc_p,
113 silent_processing =
true;
115 void *state_p = current_lp->
lib_ctx_p->state_s;
117 const struct lp_msg *msg = array_get_at(proc_p->past_msgs, k);
118 stats_time_start(STATS_MSG_SILENT);
127 stats_time_take(STATS_MSG_SILENT);
130 silent_processing =
false;
133 static inline void send_anti_messages(
struct process_data *proc_p,
139 struct lp_msg *msg = array_get_at(proc_p->sent_msgs, sent_i);
146 struct lp_msg *msg = array_get_at(proc_p->sent_msgs, i);
152 if (dest_nid !=
nid) {
158 int msg_status = atomic_fetch_add_explicit(&msg->
flags,
159 MSG_FLAG_ANTI, memory_order_relaxed);
160 if (msg_status & MSG_FLAG_PROCESSED)
167 static inline void reinsert_invalid_past_messages(
struct process_data *proc_p,
172 struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
173 int msg_status = atomic_fetch_add_explicit(&msg->
flags,
174 -MSG_FLAG_PROCESSED, memory_order_relaxed);
175 if (!(msg_status & MSG_FLAG_ANTI))
184 stats_time_start(STATS_ROLLBACK);
186 array_count_t last_i = model_allocator_checkpoint_restore(past_i);
187 silent_execution(proc_p, last_i, past_i);
188 send_anti_messages(proc_p, past_i);
189 reinsert_invalid_past_messages(proc_p, past_i);
191 stats_time_take(STATS_ROLLBACK);
198 while (array_get_at(proc_p->past_msgs, past_i) != a_msg) {
208 while (!msg_is_before(array_get_at(proc_p->past_msgs, past_i), s_msg)) {
214 void process_msg(
void)
224 current_lp = this_lp;
226 int msg_status = atomic_fetch_add_explicit(&msg->
flags,
227 MSG_FLAG_PROCESSED, memory_order_relaxed);
228 if (
unlikely(msg_status & MSG_FLAG_ANTI)) {
229 if (msg_status == (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
231 handle_rollback(proc_p, past_i);
232 termination_on_lp_rollback(msg->
dest_t);
234 msg_allocator_free(msg);
240 handle_rollback(proc_p, past_i);
241 termination_on_lp_rollback(msg->
dest_t);
244 array_push(proc_p->sent_msgs, NULL);
245 array_push(proc_p->past_msgs, msg);
247 stats_time_start(STATS_MSG_PROCESSED);
256 stats_time_take(STATS_MSG_PROCESSED);
258 model_allocator_checkpoint_take(
array_count(proc_p->past_msgs) - 1);
259 termination_on_msg_process(msg->
dest_t);
260 gvt_on_msg_process(msg->
dest_t);