Skip to content

Commit d950184

Browse files
Fix empty committed snapshots: Restore behaviour of writing to temporary uncommitted file and renaming (#7029)
Co-authored-by: Amaury Chamayou <amaury@xargs.fr>
1 parent 0978efe commit d950184

4 files changed

Lines changed: 135 additions & 36 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
99

1010
[6.0.4]: https://github.com/microsoft/CCF/releases/tag/ccf-6.0.4
1111

12+
### Fixed
13+
14+
- CCF will no longer create in-progress snapshot files with a `.committed` suffix. Snapshot files are now renamed to `.committed` only once they are complete and ready for reading (#7029).
15+
1216
### Changed
1317

1418
- Templated URL parsing will no longer allow `:` within regex matched components, since `:` is already used to delimit actions. Concretely, a call to `GET .../state-digests/abcd:update` should now correctly return a 404, rather than dispatching to `GET .../state-digests/{memberId}` and returning `No ACK record exists for member m[abcd:update]`.

python/src/ccf/ledger.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ def is_ledger_chunk_committed(file_name):
8686
return file_name.endswith(COMMITTED_FILE_SUFFIX)
8787

8888

89+
def is_snapshot_file_committed(file_name):
90+
return file_name.endswith(COMMITTED_FILE_SUFFIX)
91+
92+
8993
def digest(data):
9094
return sha256(data).digest()
9195

@@ -817,14 +821,23 @@ def __init__(self, filename: str):
817821
self._filename = filename
818822
self._file_size = os.path.getsize(filename)
819823

824+
if self._file_size == 0:
825+
raise InvalidSnapshotException(f"{filename} is currently empty")
826+
820827
entry_start_pos = super()._read_header()
821828

822829
# 1.x snapshots do not include evidence
823830
if self.is_committed() and not self.is_snapshot_file_1_x():
824831
receipt_pos = entry_start_pos + self._header.size
825832
receipt_bytes = _peek_all(self._file, pos=receipt_pos)
826833

827-
receipt = json.loads(receipt_bytes.decode("utf-8"))
834+
try:
835+
receipt = json.loads(receipt_bytes.decode("utf-8"))
836+
except json.decoder.JSONDecodeError as e:
837+
raise InvalidSnapshotException(
838+
f"Cannot read receipt from snapshot {os.path.basename(self._filename)}: Receipt starts at {receipt_pos} (file is {self._file_size} bytes), and contains {receipt_bytes}"
839+
) from e
840+
828841
# Receipts included in snapshots always contain leaf components,
829842
# including a claims digest and commit evidence, from 2.0.0-rc0 onwards.
830843
# This verification code deliberately does not support snapshots
@@ -1178,3 +1191,7 @@ class UntrustedNodeException(Exception):
11781191

11791192
class UnknownTransaction(Exception):
11801193
"""The transaction at seqno does not exist in ledger"""
1194+
1195+
1196+
class InvalidSnapshotException(Exception):
1197+
"""The given snapshot file is invalid and cannot be parsed"""

src/snapshots/snapshot_manager.h

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -96,48 +96,77 @@ namespace snapshots
9696
{
9797
if (snapshot_idx == it->first)
9898
{
99-
// e.g. snapshot_100_105.committed
99+
// e.g. snapshot_100_105
100100
auto file_name = fmt::format(
101-
"{}{}{}{}{}{}",
101+
"{}{}{}{}{}",
102102
snapshot_file_prefix,
103103
snapshot_idx_delimiter,
104104
it->first,
105105
snapshot_idx_delimiter,
106-
it->second.evidence_idx,
107-
snapshot_committed_suffix);
106+
it->second.evidence_idx);
108107
auto full_snapshot_path = snapshot_dir / file_name;
109108

110-
if (fs::exists(full_snapshot_path))
109+
int snapshot_fd = open(
110+
full_snapshot_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0664);
111+
if (snapshot_fd == -1)
111112
{
112-
// In the case that a file with this name already exists, keep
113-
// existing file and drop pending snapshot
114-
LOG_FAIL_FMT(
115-
"Cannot write snapshot as file already exists: {}", file_name);
116-
}
117-
else
118-
{
119-
std::ofstream snapshot_file(
120-
full_snapshot_path, std::ios::app | std::ios::binary);
121-
if (!snapshot_file.good())
113+
if (errno == EEXIST)
122114
{
115+
// In the case that a file with this name already exists, keep
116+
// existing file and drop pending snapshot
123117
LOG_FAIL_FMT(
124-
"Cannot write snapshot: error opening file {}", file_name);
118+
"Cannot write snapshot as file already exists: {}",
119+
file_name);
125120
}
126121
else
127122
{
128-
const auto& snapshot = it->second.snapshot;
129-
snapshot_file.write(
130-
reinterpret_cast<const char*>(snapshot->data()),
131-
snapshot->size());
132-
snapshot_file.write(
133-
reinterpret_cast<const char*>(receipt_data), receipt_size);
134-
135-
LOG_INFO_FMT(
136-
"New snapshot file written to {} [{} bytes]",
137-
file_name,
138-
static_cast<size_t>(snapshot_file.tellp()));
123+
LOG_FAIL_FMT(
124+
"Cannot write snapshot: error ({}) opening file {}",
125+
errno,
126+
file_name);
139127
}
140128
}
129+
else
130+
{
131+
const auto& snapshot = it->second.snapshot;
132+
133+
#define THROW_ON_ERROR(x) \
134+
do \
135+
{ \
136+
auto rc = x; \
137+
if (rc == -1) \
138+
{ \
139+
throw std::runtime_error(fmt::format( \
140+
"Error ({}) writing snapshot {} in " #x, errno, file_name)); \
141+
} \
142+
} while (0)
143+
144+
THROW_ON_ERROR(
145+
write(snapshot_fd, snapshot->data(), snapshot->size()));
146+
THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size));
147+
148+
THROW_ON_ERROR(fsync(snapshot_fd));
149+
THROW_ON_ERROR(close(snapshot_fd));
150+
151+
#undef THROW_ON_ERROR
152+
153+
LOG_INFO_FMT(
154+
"New snapshot file written to {} [{} bytes]",
155+
file_name,
156+
snapshot->size() + receipt_size);
157+
158+
// e.g. snapshot_100_105.committed
159+
const auto committed_file_name =
160+
fmt::format("{}{}", file_name, snapshot_committed_suffix);
161+
const auto full_committed_path =
162+
snapshot_dir / committed_file_name;
163+
164+
files::rename(full_snapshot_path, full_committed_path);
165+
LOG_INFO_FMT(
166+
"Renamed temporary snapshot {} to committed {}",
167+
file_name,
168+
committed_file_name);
169+
}
141170

142171
pending_snapshots.erase(it);
143172

tests/e2e_operations.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from pycose.messages import Sign1Message
3131
import sys
3232
import pathlib
33+
import infra.concurrency
3334

3435
from loguru import logger as LOG
3536

@@ -57,14 +58,62 @@ def test_save_committed_ledger_files(network, args):
5758

5859

5960
def test_parse_snapshot_file(network, args):
60-
primary, _ = network.find_primary()
61-
network.txs.issue(network, number_txs=args.snapshot_tx_interval * 2)
62-
committed_snapshots_dir = network.get_committed_snapshots(primary)
63-
for snapshot in os.listdir(committed_snapshots_dir):
64-
with ccf.ledger.Snapshot(os.path.join(committed_snapshots_dir, snapshot)) as s:
65-
assert len(
66-
s.get_public_domain().get_tables()
67-
), "No public table in snapshot"
61+
class ReaderThread(infra.concurrency.StoppableThread):
62+
def __init__(self, network):
63+
super().__init__(name="reader")
64+
primary, _ = network.find_primary()
65+
self.snapshots_dir = os.path.join(
66+
primary.remote.remote.root,
67+
primary.remote.snapshots_dir_name,
68+
)
69+
70+
def run(self):
71+
seen = set()
72+
while not self.is_stopped():
73+
for snapshot in os.listdir(self.snapshots_dir):
74+
if (
75+
ccf.ledger.is_snapshot_file_committed(snapshot)
76+
and snapshot not in seen
77+
):
78+
seen.add(snapshot)
79+
with ccf.ledger.Snapshot(
80+
os.path.join(self.snapshots_dir, snapshot)
81+
) as s:
82+
assert len(
83+
s.get_public_domain().get_tables()
84+
), "No public table in snapshot"
85+
LOG.success(f"Successfully parsed snapshot: {snapshot}")
86+
LOG.info(f"Tested {len(seen)} snapshots")
87+
assert len(seen) > 0, "No snapshots seen, so this tested nothing"
88+
89+
class WriterThread(infra.concurrency.StoppableThread):
90+
def __init__(self, network, reader):
91+
super().__init__(name="writer")
92+
self.primary, _ = network.find_primary()
93+
self.member = network.consortium.get_any_active_member()
94+
self.reader = reader
95+
96+
def run(self):
97+
while not self.is_stopped() and self.reader.is_alive():
98+
self.member.update_ack_state_digest(self.primary)
99+
100+
reader_thread = ReaderThread(network)
101+
reader_thread.start()
102+
103+
writer_thread = WriterThread(network, reader_thread)
104+
writer_thread.start()
105+
106+
# When this test was added, the original failure was occurring 100% of the time within 0.5s.
107+
# This fix has been manually verified across multi-minute runs.
108+
# 5s is a plausible run-time in CI that should still provide convincing coverage.
109+
time.sleep(5)
110+
111+
writer_thread.stop()
112+
writer_thread.join()
113+
114+
reader_thread.stop()
115+
reader_thread.join()
116+
68117
return network
69118

70119

0 commit comments

Comments
 (0)