The ROme OpTimistic Simulator  3.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
msg_queue.c
Go to the documentation of this file.
1 
17 #include <datatypes/msg_queue.h>
18 
19 #include <core/core.h>
20 #include <core/sync.h>
21 #include <datatypes/heap.h>
22 #include <lp/lp.h>
23 #include <mm/msg_allocator.h>
24 
25 #include <stdalign.h>
26 
28 struct msg_queue {
32  binary_heap(struct lp_msg *) q;
33 };
34 
36 static struct msg_queue *queues;
37 
/**
 * @brief Utility macro to fetch the correct inner queue
 * @param from the id of the thread which inserts messages in the queue
 * @param to the id of the thread which extracts messages from the queue
 * @return a pointer to the requested element of the queues matrix
 *
 * The matrix is linearized with one run of n_threads entries per destination.
 * Both arguments are parenthesized so that arbitrary expressions can be passed.
 */
#define mqueue(from, to) (&queues[(to) * n_threads + (from)])
42 
47 {
48  queues = mm_alloc(n_threads * n_threads * sizeof(*queues));
49 }
50 
54 void msg_queue_init(void)
55 {
56  rid_t i = n_threads;
57  while(i--) {
58  heap_init(mqueue(i, rid)->q);
59  spin_init(&(mqueue(i, rid)->lck));
60  }
61 }
62 
66 void msg_queue_fini(void)
67 {
68  rid_t i = n_threads;
69  while(i--) {
70  struct msg_queue *this_q = mqueue(i, rid);
71  array_count_t j = heap_count(this_q->q);
72  while(j--) {
73  struct lp_msg *msg = heap_items(this_q->q)[j];
74  if(!(atomic_load_explicit(&msg->flags,
75  memory_order_relaxed) & MSG_FLAG_PROCESSED))
76  msg_allocator_free(msg);
77  }
78  heap_fini(this_q->q);
79  }
80 }
81 
86 {
87  mm_free(queues);
88 }
89 
99 {
100  rid_t i = n_threads;
101  struct msg_queue *bid_q = mqueue(rid, rid);
102  struct lp_msg *msg = heap_count(bid_q->q) ? heap_min(bid_q->q) : NULL;
103 
104  while(i--) {
105  struct msg_queue *this_q = mqueue(i, rid);
106  if(!spin_trylock(&this_q->lck))
107  continue;
108 
109  if (heap_count(this_q->q) &&
110  (!msg || msg_is_before(heap_min(this_q->q), msg))) {
111  msg = heap_min(this_q->q);
112  bid_q = this_q;
113  }
114  spin_unlock(&this_q->lck);
115  }
116 
117  spin_lock(&bid_q->lck);
118 
119  if(likely(heap_count(bid_q->q)))
120  msg = heap_extract(bid_q->q, msg_is_before);
121  else
122  msg = NULL;
123 
124  spin_unlock(&bid_q->lck);
125  return msg;
126 }
127 
138 {
139  const rid_t t_cnt = n_threads;
140  simtime_t t_min = SIMTIME_MAX;
141  bool done[t_cnt];
142  memset(done, 0, sizeof(done));
143 
144  for(rid_t i = 0, r = t_cnt; r; i = (i + 1) % t_cnt){
145  if(done[i])
146  continue;
147 
148  struct msg_queue *this_q = mqueue(i, rid);
149  if(!spin_trylock(&this_q->lck))
150  continue;
151 
152  done[i] = true;
153  --r;
154  if (heap_count(this_q->q) && t_min >
155  heap_min(this_q->q)->dest_t) {
156  t_min = heap_min(this_q->q)->dest_t;
157  }
158  spin_unlock(&this_q->lck);
159  }
160 
161  return t_min;
162 }
163 
168 void msg_queue_insert(struct lp_msg *msg)
169 {
170  rid_t dest_rid = lid_to_rid(msg->dest);
171  struct msg_queue *this_q = mqueue(rid, dest_rid);
172  spin_lock(&this_q->lck);
173  heap_insert(this_q->q, msg_is_before, msg);
174  spin_unlock(&this_q->lck);
175 }
simtime_t
double simtime_t
The type used to represent logical time in the simulation.
Definition: core.h:62
heap_fini
#define heap_fini(self)
Finalizes a heap.
Definition: heap.h:32
lp_msg::flags
atomic_int flags
The flags to handle local anti messages.
Definition: msg.h:49
rid_t
unsigned rid_t
Used to identify the computing resources within a node (threads at the moment).
Definition: core.h:77
sync.h
Easier Synchronization primitives.
spin_lock
#define spin_lock(lck_p)
Locks a spinlock.
Definition: sync.h:40
CACHE_LINE_SIZE
#define CACHE_LINE_SIZE
The size of a cpu cache line.
Definition: core.h:44
binary_heap
static binary_heap(struct lp_msg *)
The messages queue of the serial runtime.
Definition: serial.c:39
spin_trylock
#define spin_trylock(lck_p)
Executes the trylock operation on a spinlock.
Definition: sync.h:52
mm_alloc
void * mm_alloc(size_t mem_size)
A version of the stdlib malloc() used internally.
Definition: mm.h:26
msg_queue::lck
spinlock_t lck
Synchronizes access to the queue.
Definition: msg_queue.c:30
msg_queue_init
void msg_queue_init(void)
Initializes the message queue for the current thread.
Definition: msg_queue.c:54
msg_queue_extract
struct lp_msg * msg_queue_extract(void)
Extracts the next message from the queue.
Definition: msg_queue.c:98
spin_init
#define spin_init(lck_p)
Initializes a spinlock.
Definition: sync.h:24
lp_msg
A model simulation message.
Definition: msg.h:34
n_threads
rid_t n_threads
The total number of threads running in the node.
Definition: core.c:15
msg_allocator.h
Memory management functions for messages.
msg_queue_global_init
void msg_queue_global_init(void)
Initializes the message queue at the node level.
Definition: msg_queue.c:46
lp_msg::dest
lp_id_t dest
The id of the recipient LP.
Definition: msg.h:36
SIMTIME_MAX
#define SIMTIME_MAX
The maximum value of the logical simulation time, semantically never.
Definition: core.h:64
heap_insert
#define heap_insert(self, cmp_f, elem)
Inserts an element into the heap.
Definition: heap.h:47
lp.h
LP construction functions.
msg_queue
A queue synchronized by a spinlock.
Definition: msg_queue.c:28
array_count_t
uint_fast32_t array_count_t
The type used to handle dynamic arrays count of elements and capacity.
Definition: array.h:21
rid
__thread rid_t rid
The identifier of the thread.
Definition: core.c:16
msg_queue.h
Message queue datatype.
mm_free
void mm_free(void *ptr)
A version of the stdlib free() used internally.
Definition: mm.h:61
likely
#define likely(exp)
Optimize the branch as likely taken.
Definition: core.h:57
queues
static struct msg_queue * queues
The queues matrix, linearized in a contiguous array.
Definition: msg_queue.c:36
msg_queue_insert
void msg_queue_insert(struct lp_msg *msg)
Inserts a message in the queue.
Definition: msg_queue.c:168
core.h
Core ROOT-Sim functionalities.
msg_queue_fini
void msg_queue_fini(void)
Finalizes the message queue for the current thread.
Definition: msg_queue.c:66
heap_extract
#define heap_extract(self, cmp_f)
Extracts an element from the heap.
Definition: heap.h:72
mqueue
#define mqueue(from, to)
Utility macro to fetch the correct inner queue.
Definition: msg_queue.c:41
msg_queue_global_fini
void msg_queue_global_fini(void)
Finalizes the message queue at the node level.
Definition: msg_queue.c:85
heap_init
#define heap_init(self)
Initializes an empty heap.
Definition: heap.h:24
msg_queue_time_peek
simtime_t msg_queue_time_peek(void)
Peeks the timestamp of the next message from the queue.
Definition: msg_queue.c:137
spin_unlock
#define spin_unlock(lck_p)
Unlocks a spinlock.
Definition: sync.h:59
spinlock_t
atomic_flag spinlock_t
The type of a spinlock, an efficient lock primitive in contended scenarios.
Definition: sync.h:18
heap.h
Heap datatype.