@@ -239,6 +239,62 @@ namespace asynchost
239239 return snapshot;
240240 }
241241
242+ #define THROW_ON_ERROR (x, name ) \
243+ do \
244+ { \
245+ auto rc = x; \
246+ if (rc == -1 ) \
247+ { \
248+ throw std::runtime_error (fmt::format ( \
249+ " Error ({}) writing snapshot {} in " #x, strerror (errno), name)); \
250+ } \
251+ } while (0 )
252+
253+ struct AsyncSnapshotSyncAndRename
254+ {
255+ // Inputs, populated at construction
256+ const std::filesystem::path dir;
257+ const std::string tmp_file_name;
258+ const int snapshot_fd;
259+
260+ // Outputs, populated by callback
261+ std::string committed_file_name = {};
262+ };
263+
264+ static void on_snapshot_sync_and_rename (uv_work_t * req)
265+ {
266+ auto data = static_cast <AsyncSnapshotSyncAndRename*>(req->data );
267+
268+ {
269+ asynchost::TimeBoundLogger log_if_slow (
270+ fmt::format (" Committing snapshot - fsync({})" , data->tmp_file_name ));
271+ fsync (data->snapshot_fd );
272+ }
273+
274+ close (data->snapshot_fd );
275+
276+ // e.g. snapshot_100_105.committed
277+ data->committed_file_name =
278+ fmt::format (" {}{}" , data->tmp_file_name , snapshot_committed_suffix);
279+ const auto full_committed_path = data->dir / data->committed_file_name ;
280+
281+ const auto full_tmp_path = data->dir / data->tmp_file_name ;
282+ files::rename (full_tmp_path, full_committed_path);
283+ }
284+
285+ static void on_snapshot_sync_and_rename_complete (uv_work_t * req, int status)
286+ {
287+ auto data = static_cast <AsyncSnapshotSyncAndRename*>(req->data );
288+
289+ LOG_INFO_FMT (
290+ " Renamed temporary snapshot {} to {}" ,
291+ data->tmp_file_name ,
292+ data->committed_file_name );
293+
294+ delete data;
295+ delete req;
296+ }
297+
242298 void commit_snapshot (
243299 ::consensus::Index snapshot_idx,
244300 const uint8_t * receipt_data,
@@ -288,42 +344,40 @@ namespace asynchost
288344 {
289345 const auto & snapshot = it->second .snapshot ;
290346
291- #define THROW_ON_ERROR (x ) \
292- do \
293- { \
294- auto rc = x; \
295- if (rc == -1 ) \
296- { \
297- throw std::runtime_error (fmt::format ( \
298- " Error ({}) writing snapshot {} in " #x, errno, file_name)); \
299- } \
300- } while (0 )
301-
302347 THROW_ON_ERROR (
303- write (snapshot_fd, snapshot->data (), snapshot->size ()));
304- THROW_ON_ERROR (write (snapshot_fd, receipt_data, receipt_size));
305-
306- THROW_ON_ERROR (fsync (snapshot_fd));
307- THROW_ON_ERROR (close (snapshot_fd));
308-
309- #undef THROW_ON_ERROR
348+ write (snapshot_fd, snapshot->data (), snapshot->size ()),
349+ file_name);
350+ THROW_ON_ERROR (
351+ write (snapshot_fd, receipt_data, receipt_size), file_name);
310352
311353 LOG_INFO_FMT (
312- " New snapshot file written to {} [{} bytes]" ,
354+ " New snapshot file written to {} [{} bytes] (unsynced) " ,
313355 file_name,
314356 snapshot->size () + receipt_size);
315357
316- // e.g. snapshot_100_105.committed
317- const auto committed_file_name =
318- fmt::format (" {}{}" , file_name, snapshot_committed_suffix);
319- const auto full_committed_path =
320- snapshot_dir / committed_file_name;
358+ // Call fsync and rename on a worker-thread via uv async, as they
359+ // may be slow
360+ uv_work_t * work_handle = new uv_work_t ;
321361
322- files::rename (full_snapshot_path, full_committed_path);
323- LOG_INFO_FMT (
324- " Renamed temporary snapshot {} to committed {}" ,
325- file_name,
326- committed_file_name);
362+ {
363+ auto * data = new AsyncSnapshotSyncAndRename{
364+ .dir = snapshot_dir,
365+ .tmp_file_name = file_name,
366+ .snapshot_fd = snapshot_fd};
367+
368+ work_handle->data = data;
369+ }
370+
371+ #ifdef TEST_MODE_EXECUTE_SYNC_INLINE
372+ on_snapshot_sync_and_rename (work_handle);
373+ on_snapshot_sync_and_rename_complete (work_handle, 0 );
374+ #else
375+ uv_queue_work (
376+ uv_default_loop (),
377+ work_handle,
378+ &on_snapshot_sync_and_rename,
379+ &on_snapshot_sync_and_rename_complete);
380+ #endif
327381 }
328382
329383 pending_snapshots.erase (it);
@@ -342,6 +396,7 @@ namespace asynchost
342396 e.what ());
343397 }
344398 }
399+ #undef THROW_ON_ERROR
345400
346401 std::optional<std::pair<fs::path, fs::path>>
347402 find_latest_committed_snapshot ()
0 commit comments