#define MAX_LOAD_FACTOR 0.95
#define HB_LCK ((uintptr_t)1U)

typedef uint_fast32_t map_size_t;
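
/*
 * Hashes a 64-bit message id into a map index: the high half is XOR-folded
 * onto the low half, then multiplied by a large odd constant (the multiplier
 * used in the SplitMix64 finalizer) to spread ids across buckets.
 */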
static __attribute__((const)) inline map_size_t msg_id_hash(uintptr_t msg_id)
{
        return (msg_id ^ (msg_id >> 32)) * 0xbf58476d1ce4e5b9ULL;
}
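
/*
 * Initializes the global message map: the capacity is derived from the thread
 * count and the maximum load factor, rounded up to a power of two, while
 * re_map.count seemingly tracks how many insertions are still allowed before
 * a resize is needed.
 */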
void remote_msg_map_global_init(void)
{
        map_size_t cnt = n_threads * 2 / (1 - MAX_LOAD_FACTOR);
        map_size_t cap = 1ULL << (sizeof(cnt) * CHAR_BIT - intrinsics_clz(cnt));
        /* ... */
        atomic_store_explicit(&re_map.count, cap * MAX_LOAD_FACTOR,
                memory_order_relaxed);
        /* ... */
        memset(re_map.nodes, 0, sizeof(*re_map.nodes) * cap);
}
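
/* Finalizes the global message map set up by remote_msg_map_global_init(). */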
void remote_msg_map_global_fini(void)
{
        /* ... */
}
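
/*
 * Takes every per-thread lock protecting the map. The loop below keeps cycling
 * over the locks, tracking in done[] which ones have already been taken, until
 * none remain.
 */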
inline static void remote_msg_map_lock_all(void)
{
        /* ... */
        memset(done, 0, sizeof(done));
        /* ... */
        for (rid_t i = 0, r = t_cnt; r; i = (i + 1) % t_cnt) {
                /* ... */
        }
}
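
/* Releases all the per-thread locks acquired by remote_msg_map_lock_all(). */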
inline static void remote_msg_map_unlock_all(void)
{
        /* ... */
        for (rid_t i = 0; i < t_cnt; ++i) {
                /* ... */
        }
}
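
/*
 * Prunes the map of entries made obsolete by the newly computed GVT. The first
 * thread to enter the barrier below retires the current node array (old_nodes)
 * and resets the insertion budget while holding all the locks; the remaining
 * threads spin until that phase completes.
 */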
void remote_msg_map_fossil_collect(simtime_t current_gvt)
{
        static atomic_int re_map_bar = 0;
        static map_size_t cap;
        /* ... */
        if (!atomic_fetch_add_explicit(&re_map_bar, 1U, memory_order_acquire)) {
                remote_msg_map_lock_all();
                /* ... */
                old_nodes = re_map.nodes;
                /* ... */
                atomic_fetch_sub_explicit(&re_map_bar, n_threads,
                        memory_order_release);
                /* ... */
                atomic_store_explicit(&re_map.count, cap * MAX_LOAD_FACTOR,
                        memory_order_relaxed);
                remote_msg_map_unlock_all();
        }
        /* ... */
        while (atomic_load_explicit(&re_map_bar, memory_order_acquire) > 0)
                /* ... */;
        /* ... */
}
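
/*
 * Doubles the map capacity and re-inserts every entry from the old node array
 * into the new one using Robin Hood hashing: when two entries contend for a
 * slot, the one farther from its home bucket keeps it and the other keeps
 * probing.
 */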
static void msg_map_size_increase(void)
{
        /* ... */
        const map_size_t cmo = old_cmo * 2 + 1;
        /* ... */
        memset(nodes, 0, sizeof(*nodes) * (cmo + 1));

        remote_msg_map_lock_all();

        atomic_fetch_add_explicit(&re_map.count, old_cmo * MAX_LOAD_FACTOR,
                memory_order_release);

        for (map_size_t j = 0; j <= old_cmo; ++j) {
                /* ... */
                map_size_t i = msg_id_hash(atomic_load_explicit(&cnode->msg_id,
                        memory_order_relaxed)) & cmo;
                /* ... */
                map_size_t tdib = (cmo + 1 + i - (msg_id_hash(
                        atomic_load_explicit(&nodes[i].msg_id,
                        memory_order_relaxed)) & cmo)) & cmo;
                /* ... */
                memcpy(&tmp_node, cnode, sizeof(*cnode));
                memcpy(cnode, &nodes[i], sizeof(*cnode));
                memcpy(&nodes[i], &tmp_node, sizeof(*cnode));
                /* ... */
                memcpy(&nodes[i], cnode, sizeof(*cnode));
        }
        /* ... */
        re_map.nodes = nodes;

        remote_msg_map_unlock_all();
        /* ... */
}
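
/*
 * The fragment below appears to come from the routine that registers a remote
 * message id in the map and matches it against its counterpart (the function
 * signature is not part of this excerpt). A slot is claimed by CAS-ing the id
 * with the HB_LCK bit set, Robin Hood displacement is applied through the
 * probe distance tdib, and a matched message gets MSG_FLAG_ANTI raised, with
 * extra handling when MSG_FLAG_PROCESSED was already set.
 */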
        struct lp_msg *cmsg = msg;
        /* ... */
        uintptr_t cd = msg_id | HB_LCK;
        map_size_t i = msg_id_hash(msg_id);
        /* ... */
        uintptr_t td = atomic_load_explicit(&n[i].msg_id,
                memory_order_relaxed);
        /* ... */
        if (unlikely(!atomic_compare_exchange_weak_explicit(&n[i].msg_id,
                        &td, cd, memory_order_acquire,
                        memory_order_relaxed))) {
                goto retry_zero_check;
        }
        /* ... */
        atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
                memory_order_release);
        /* ... */
        int c = atomic_fetch_add_explicit(&re_map.count, -1,
                memory_order_relaxed);
        /* ... */
        while (atomic_load_explicit(&re_map.count,
                        memory_order_acquire) <= 0)
                /* ... */;
        /* ... */
        msg_map_size_increase();
        /* ... */
        if (td == (cd & ~HB_LCK)) {
                if (unlikely(!atomic_compare_exchange_weak_explicit(
                                &n[i].msg_id, &td, cd,
                                memory_order_acquire,
                                memory_order_relaxed))) {
                        /* ... */
                }
                /* ... */
                atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
                        memory_order_release);
                /* ... */
                int flags = atomic_fetch_add_explicit(&cmsg->flags,
                        MSG_FLAG_ANTI, memory_order_relaxed);
                if (flags & MSG_FLAG_PROCESSED)
                        /* ... */;
                /* ... */
                atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
                        memory_order_release);
                /* ... */
        }
        /* ... */
        tdib = (cmo + 1 + i - (msg_id_hash(td) & cmo)) & cmo;
        /* ... */
        if (unlikely(!atomic_compare_exchange_weak_explicit(&n[i].msg_id,
                        &td, cd, memory_order_acquire,
                        memory_order_relaxed))) {
                /* ... */
                goto retry_dib_check;
        }
        /* ... */
        atomic_fetch_sub_explicit(&n[i].msg_id, HB_LCK,
                memory_order_release);