The ROme OpTimistic Simulator  3.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
process.c
Go to the documentation of this file.
1 
11 #include <lp/process.h>
12 
13 #include <datatypes/msg_queue.h>
14 #include <distributed/mpi.h>
15 #include <gvt/gvt.h>
16 #include <log/stats.h>
17 #include <lp/lp.h>
18 #include <mm/msg_allocator.h>
19 #include <serial/serial.h>
20 
21 static _Thread_local bool silent_processing = false;
22 
23 void ScheduleNewEvent_pr(lp_id_t receiver, simtime_t timestamp,
24  unsigned event_type, const void *payload, unsigned payload_size)
25 {
26  if (unlikely(silent_processing))
27  return;
28 
29  struct process_data *proc_p = &current_lp->p;
30  struct lp_msg *msg = msg_allocator_pack(receiver, timestamp, event_type,
31  payload, payload_size);
32 
33 #if LOG_LEVEL <= LOG_DEBUG
34  msg->send = current_lp - lps;
35  msg->send_t = array_peek(proc_p->past_msgs)->dest_t;
36 #endif
37 
38 #ifdef ROOTSIM_MPI
39  nid_t dest_nid = lid_to_nid(receiver);
40  if (dest_nid != nid) {
41  mpi_remote_msg_send(msg, dest_nid);
42  msg_allocator_free_at_gvt(msg);
43  } else {
44 #else
45  {
46 #endif
47  atomic_store_explicit(&msg->flags, 0U, memory_order_relaxed);
48  msg_queue_insert(msg);
49  }
50  array_push(proc_p->sent_msgs, msg);
51 }
52 
53 void process_global_init(void)
54 {
55  serial_model_init();
56 }
57 
58 void process_global_fini(void)
59 {
60  ProcessEvent(0, 0, MODEL_FINI, NULL, 0, NULL);
61 }
62 
66 void process_lp_init(void)
67 {
68  struct lp_ctx *this_lp = current_lp;
69  struct process_data *proc_p = &current_lp->p;
70 
71  array_init(proc_p->past_msgs);
72  array_init(proc_p->sent_msgs);
73 
74  struct lp_msg *msg = msg_allocator_pack(this_lp - lps, 0, LP_INIT,
75  NULL, 0U);
76 
77  array_push(proc_p->past_msgs, msg);
78  array_push(proc_p->sent_msgs, NULL);
79  ProcessEvent_pr(this_lp - lps, 0, LP_INIT, NULL, 0, NULL);
80 
81  model_allocator_checkpoint_next_force_full();
82  model_allocator_checkpoint_take(0);
83 }
84 
89 {
90  struct lp_ctx *this_lp = current_lp;
91  ProcessEvent_pr(this_lp - lps, 0, LP_FINI, NULL, 0,
92  this_lp->lib_ctx_p->state_s);
93 }
94 
98 void process_lp_fini(void)
99 {
100  struct process_data *proc_p = &current_lp->p;
101 
102  array_fini(proc_p->sent_msgs);
103 
104  for (array_count_t i = 0; i < array_count(proc_p->past_msgs); ++i) {
105  msg_allocator_free(array_get_at(proc_p->past_msgs, i));
106  }
107  array_fini(proc_p->past_msgs);
108 }
109 
110 static inline void silent_execution(struct process_data *proc_p,
111  array_count_t last_i, array_count_t past_i)
112 {
113  silent_processing = true;
114 
115  void *state_p = current_lp->lib_ctx_p->state_s;
116  for (array_count_t k = last_i + 1; k <= past_i; ++k) {
117  const struct lp_msg *msg = array_get_at(proc_p->past_msgs, k);
118  stats_time_start(STATS_MSG_SILENT);
119  ProcessEvent_pr(
120  msg->dest,
121  msg->dest_t,
122  msg->m_type,
123  msg->pl,
124  msg->pl_size,
125  state_p
126  );
127  stats_time_take(STATS_MSG_SILENT);
128  }
129 
130  silent_processing = false;
131 }
132 
133 static inline void send_anti_messages(struct process_data *proc_p,
134  array_count_t past_i)
135 {
136  array_count_t sent_i = array_count(proc_p->sent_msgs) - 1;
137  array_count_t b = array_count(proc_p->past_msgs) - 1 - past_i;
138  do {
139  struct lp_msg *msg = array_get_at(proc_p->sent_msgs, sent_i);
140  b -= msg == NULL;
141  --sent_i;
142  } while(b);
143 
144  for (array_count_t i = sent_i + 1; i < array_count(proc_p->sent_msgs);
145  ++i) {
146  struct lp_msg *msg = array_get_at(proc_p->sent_msgs, i);
147  if (!msg)
148  continue;
149 
150 #ifdef ROOTSIM_MPI
151  nid_t dest_nid = lid_to_nid(msg->dest);
152  if (dest_nid != nid) {
153  mpi_remote_anti_msg_send(msg, dest_nid);
154  } else {
155 #else
156  {
157 #endif
158  int msg_status = atomic_fetch_add_explicit(&msg->flags,
159  MSG_FLAG_ANTI, memory_order_relaxed);
160  if (msg_status & MSG_FLAG_PROCESSED)
161  msg_queue_insert(msg);
162  }
163  }
164  array_count(proc_p->sent_msgs) = sent_i + 1;
165 }
166 
167 static inline void reinsert_invalid_past_messages(struct process_data *proc_p,
168  array_count_t past_i)
169 {
170  for (array_count_t i = past_i + 1; i < array_count(proc_p->past_msgs);
171  ++i) {
172  struct lp_msg *msg = array_get_at(proc_p->past_msgs, i);
173  int msg_status = atomic_fetch_add_explicit(&msg->flags,
174  -MSG_FLAG_PROCESSED, memory_order_relaxed);
175  if (!(msg_status & MSG_FLAG_ANTI))
176  msg_queue_insert(msg);
177  }
178 
179  array_count(proc_p->past_msgs) = past_i + 1;
180 }
181 
182 static void handle_rollback(struct process_data *proc_p, array_count_t past_i)
183 {
184  stats_time_start(STATS_ROLLBACK);
185 
186  array_count_t last_i = model_allocator_checkpoint_restore(past_i);
187  silent_execution(proc_p, last_i, past_i);
188  send_anti_messages(proc_p, past_i);
189  reinsert_invalid_past_messages(proc_p, past_i);
190 
191  stats_time_take(STATS_ROLLBACK);
192 }
193 
194 static inline array_count_t match_anti_msg(
195  const struct process_data *proc_p, const struct lp_msg *a_msg)
196 {
197  array_count_t past_i = array_count(proc_p->past_msgs) - 1;
198  while (array_get_at(proc_p->past_msgs, past_i) != a_msg) {
199  --past_i;
200  }
201  return past_i - 1;
202 }
203 
204 static inline array_count_t match_straggler_msg(
205  const struct process_data *proc_p, const struct lp_msg *s_msg)
206 {
207  array_count_t past_i = array_count(proc_p->past_msgs) - 2;
208  while (!msg_is_before(array_get_at(proc_p->past_msgs, past_i), s_msg)) {
209  --past_i;
210  }
211  return past_i;
212 }
213 
214 void process_msg(void)
215 {
216  struct lp_msg *msg = msg_queue_extract();
217  if (unlikely(!msg)) {
218  current_lp = NULL;
219  return;
220  }
221 
222  struct lp_ctx *this_lp = &lps[msg->dest];
223  struct process_data *proc_p = &this_lp->p;
224  current_lp = this_lp;
225 
226  int msg_status = atomic_fetch_add_explicit(&msg->flags,
227  MSG_FLAG_PROCESSED, memory_order_relaxed);
228  if (unlikely(msg_status & MSG_FLAG_ANTI)) {
229  if (msg_status == (MSG_FLAG_ANTI | MSG_FLAG_PROCESSED)) {
230  array_count_t past_i = match_anti_msg(proc_p, msg);
231  handle_rollback(proc_p, past_i);
232  termination_on_lp_rollback(msg->dest_t);
233  }
234  msg_allocator_free(msg);
235  return;
236  }
237 
238  if (unlikely(!msg_is_before(array_peek(proc_p->past_msgs), msg))) {
239  array_count_t past_i = match_straggler_msg(proc_p, msg);
240  handle_rollback(proc_p, past_i);
241  termination_on_lp_rollback(msg->dest_t);
242  }
243 
244  array_push(proc_p->sent_msgs, NULL);
245  array_push(proc_p->past_msgs, msg);
246 
247  stats_time_start(STATS_MSG_PROCESSED);
248  ProcessEvent_pr(
249  msg->dest,
250  msg->dest_t,
251  msg->m_type,
252  msg->pl,
253  msg->pl_size,
254  this_lp->lib_ctx_p->state_s
255  );
256  stats_time_take(STATS_MSG_PROCESSED);
257 
258  model_allocator_checkpoint_take(array_count(proc_p->past_msgs) - 1);
259  termination_on_msg_process(msg->dest_t);
260  gvt_on_msg_process(msg->dest_t);
261 }
simtime_t
double simtime_t
The type used to represent logical time in the simulation.
Definition: core.h:62
lp_msg::flags
atomic_int flags
The flags to handle local anti messages.
Definition: msg.h:49
mpi_remote_anti_msg_send
void mpi_remote_anti_msg_send(struct lp_msg *msg, nid_t dest_nid)
Sends a model anti-message to a LP residing on another node.
Definition: mpi.c:163
nid
nid_t nid
The node identifier of the node.
Definition: core.c:20
lp_msg::m_type
uint_fast32_t m_type
The message type, a user controlled field.
Definition: msg.h:40
nid_t
int nid_t
Used to identify MPI nodes in a distributed environment.
Definition: core.h:79
process_data
The message processing data produced by the LP.
Definition: process.h:17
lp_ctx::p
struct process_data p
The message processing context of this LP.
Definition: lp.h:27
process_lp_init
void process_lp_init(void)
Initializes the processing module in the current LP.
Definition: process.c:66
msg_queue_extract
struct lp_msg * msg_queue_extract(void)
Extracts the next message from the queue.
Definition: msg_queue.c:98
lp_msg
A model simulation message.
Definition: msg.h:34
ProcessEvent
void ProcessEvent(lp_id_t me, simtime_t now, unsigned event_type, const void *content, unsigned size, void *state)
The total number of threads running in the node.
Definition: application.c:21
msg_allocator.h
Memory management functions for messages.
lp_msg::dest
lp_id_t dest
The id of the recipient LP.
Definition: msg.h:36
lp_ctx
A complete LP context.
Definition: lp.h:21
process.h
LP state management functions.
stats.h
Statistics module.
lp.h
LP construction functions.
mpi.h
MPI Support Module.
lp_ctx::lib_ctx_p
struct lib_ctx * lib_ctx_p
The additional libraries context of this LP.
Definition: lp.h:25
array_count_t
uint_fast32_t array_count_t
The type used to handle dynamic arrays count of elements and capacity.
Definition: array.h:21
msg_queue.h
Message queue datatype.
array_peek
#define array_peek(self)
Gets the current capacity of a dynamic array.
Definition: array.h:59
mpi_remote_msg_send
void mpi_remote_msg_send(struct lp_msg *msg, nid_t dest_nid)
Sends a model message to a LP residing on another node.
Definition: mpi.c:138
serial.h
Sequential simlation engine.
process_lp_fini
void process_lp_fini(void)
Finalizes the processing module in the current LP.
Definition: process.c:98
lp_msg::pl
unsigned char pl[48]
The initial part of the payload.
Definition: msg.h:54
msg_queue_insert
void msg_queue_insert(struct lp_msg *msg)
Inserts a message in the queue.
Definition: msg_queue.c:168
array_count
#define array_count(self)
Gets the count of contained element in a dynamic array.
Definition: array.h:47
lp_id_t
uint64_t lp_id_t
Used to uniquely identify LPs in the simulation.
Definition: core.h:75
lp_msg::pl_size
uint_fast32_t pl_size
The message payload size.
Definition: msg.h:46
process_lp_deinit
void process_lp_deinit(void)
Deinitializes the LP by calling the model's DEINIT handler.
Definition: process.c:88
unlikely
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:59
gvt.h
Global Virtual Time.
lp_msg::dest_t
simtime_t dest_t
The intended destination logical time of this message.
Definition: msg.h:38