Line data Source code
1 1 : /**
2 : * @file log/stats.c
3 : *
4 : * @brief Statistics module
5 : *
6 : * All facilities to collect, gather, and dump statistics are implemented
7 : * in this module.
8 : *
9 : * SPDX-FileCopyrightText: 2008-2021 HPDCS Group <rootsim@googlegroups.com>
10 : * SPDX-License-Identifier: GPL-3.0-only
11 : */
12 : #include <log/stats.h>
13 :
14 : #include <arch/io.h>
15 : #include <arch/mem.h>
16 : #include <arch/timer.h>
17 : #include <core/arg_parse.h>
18 : #include <core/core.h>
19 : #include <distributed/mpi.h>
20 :
21 : #include <assert.h>
22 : #include <inttypes.h>
23 : #include <memory.h>
24 : #include <stdarg.h>
25 : #include <stdatomic.h>
26 : #include <stdint.h>
27 : #include <stdio.h>
28 :
29 0 : #define STATS_BUFFER_ENTRIES (1024)
30 0 : #define STD_DEV_POWER_2_EXP 5
31 0 : #define STATS_MAX_STRLEN 32U
32 :
33 : /// A container for statistics in a logical time period
34 1 : struct stats_thread {
35 : /// Real elapsed time in microseconds from simulation beginning
36 1 : uint64_t rt;
37 : /// The array of statistics taken in the period
38 : struct stats_measure s[STATS_COUNT];
39 : };
40 :
41 0 : struct stats_node {
42 : /// The gvt value
43 1 : simtime_t gvt;
44 : /// The size in bytes of the resident set
45 1 : uint64_t rss;
46 : };
47 :
48 0 : struct stats_glob {
49 : /// The number of threads in this node
50 1 : uint64_t threads_count;
51 : /// The maximum size in bytes of the resident set
52 1 : uint64_t max_rss;
53 : /// When stats global init has been called
54 1 : uint64_t glob_init_rt;
55 : /// The latest ts before having to aggregate stats
56 1 : uint64_t glob_fini_rt;
57 : /// The timestamps of the relevant simulation life-cycle events
58 1 : uint64_t timestamps[STATS_GLOBAL_COUNT];
59 : };
60 :
61 : static_assert(sizeof(struct stats_measure) == 24 &&
62 : sizeof(struct stats_thread) == 8 + 24 * STATS_COUNT &&
63 : sizeof(struct stats_node) == 16 &&
64 : sizeof(struct stats_glob) == 32 + 8 * (STATS_GLOBAL_COUNT),
65 : "structs aren't naturally packed, parsing may be difficult");
66 :
67 : /// The statistics names, used to fill in the header of the final csv
68 1 : const char * const s_names[] = {
69 : [STATS_ROLLBACK] = "rollbacks",
70 : [STATS_GVT] = "gvt",
71 : [STATS_MSG_SILENT] = "silent messages",
72 : [STATS_MSG_PROCESSED] = "processed messages"
73 : };
74 :
75 0 : static timer_uint sim_start_ts;
76 : static struct stats_glob stats_glob_cur;
77 :
78 0 : static FILE *stats_node_tmp;
79 0 : static FILE **stats_tmps;
80 0 : static __thread struct stats_thread stats_cur;
81 :
82 0 : static __thread timer_uint last_ts[STATS_COUNT];
83 :
84 0 : static void file_write_chunk(FILE *f, const void *data, size_t data_size)
85 : {
86 : if (unlikely(fwrite(data, data_size, 1, f) != 1))
87 : log_log(LOG_ERROR, "Error during disk write!");
88 : }
89 :
90 0 : static void *file_memory_load(FILE *f, int64_t *f_size_p)
91 : {
92 : fseek(f, 0, SEEK_END);
93 : long f_size = ftell(f); // Fails horribly for files bigger than 2 GB
94 : fseek(f, 0, SEEK_SET);
95 : void *ret = mm_alloc(f_size);
96 : if (fread(ret, f_size, 1, f) != 1) {
97 : mm_free(ret);
98 : *f_size_p = 0;
99 : return NULL;
100 : }
101 : *f_size_p = f_size;
102 : return ret;
103 : }
104 :
105 : /**
106 : * @brief A version of fopen() which accepts a printf style format string
107 : * @param open_type a string which controls how the file is opened (see fopen())
108 : * @param fmt the file name expressed as a printf style format string
109 : * @param ... the list of additional arguments used in @a fmt (see printf())
110 : */
111 1 : static FILE *file_open(const char *open_type, const char *fmt, ...)
112 : {
113 : va_list args, args_cp;
114 : va_start(args, fmt);
115 : va_copy(args_cp, args);
116 :
117 : size_t l = vsnprintf(NULL, 0, fmt, args_cp) + 1;
118 : va_end(args_cp);
119 :
120 : char *f_name = mm_alloc(l);
121 : vsnprintf(f_name, l, fmt, args);
122 : va_end(args);
123 :
124 : FILE *ret = fopen(f_name, open_type);
125 : if (ret == NULL)
126 : log_log(LOG_ERROR, "Unable to open \"%s\" in %s mode", f_name,
127 : open_type);
128 :
129 : mm_free(f_name);
130 : return ret;
131 : }
132 :
133 : /**
134 : * @brief Initializes the internal timer used to take accurate measurements
135 : */
136 1 : void stats_global_time_start(void)
137 : {
138 : sim_start_ts = timer_new();
139 : }
140 :
141 : /**
142 : * @brief Initializes the internal timer used to take accurate measurements
143 : */
144 1 : void stats_global_time_take(enum stats_global_time this_stat)
145 : {
146 : stats_glob_cur.timestamps[this_stat] = timer_value(sim_start_ts);
147 : }
148 :
149 : /**
150 : * @brief Initializes the stats subsystem in the node
151 : */
152 1 : void stats_global_init(void)
153 : {
154 : stats_glob_cur.glob_init_rt = timer_value(sim_start_ts);
155 : stats_glob_cur.threads_count = n_threads;
156 : if (mem_stat_setup() < 0)
157 : log_log(LOG_ERROR, "Unable to extract memory statistics!");
158 :
159 : stats_node_tmp = io_file_tmp_get();
160 : setvbuf(stats_node_tmp, NULL, _IOFBF,
161 : STATS_BUFFER_ENTRIES * sizeof(struct stats_node));
162 :
163 : stats_tmps = mm_alloc(n_threads * sizeof(*stats_tmps));
164 : }
165 :
166 : /**
167 : * @brief Initializes the stats subsystem in the current thread
168 : */
169 1 : void stats_init(void)
170 : {
171 : stats_tmps[rid] = io_file_tmp_get();
172 : setvbuf(stats_tmps[rid], NULL, _IOFBF,
173 : STATS_BUFFER_ENTRIES * sizeof(stats_cur));
174 : }
175 :
176 0 : static void stats_files_receive(FILE *o)
177 : {
178 : for (nid_t j = 1; j < n_nodes; ++j) {
179 : int buf_size;
180 : struct stats_glob *sg_p = mpi_blocking_data_rcv(&buf_size, j);
181 : file_write_chunk(o, sg_p, buf_size);
182 : uint64_t iters = sg_p->threads_count + 1; // +1 for node stats
183 : mm_free(sg_p);
184 :
185 : for (uint64_t i = 0; i < iters; ++i) {
186 : void *buf = mpi_blocking_data_rcv(&buf_size, j);
187 : int64_t f_size = buf_size;
188 : file_write_chunk(o, &f_size, sizeof(f_size));
189 : file_write_chunk(o, buf, buf_size);
190 : mm_free(buf);
191 : }
192 : }
193 : }
194 :
195 0 : static void stats_files_send(void)
196 : {
197 : stats_glob_cur.max_rss = mem_stat_rss_max_get();
198 : stats_glob_cur.glob_fini_rt = timer_value(sim_start_ts);
199 : mpi_blocking_data_send(&stats_glob_cur, sizeof(stats_glob_cur), 0);
200 :
201 : int64_t f_size;
202 : void *f_buf = file_memory_load(stats_node_tmp, &f_size);
203 : f_size = min(INT_MAX, f_size);
204 : mpi_blocking_data_send(f_buf, f_size, 0);
205 : mm_free(f_buf);
206 :
207 : for (rid_t i = 0; i < n_threads; ++i) {
208 : f_buf = file_memory_load(stats_tmps[i], &f_size);
209 : f_size = min(INT_MAX, f_size);
210 : mpi_blocking_data_send(f_buf, f_size, 0);
211 : mm_free(f_buf);
212 : }
213 : }
214 :
215 0 : static void stats_file_final_write(FILE *o)
216 : {
217 : uint16_t endian_check = 61455U; // 0xFOOF
218 : file_write_chunk(o, &endian_check, sizeof(endian_check));
219 :
220 : int64_t n = STATS_COUNT;
221 : file_write_chunk(o, &n, sizeof(n));
222 : for (int i = 0; i < STATS_COUNT; ++i) {
223 : size_t l = min(strlen(s_names[i]), STATS_MAX_STRLEN);
224 : file_write_chunk(o, s_names[i], l);
225 : unsigned char nul = 0;
226 : for (; l < STATS_MAX_STRLEN; ++l)
227 : file_write_chunk(o, &nul, 1);
228 : }
229 :
230 : n = n_nodes;
231 : file_write_chunk(o, &n, sizeof(n));
232 :
233 : stats_glob_cur.max_rss = mem_stat_rss_max_get();
234 : stats_glob_cur.glob_fini_rt = timer_value(sim_start_ts);
235 : file_write_chunk(o, &stats_glob_cur, sizeof(stats_glob_cur));
236 :
237 : int64_t buf_size;
238 : void *buf = file_memory_load(stats_node_tmp, &buf_size);
239 : file_write_chunk(o, &buf_size, sizeof(buf_size));
240 : file_write_chunk(o, buf, buf_size);
241 : mm_free(buf);
242 :
243 : for (rid_t i = 0; i < n_threads; ++i) {
244 : buf = file_memory_load(stats_tmps[i], &buf_size);
245 : file_write_chunk(o, &buf_size, sizeof(buf_size));
246 : file_write_chunk(o, buf, buf_size);
247 : mm_free(buf);
248 : }
249 : }
250 :
251 : /**
252 : * @brief Finalizes the stats subsystem in the node
253 : *
254 : * When finalizing this subsystem the master node formats and dumps his
255 : * statistics from his temporary files onto the final csv. Then, in a
256 : * distributed setting, he receives the slaves temporary files, formatting and
257 : * dumping their statistics as well.
258 : */
259 1 : void stats_global_fini(void)
260 : {
261 : mpi_node_barrier();
262 : if (nid) {
263 : stats_files_send();
264 : return;
265 : }
266 :
267 : FILE *o = file_open("w", "%s_stats.bin", arg_parse_program_name());
268 : if (o == NULL) {
269 : log_log(LOG_WARN, "Unavailable stats file: stats will be dumped on stdout");
270 : o = stdout;
271 : }
272 :
273 : stats_file_final_write(o);
274 : stats_files_receive(o);
275 :
276 : fflush(o);
277 : if (o != stdout)
278 : fclose(o);
279 :
280 : for (rid_t i = 0; i < n_threads; ++i)
281 : fclose(stats_tmps[i]);
282 :
283 : mm_free(stats_tmps);
284 : fclose(stats_node_tmp);
285 : }
286 :
287 0 : void stats_time_start(enum stats_time this_stat)
288 : {
289 : last_ts[this_stat] = timer_new();
290 : }
291 :
292 0 : void stats_time_take(enum stats_time this_stat)
293 : {
294 : struct stats_measure *s_mes = &stats_cur.s[this_stat];
295 : const uint64_t t = timer_value(last_ts[this_stat]);
296 :
297 : if (likely(s_mes->count)) {
298 : const int64_t num = (t * s_mes->count - s_mes->sum_t);
299 : s_mes->var_t += ((num * num) << (2 * STD_DEV_POWER_2_EXP)) /
300 : (s_mes->count * (s_mes->count + 1));
301 : }
302 :
303 : s_mes->sum_t += t;
304 : s_mes->count++;
305 : }
306 :
307 0 : void stats_on_gvt(simtime_t gvt)
308 : {
309 : stats_cur.rt = timer_value(sim_start_ts);
310 :
311 : file_write_chunk(stats_tmps[rid], &stats_cur, sizeof(stats_cur));
312 : memset(&stats_cur, 0, sizeof(stats_cur));
313 :
314 : if (rid != 0)
315 : return;
316 :
317 : struct stats_node stats_node_cur =
318 : {.gvt = gvt, .rss = mem_stat_rss_current_get()};
319 : file_write_chunk(stats_node_tmp, &stats_node_cur, sizeof(stats_node_cur));
320 : memset(&stats_node_cur, 0, sizeof(stats_node_cur));
321 :
322 : printf("\rVirtual time: %lf", gvt);
323 : fflush(stdout);
324 : }
325 :
326 0 : void stats_dump(void)
327 : {
328 : puts("");
329 : fflush(stdout);
330 : }
331 :
332 0 : const struct stats_measure *stats_time_query(enum stats_time this_stat)
333 : {
334 : return &stats_cur.s[this_stat];
335 : }
|