The ROme OpTimistic Simulator 3.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
remote_msg_map.c
/**
 * @file datatypes/remote_msg_map.c
 *
 * @brief Message map datatype
 *
 * Hash map in which incoming remote messages are registered, so that
 * remote anti-messages can be matched with their original counterparts.
 */
#include <datatypes/remote_msg_map.h>

#include <core/intrinsics.h>
#include <core/sync.h>
#include <datatypes/msg_queue.h>
#include <gvt/gvt.h>
#include <lp/msg.h>
#include <mm/mm.h>

#include <memory.h>
#include <stdalign.h>

/// The maximum load factor sustained by the hash map before a resize
#define MAX_LOAD_FACTOR 0.95
/// The lock bit, set in a node msg_id while the node is being modified
#define HB_LCK ((uintptr_t)1U)

/// The type used to size and index the hash map
typedef uint_fast32_t map_size_t;

/// A bucket of the remote messages hash map
struct msg_map_node {
	/// The id of the registered message
	atomic_uintptr_t msg_id;
	/// The node id of the sender of the registered message
	nid_t msg_nid;
	/// The logical time after which it is safe to delete this entry
	simtime_t until;
	/// The registered message, NULL if it is an anti-message entry
	struct lp_msg *msg;
};

/// The message hash map used to register external messages
struct msg_map {
	/// The fields shared among all the threads, on their own cache line
	alignas(CACHE_LINE_SIZE) struct {
		/// The array of hash map nodes
		struct msg_map_node *nodes;
		/// The current capacity of this hashmap minus one
		map_size_t capacity_mo;
		/// The count of entries which can still be inserted
		atomic_int count;
	};
	struct {
		/// Synchronizes access from the worker threads
		spinlock_t l;
	} locks[MAX_THREADS];
};
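
/*
 * Layout rationale: the fields in the anonymous struct are the hot shared
 * state and sit on their own cache line, away from the per-thread locks.
 * count acts as a budget of insertions still allowed under MAX_LOAD_FACTOR:
 * each insertion decrements it, and the thread driving it to zero triggers
 * the resize. Acquiring every entry of locks[] grants exclusive access to
 * the whole map, which is how resize and fossil collection serialize with
 * the worker threads.
 */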

/// The hash map in which remote messages are registered
static struct msg_map re_map;

/**
 * @brief Computes the hash of a message id
 * @param msg_id the message id to hash
 * @return the hash value; callers mask it with the capacity to get a bucket
 */
static __attribute__((const)) inline map_size_t msg_id_hash(uintptr_t msg_id)
{
	return (msg_id ^ (msg_id >> 32)) * 0xbf58476d1ce4e5b9ULL; // @suppress("Avoid magic numbers")
}
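
/*
 * The multiplier above is the 64-bit mixing constant used in the splitmix64
 * finalizer; the preceding xor-shift folds the high bits of the id into the
 * low ones, so the bucket index obtained by masking the result keeps
 * entropy from the whole id. Note that the 32-bit shift assumes a 64-bit
 * uintptr_t.
 */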

/// Initializes the remote message map at the start of the simulation
void remote_msg_map_global_init(void)
{
	// even at maximum load, keep at least two free slots per thread
	map_size_t cnt = n_threads * 2 / (1 - MAX_LOAD_FACTOR);
	// round the capacity up to the next power of 2
	map_size_t cap = 1ULL << (sizeof(cnt) * CHAR_BIT - intrinsics_clz(cnt));
	// capacity_mo is in the form 2^n - 1, modulo computations are then easy
	re_map.capacity_mo = cap - 1;
	atomic_store_explicit(&re_map.count, cap * MAX_LOAD_FACTOR,
			memory_order_relaxed);
	re_map.nodes = mm_alloc(sizeof(*re_map.nodes) * cap);
	memset(re_map.nodes, 0, sizeof(*re_map.nodes) * cap);
}
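
/*
 * Sizing example: with n_threads = 4 and MAX_LOAD_FACTOR = 0.95, cnt is
 * 4 * 2 / 0.05 = 160, which rounds up to cap = 256; capacity_mo is then
 * 255 and count starts at 256 * 0.95 = 243 available insertions.
 */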

/// Finalizes the remote message map at simulation shutdown
void remote_msg_map_global_fini(void)
{
	mm_free(re_map.nodes);
}

/// Locks all the per-thread locks, gaining exclusive access to the map
inline static void remote_msg_map_lock_all(void)
{
	const rid_t t_cnt = n_threads;
	bool done[t_cnt];
	memset(done, 0, sizeof(done));

	// serializes concurrent lock-all operations
	static spinlock_t ll;

	spin_lock(&ll);
	// round-robin trylock: grab each worker lock as soon as it is free
	// instead of blocking on a single one
	for (rid_t i = 0, r = t_cnt; r; i = (i + 1) % t_cnt) {
		if (done[i] || !spin_trylock(&re_map.locks[i].l))
			continue;

		done[i] = true;
		--r;
	}
	spin_unlock(&ll);
}
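
/*
 * The guard lock ll guarantees that at most one thread attempts a lock-all
 * at a time, so two full-map operations can never deadlock by each holding
 * part of the per-thread locks while waiting for the other's share.
 */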

/// Releases all the per-thread locks taken by remote_msg_map_lock_all()
inline static void remote_msg_map_unlock_all(void)
{
	const rid_t t_cnt = n_threads;
	for (rid_t i = 0; i < t_cnt; ++i) {
		spin_unlock(&re_map.locks[i].l);
	}
}

/**
 * @brief Prunes the map entries which are no longer useful
 * @param current_gvt the current value of the Global Virtual Time
 */
void remote_msg_map_fossil_collect(simtime_t current_gvt)
{
	static struct msg_map_node *old_nodes = NULL;
	static atomic_int re_map_bar = 0;
	static map_size_t cap;

	// the first thread to get here swaps in a fresh table; the others wait
	if (!atomic_fetch_add_explicit(&re_map_bar, 1U, memory_order_acquire)) {
		remote_msg_map_lock_all();
		// free the table retired during the previous fossil collection
		mm_free(old_nodes);
		old_nodes = re_map.nodes;
		cap = re_map.capacity_mo + 1;
		atomic_fetch_sub_explicit(&re_map_bar, n_threads,
				memory_order_release);

		re_map.nodes = mm_alloc(sizeof(struct msg_map_node) * cap);
		memset(re_map.nodes, 0, sizeof(struct msg_map_node) * cap);
		atomic_store_explicit(&re_map.count, cap * MAX_LOAD_FACTOR,
				memory_order_relaxed);
		remote_msg_map_unlock_all();
	} else {
		while (atomic_load_explicit(&re_map_bar,
				memory_order_acquire) > 0)
			spin_pause();
	}

	// partition the old table among the threads: the first cap % n_threads
	// threads process one extra slot each
	bool big = (cap % n_threads) > rid;

	map_size_t cnt = cap / n_threads;
	map_size_t off;
	if (big) {
		++cnt;
		off = cnt * rid;
	} else {
		off = cap - cnt * (n_threads - rid);
	}

	// re-register the entries which are still relevant at the current GVT
	while (cnt--) {
		struct msg_map_node *node = &old_nodes[off + cnt];
		if (node->msg_id && current_gvt <= node->until)
			remote_msg_map_match(node->msg_id, node->msg_nid,
					node->msg);
	}
}
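
/*
 * Partitioning example: with cap = 256 and n_threads = 3, cap % n_threads
 * is 1, so thread 0 gets the 86 slots [0, 86) while threads 1 and 2 get 85
 * slots each, [86, 171) and [171, 256) respectively.
 */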

/// Doubles the capacity of the hash map, rehashing all the entries
static void msg_map_size_increase(void)
{
	const map_size_t old_cmo = re_map.capacity_mo;
	const map_size_t cmo = old_cmo * 2 + 1;
	struct msg_map_node *old_nodes = re_map.nodes;
	struct msg_map_node *nodes = mm_alloc(sizeof(*nodes) * (cmo + 1));

	memset(nodes, 0, sizeof(*nodes) * (cmo + 1));

	remote_msg_map_lock_all();

	// the doubled table can absorb roughly old_cmo * MAX_LOAD_FACTOR
	// additional insertions on top of the entries it inherits
	atomic_fetch_add_explicit(&re_map.count, old_cmo * MAX_LOAD_FACTOR,
			memory_order_release);

	for (map_size_t j = 0; j <= old_cmo; ++j) {
		if (!old_nodes[j].msg_id)
			continue;

		struct msg_map_node *cnode = &old_nodes[j];
		map_size_t cdib = 0;
		map_size_t i = msg_id_hash(atomic_load_explicit(&cnode->msg_id,
				memory_order_relaxed)) & cmo;

		while (nodes[i].msg_id) {
			// distance of the resident entry from its initial bucket
			map_size_t tdib = (cmo + 1 + i - (msg_id_hash(
					atomic_load_explicit(&nodes[i].msg_id,
					memory_order_relaxed)) & cmo)) & cmo;

			// robin hood: swap with the resident entry if it is
			// closer to its initial bucket than we are to ours
			if (cdib > tdib) {
				cdib = tdib;
				struct msg_map_node tmp_node;
				memcpy(&tmp_node, cnode, sizeof(*cnode));
				memcpy(cnode, &nodes[i], sizeof(*cnode));
				memcpy(&nodes[i], &tmp_node, sizeof(*cnode));
			}
			i = (i + 1) & cmo;
			++cdib;
		}
		memcpy(&nodes[i], cnode, sizeof(*cnode));
	}
	re_map.capacity_mo = cmo;
	re_map.nodes = nodes;

	remote_msg_map_unlock_all();

	mm_free(old_nodes);
}
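
/*
 * DIB example: with cmo = 255, an entry whose id hashes to bucket 250 but
 * which currently sits in slot 3 has a distance from its initial bucket of
 * (256 + 3 - 250) & 255 = 9; entries are swapped so that these distances
 * stay small and roughly sorted along each probe sequence.
 */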

/**
 * @brief Registers a message id in the map, matching it with its twin entry
 * @param msg_id the unique id of the message
 * @param m_nid the node id of the message sender
 * @param msg the message to register, or NULL to register an anti-message
 *
 * When both a message and its anti-message have been registered with the
 * same id and sender node, the message is flagged with MSG_FLAG_ANTI and,
 * if it had already been processed, re-enqueued so that its effects are
 * undone.
 */
void remote_msg_map_match(uintptr_t msg_id, nid_t m_nid, struct lp_msg *msg)
{
	msg_id &= ~HB_LCK;
	map_size_t cdib = 0;
	struct lp_msg *cmsg = msg;
	simtime_t cuntil = msg ? msg->dest_t : SIMTIME_MAX;
	nid_t cnid = m_nid;
	// the id is stored with the lock bit set while the node is updated
	uintptr_t cd = msg_id | HB_LCK;
	map_size_t i = msg_id_hash(msg_id);

	spinlock_t *lck = &re_map.locks[rid].l;
	spin_lock(lck);

	struct msg_map_node *n = re_map.nodes;
	const map_size_t cmo = re_map.capacity_mo;
	i &= cmo;
	// linear probing with robin hood hashing
	// https://cs.uwaterloo.ca/research/tr/1986/CS-86-14.pdf by Pedro Celis
	while (1) {
		uintptr_t td = atomic_load_explicit(&n[i].msg_id,
				memory_order_relaxed);

	retry_zero_check:
		if (!td) {
			// empty slot: claim it with the locked id, then fill it
			if (unlikely(!atomic_compare_exchange_weak_explicit(
					&n[i].msg_id, &td, cd, memory_order_acquire,
					memory_order_relaxed))) {
				goto retry_zero_check;
			}
			n[i].msg = cmsg;
			n[i].until = cuntil;
			n[i].msg_nid = cnid;
			// clearing the lock bit publishes the node content
			atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
					memory_order_release);

			int c = atomic_fetch_add_explicit(&re_map.count, -1,
					memory_order_relaxed);

			spin_unlock(lck);

			if (unlikely(c <= 0)) {
				if (c) {
					// a resize is in progress: wait it out
					while (atomic_load_explicit(
							&re_map.count,
							memory_order_acquire) <= 0)
						spin_pause();
				} else {
					// this thread used up the insertion
					// budget: it performs the resize
					msg_map_size_increase();
				}
			}
			return;
		}
	retry_eq_check:
		td &= ~HB_LCK;
		if (td == (cd & ~HB_LCK)) {
			// same id: lock the node and check the sender node too
			if (unlikely(!atomic_compare_exchange_weak_explicit(
					&n[i].msg_id, &td, cd,
					memory_order_acquire, memory_order_relaxed))) {
				goto retry_eq_check;
			}

			if (likely(cnid == n[i].msg_nid)) {
				// matched: one of the two is the anti-message
				if (likely(!cmsg))
					cmsg = n[i].msg;

				n[i].until = 0;

				atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
						memory_order_release);
				spin_unlock(lck);

				int flags = atomic_fetch_add_explicit(
						&cmsg->flags, MSG_FLAG_ANTI,
						memory_order_relaxed);
				if (flags & MSG_FLAG_PROCESSED)
					msg_queue_insert(cmsg);

				return;
			} else {
				atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
						memory_order_release);
			}
		}
		map_size_t tdib;
	retry_dib_check:
		tdib = (cmo + 1 + i - (msg_id_hash(td) & cmo)) & cmo;
		if (cdib > tdib) {
			// robin hood: displace the resident entry and keep
			// probing with it
			if (unlikely(!atomic_compare_exchange_weak_explicit(
					&n[i].msg_id, &td, cd, memory_order_acquire,
					memory_order_relaxed))) {
				td &= ~HB_LCK;
				goto retry_dib_check;
			}
			struct lp_msg *tmsg = n[i].msg;
			simtime_t tuntil = n[i].until;
			nid_t tnid = n[i].msg_nid;
			n[i].msg = cmsg;
			n[i].until = cuntil;
			n[i].msg_nid = cnid;
			atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
					memory_order_release);
			cmsg = tmsg;
			cuntil = tuntil;
			cnid = tnid;
			cdib = tdib;
			cd = td | HB_LCK;
		}
		i = (i + 1) & cmo;
		++cdib;
	}
}
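
/*
 * Usage sketch (illustrative, not part of this translation unit): the MPI
 * receive path is assumed to register every incoming remote message, while
 * anti-messages are registered with a NULL payload, so that the two calls
 * meet in the map regardless of their arrival order.
 *
 *	// upon receiving a remote message m sent by node nid:
 *	remote_msg_map_match(m->msg_id, nid, m);
 *
 *	// upon receiving the matching anti-message carrying the same id:
 *	remote_msg_map_match(m_id, nid, NULL);
 */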