Skip to content

Commit 2d4e0e9

Browse files
subkanthi and xieandrew authored
feat: Add externalized commit retries and commit lock using etcd distributed locks (#162)
Co-authored-by: Andrew Xie <dev@xie.is>
1 parent f112dd9 commit 2d4e0e9

26 files changed

Lines changed: 1149 additions & 29 deletions

File tree

examples/grafana/METRICS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ These metrics are reported by Iceberg clients when they perform operations on ta
5858
| `iceberg_commit_added_equality_deletes_total` | Counter | catalog, namespace, table, operation | Total number of equality deletes added in commits |
5959
| `iceberg_commit_total_files_size_bytes` | Counter | catalog, namespace, table, operation | Total size in bytes of files involved in commits |
6060
| `iceberg_commit_duration_seconds` | Histogram | catalog, namespace, table, operation | Duration of commit operations in seconds |
61+
| `iceberg_commit_retries_total` | Counter | catalog, namespace, table | Server-side retries after a commit CAS conflict (`CommitFailedException`) in the REST catalog commit loop; tune `commitRetry` in `.ice-rest-catalog.yaml` if this grows under parallel writers |
62+
| `iceberg_commit_lock_acquire_seconds` | Histogram | catalog | Time to acquire the etcd per-table commit lock (`commitLock` in `.ice-rest-catalog.yaml`; etcd backend only) |
63+
| `iceberg_commit_lock_held_seconds` | Histogram | catalog | Time the etcd commit lock was held during a table commit |
64+
| `iceberg_commit_lock_acquire_timeouts_total` | Counter | catalog | Acquire attempts that exceeded `commitLock.acquireTimeoutMs` (HTTP 503 to clients) |
6165

6266
#### Reporter Metrics
6367

examples/scratch/.ice-rest-catalog.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ bearerTokens:
1717

1818
anonymousAccess:
1919
enabled: true
20-
accessConfig: {}
20+
accessConfig: {}

ice-rest-catalog/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,19 @@ That's it.
1111

1212
Examples of `.ice-rest-catalog.yaml` (as well as Kubernetes deployment manifests) can be found [here](../examples/).
1313

14+
## Parallel writers (`commitLock`)
15+
16+
Many concurrent commits to the **same table** can cause repeated `CommitFailedException` (optimistic concurrency). For the **etcd** metastore you can serialize commits per table using etcd’s lock API:
17+
18+
```yaml
19+
commitLock:
20+
enabled: true
21+
leaseTtlSeconds: 30
22+
acquireTimeoutMs: 30000
23+
```
24+
25+
If `enabled` is true but the catalog backend is not etcd, the lock is ignored and a warning is written to the logs. When acquiring the lock takes longer than `acquireTimeoutMs`, the server responds with HTTP **503** so that clients can retry.
26+
1427
## Documentation
1528

1629
- [Architecture](../docs/architecture.md) -- components, design principles, HA, backup/recovery

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.altinity.ice.rest.catalog.internal.aws.CredentialsProvider;
2020
import com.altinity.ice.rest.catalog.internal.config.Config;
2121
import com.altinity.ice.rest.catalog.internal.config.MaintenanceConfig;
22+
import com.altinity.ice.rest.catalog.internal.etcd.CommitLock;
2223
import com.altinity.ice.rest.catalog.internal.etcd.EtcdCatalog;
2324
import com.altinity.ice.rest.catalog.internal.maintenance.DataCompaction;
2425
import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceJob;
@@ -279,7 +280,8 @@ private static Server createBaseServer(
279280
if (requireAuth) {
280281
mux.insertHandler(createAuthorizationHandler(config.bearerTokens(), config));
281282

282-
restCatalogAdapter = new RESTCatalogAdapter(catalog);
283+
restCatalogAdapter =
284+
new RESTCatalogAdapter(catalog, config.commitRetry(), maybeCommitLock(catalog, config));
283285
var globalConfig = config.toIcebergConfigDefaults();
284286
if (!globalConfig.isEmpty()) {
285287
restCatalogAdapter = new RESTCatalogMiddlewareConfig(restCatalogAdapter, globalConfig);
@@ -299,7 +301,8 @@ private static Server createBaseServer(
299301
new RESTCatalogMiddlewareCredentials(restCatalogAdapter, auth), auth);
300302
}
301303
} else {
302-
restCatalogAdapter = new RESTCatalogAdapter(catalog);
304+
restCatalogAdapter =
305+
new RESTCatalogAdapter(catalog, config.commitRetry(), maybeCommitLock(catalog, config));
303306
var globalConfig = config.toIcebergConfigDefaults();
304307
if (!globalConfig.isEmpty()) {
305308
restCatalogAdapter = new RESTCatalogMiddlewareConfig(restCatalogAdapter, globalConfig);
@@ -319,6 +322,18 @@ private static Server createBaseServer(
319322
}
320323
}
321324

325+
logger.info(
326+
"Commit retry config: numRetries={} minWaitMs={} maxWaitMs={} totalTimeoutMs={}",
327+
config.commitRetry().numRetries(),
328+
config.commitRetry().minWaitMs(),
329+
config.commitRetry().maxWaitMs(),
330+
config.commitRetry().totalTimeoutMs());
331+
logger.info(
332+
"Commit lock config: enabled={} leaseTtlSeconds={} acquireTimeoutMs={}",
333+
config.commitLock().enabled(),
334+
config.commitLock().leaseTtlSeconds(),
335+
config.commitLock().acquireTimeoutMs());
336+
322337
var h = new ServletHolder(new RESTCatalogServlet(restCatalogAdapter));
323338
mux.addServlet(h, "/*");
324339

@@ -395,6 +410,22 @@ private static RESTCatalogAuthorizationHandler createAuthorizationHandler(
395410
return new RESTCatalogAuthorizationHandler(tokens, anonymousSession);
396411
}
397412

413+
/**
414+
* Per-table etcd commit lock for {@link EtcdCatalog}; ignored when disabled or when not using
415+
* etcd.
416+
*/
417+
static CommitLock maybeCommitLock(Catalog catalog, Config config) {
418+
if (!config.commitLock().enabled()) {
419+
return null;
420+
}
421+
if (!(catalog instanceof EtcdCatalog etcd)) {
422+
logger.warn(
423+
"commitLock.enabled is true but catalog is not EtcdCatalog; commit lock disabled");
424+
return null;
425+
}
426+
return new CommitLock(etcd.etcdClient(), catalog.name(), config.commitLock());
427+
}
428+
398429
private static void overrideJettyDefaults(Server s) {
399430
ServerConfig.setQuiet(s);
400431
s.setErrorHandler(new PlainErrorHandler());
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.altinity.ice.rest.catalog.internal.config;
11+
12+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
13+
14+
/**
15+
* Optional per-table etcd commit lock for {@code ice-rest-catalog} when using the etcd metastore.
16+
*
17+
* <p>Serializes commits to the same table so concurrent writers do not lose optimistic concurrency
18+
* races indefinitely.
19+
*/
20+
public record CommitLockConfig(
21+
@JsonPropertyDescription(
22+
"Enable etcd mutual-exclusion lock around table commits (etcd backend only; default false)")
23+
boolean enabled,
24+
@JsonPropertyDescription(
25+
"Lease TTL for the lock in seconds (must exceed slow commits; default 30)")
26+
long leaseTtlSeconds,
27+
@JsonPropertyDescription("Max time to wait to acquire the lock in milliseconds (default 30000)")
28+
long acquireTimeoutMs) {
29+
30+
public CommitLockConfig {
31+
if (leaseTtlSeconds <= 0) {
32+
leaseTtlSeconds = 30;
33+
}
34+
if (acquireTimeoutMs <= 0) {
35+
acquireTimeoutMs = 30_000L;
36+
}
37+
}
38+
39+
public static CommitLockConfig defaults() {
40+
return new CommitLockConfig(false, 30, 30_000L);
41+
}
42+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.altinity.ice.rest.catalog.internal.config;
11+
12+
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
13+
import org.apache.iceberg.TableProperties;
14+
15+
/**
16+
* Server-side tuning for the REST catalog commit retry loop (OCC compare-and-swap failures).
17+
*
18+
* <p>Defaults match Iceberg's {@link TableProperties} commit retry defaults.
19+
*/
20+
public record CommitRetryConfig(
21+
@JsonPropertyDescription(
22+
"Number of retries on CommitFailedException (default: Iceberg commit.retry.num-retries = 4)")
23+
int numRetries,
24+
@JsonPropertyDescription(
25+
"Minimum backoff between retries in ms (default: Iceberg commit.retry.min-wait-ms)")
26+
long minWaitMs,
27+
@JsonPropertyDescription(
28+
"Maximum backoff between retries in ms (default: Iceberg commit.retry.max-wait-ms)")
29+
long maxWaitMs,
30+
@JsonPropertyDescription(
31+
"Total time budget for the retry loop in ms (default: Iceberg commit.retry.total-timeout-ms)")
32+
long totalTimeoutMs) {
33+
34+
public CommitRetryConfig {
35+
if (numRetries <= 0) {
36+
numRetries = TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
37+
}
38+
if (minWaitMs <= 0) {
39+
minWaitMs = TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
40+
}
41+
if (maxWaitMs <= 0) {
42+
maxWaitMs = TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
43+
}
44+
if (totalTimeoutMs <= 0) {
45+
totalTimeoutMs = TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
46+
}
47+
}
48+
49+
public static CommitRetryConfig defaults() {
50+
return new CommitRetryConfig(
51+
TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
52+
TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
53+
TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
54+
TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT);
55+
}
56+
}

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ public record Config(
5656
"Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". Empty schedule disables automatic maintenance (default)")
5757
String maintenanceSchedule,
5858
@JsonPropertyDescription("Maintenance config") MaintenanceConfig maintenance,
59+
@JsonPropertyDescription(
60+
"Server-side commit retry config; tune up for high-contention workloads (e.g., parallel `ice insert` to one table)")
61+
CommitRetryConfig commitRetry,
62+
@JsonPropertyDescription(
63+
"Optional etcd per-table commit lock (etcd metastore only). Reduces CommitFailedException under concurrent writers.")
64+
CommitLockConfig commitLock,
5965
@JsonPropertyDescription(
6066
"(experimental) Extra properties to include in loadTable REST response.")
6167
Map<String, String> loadTableProperties,
@@ -81,6 +87,8 @@ public Config(
8187
AnonymousAccess anonymousAccess,
8288
String maintenanceSchedule,
8389
MaintenanceConfig maintenance,
90+
CommitRetryConfig commitRetry,
91+
CommitLockConfig commitLock,
8492
Map<String, String> loadTableProperties,
8593
@JsonProperty("iceberg") Map<String, String> icebergProperties) {
8694
this.addr = Strings.orDefault(addr, DEFAULT_ADDR);
@@ -98,6 +106,8 @@ public Config(
98106
this.maintenance =
99107
Objects.requireNonNullElseGet(
100108
maintenance, () -> new MaintenanceConfig(null, 0, 0, 0, 0, 0, 0, null, false));
109+
this.commitRetry = Objects.requireNonNullElse(commitRetry, CommitRetryConfig.defaults());
110+
this.commitLock = Objects.requireNonNullElse(commitLock, CommitLockConfig.defaults());
101111
this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of());
102112
this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of());
103113
}

0 commit comments

Comments
 (0)