Skip to content

Commit 204d30f

Browse files
authored
feat: Add logic to export and import etcd catalog keys as JSON (#174)
1 parent 84a7efc commit 204d30f

16 files changed

Lines changed: 816 additions & 5 deletions

File tree

docs/catalog-import-export.md

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Catalog import / export
2+
3+
Export and import the **catalog registry** (namespace and table metadata pointers in etcd). Table data stays in object storage (S3/MinIO/GCS); only registry keys are moved.
4+
5+
**Requires:** etcd-backed `ice-rest-catalog`. Configure the CLI with `.ice.yaml` (`uri`, optional `bearerToken`).
6+
7+
## Export
8+
9+
```bash
10+
ice catalog-export -o catalog-snapshot.json
11+
```
12+
13+
| Option | Description |
14+
|--------|-------------|
15+
| `-o`, `--output` | Output file (`-` or omit for stdout) |
16+
| `--namespace` | Export one namespace and its tables only (e.g. `flowers`) |
17+
18+
Example (single namespace to stdout):
19+
20+
```bash
21+
ice catalog-export --namespace flowers
22+
```
23+
24+
Snapshot JSON fields: `version`, `catalog_name`, `exported_at`, `namespaces[]`, `tables[]` (each entry has `key` and `value`).
25+
26+
## Import
27+
28+
```bash
29+
ice catalog-import -i catalog-snapshot.json
30+
```
31+
32+
| Option | Description |
33+
|--------|-------------|
34+
| `-i`, `--input` | Input file (`-` or omit for stdin) |
35+
| `--dry-run` | Preview only; no writes |
36+
| `--overwrite` | Replace existing keys (default: skip existing) |
37+
38+
Preview changes:
39+
40+
```bash
41+
ice catalog-import -i catalog-snapshot.json --dry-run
42+
```
43+
44+
Import result JSON: `created`, `skipped`, `overwritten`, `catalog_name`, `exported_at`.
45+
46+
Pipe export → import:
47+
48+
```bash
49+
ice catalog-export | ice catalog-import -
50+
```
51+
52+
## REST API
53+
54+
Same operations on the main catalog server (`addr` in `.ice-rest-catalog.yaml`). Not exposed on optional `adminAddr`.
55+
56+
| Method | Path | Description |
57+
|--------|------|-------------|
58+
| `GET` | `/admin/v1/catalog-export` | Export; optional query `namespace` |
59+
| `POST` | `/admin/v1/catalog-import` | Import; body = snapshot JSON; query `dry-run`, `overwrite` |
60+
61+
Example:
62+
63+
```bash
64+
curl -s http://localhost:8181/admin/v1/catalog-export | jq .
65+
curl -s -X POST "http://localhost:8181/admin/v1/catalog-import?dry-run=true" \
66+
-H "Content-Type: application/json" \
67+
-d @catalog-snapshot.json | jq .
68+
```
69+
70+
## Typical uses
71+
72+
- Move catalog metadata between environments (dev → staging)
73+
- Registry backup lighter than a full etcd snapshot
74+
- Clone a namespace into another catalog instance
75+
76+
For full etcd cluster backup/restore, see [etcd backup & restore (3-node)](etcd-backup-restore-upgrade-3-node.md).

ice-rest-catalog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,4 @@ If `enabled` is true but the catalog backend is not etcd, the lock is ignored (w
3232
- [GCS Setup](../docs/ice-rest-catalog-gcs.md) -- configuring ice-rest-catalog with Google Cloud Storage
3333
- [etcd Backend Schema](../docs/etcd-backend-schema.md) -- etcd key/value schema (`n/`, `t/` prefixes) and mapping to SQLite
3434
- [SQLite Backend Schema](../docs/sqlite-backend-schema.md) -- SQLite tables (`iceberg_tables`, `iceberg_namespace_properties`)
35+
- [Catalog Import/Export](../docs/catalog-import-export.md) -- export and import catalog registry (namespaces and table metadata pointers) via CLI or REST API

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.altinity.ice.rest.catalog.internal.maintenance.SnapshotCleanup;
3131
import com.altinity.ice.rest.catalog.internal.metrics.CatalogMetrics;
3232
import com.altinity.ice.rest.catalog.internal.metrics.PrometheusMetricsReporter;
33+
import com.altinity.ice.rest.catalog.internal.rest.CatalogAdminServlet;
3334
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter;
3435
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler;
3536
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler;
@@ -237,7 +238,7 @@ static Server createServer(
237238
Config config,
238239
Map<String, String> icebergConfig,
239240
PrometheusMetricsReporter metricsReporter) {
240-
var s = createBaseServer(catalog, config, icebergConfig, true, metricsReporter);
241+
var s = createBaseServer(catalog, config, icebergConfig, true, true, metricsReporter);
241242
ServerConnector connector = new ServerConnector(s);
242243
connector.setHost(host);
243244
connector.setPort(port);
@@ -252,7 +253,7 @@ private static Server createAdminServer(
252253
Config config,
253254
Map<String, String> icebergConfig,
254255
PrometheusMetricsReporter metricsReporter) {
255-
var s = createBaseServer(catalog, config, icebergConfig, false, metricsReporter);
256+
var s = createBaseServer(catalog, config, icebergConfig, false, false, metricsReporter);
256257
ServerConnector connector = new ServerConnector(s);
257258
connector.setHost(host);
258259
connector.setPort(port);
@@ -265,6 +266,7 @@ private static Server createBaseServer(
265266
Config config,
266267
Map<String, String> icebergConfig,
267268
boolean requireAuth,
269+
boolean registerAdminServlet,
268270
PrometheusMetricsReporter metricsReporter) {
269271
var mux = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
270272
mux.insertHandler(new GzipHandler());
@@ -337,6 +339,11 @@ private static Server createBaseServer(
337339
var h = new ServletHolder(new RESTCatalogServlet(restCatalogAdapter));
338340
mux.addServlet(h, "/*");
339341

342+
if (registerAdminServlet) {
343+
var adminServlet = new ServletHolder(new CatalogAdminServlet(catalog, config.name()));
344+
mux.addServlet(adminServlet, "/admin/*");
345+
}
346+
340347
var s = new Server();
341348
overrideJettyDefaults(s);
342349
s.setHandler(mux);
@@ -496,7 +503,7 @@ public Integer call() throws Exception {
496503
icebergConfig,
497504
metricsReporter);
498505
adminServer.start();
499-
logger.warn("Serving admin endpoint at http://{}/v1/{config,*}", adminHostAndPort);
506+
logger.warn("Serving admin endpoint at http://{}/v1/{{config,*}}", adminHostAndPort);
500507
}
501508

502509
HostAndPort hostAndPort = HostAndPort.fromString(config.addr());
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.altinity.ice.rest.catalog.internal.cmd;
11+
12+
import com.altinity.ice.internal.strings.Strings;
13+
import com.altinity.ice.rest.catalog.internal.etcd.EtcdCatalog;
14+
import com.altinity.ice.rest.catalog.internal.rest.RESTObjectMapper;
15+
import com.fasterxml.jackson.core.JsonProcessingException;
16+
import com.fasterxml.jackson.core.type.TypeReference;
17+
import java.time.Instant;
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Map;
21+
import org.apache.iceberg.catalog.Catalog;
22+
import org.apache.iceberg.exceptions.RuntimeIOException;
23+
24+
/** Catalog registry export/import for etcd-backed catalogs. */
25+
public final class CatalogAdminService {
26+
27+
private CatalogAdminService() {}
28+
29+
public static CatalogSnapshot export(
30+
Catalog catalog, String catalogName, String namespaceFilter) {
31+
EtcdCatalog etcdCatalog = requireEtcdCatalog(catalog);
32+
List<CatalogSnapshot.NamespaceEntry> namespaces = new ArrayList<>();
33+
for (EtcdCatalog.CatalogKv kv : etcdCatalog.listAllNamespaceKvs()) {
34+
if (!matchesNamespaceFilter(kv.key(), namespaceFilter, etcdCatalog)) {
35+
continue;
36+
}
37+
namespaces.add(new CatalogSnapshot.NamespaceEntry(kv.key(), parseJsonMap(kv.value())));
38+
}
39+
40+
List<CatalogSnapshot.TableEntry> tables = new ArrayList<>();
41+
for (EtcdCatalog.CatalogKv kv : etcdCatalog.listAllTableKvs(namespaceFilter)) {
42+
tables.add(new CatalogSnapshot.TableEntry(kv.key(), parseJsonMap(kv.value())));
43+
}
44+
45+
return new CatalogSnapshot(
46+
CatalogSnapshot.CURRENT_VERSION, catalogName, Instant.now().toString(), namespaces, tables);
47+
}
48+
49+
public static CatalogImportResult importSnapshot(
50+
Catalog catalog, CatalogSnapshot snapshot, boolean dryRun, boolean overwrite) {
51+
EtcdCatalog etcdCatalog = requireEtcdCatalog(catalog);
52+
53+
if (snapshot.version() != CatalogSnapshot.CURRENT_VERSION) {
54+
throw new IllegalArgumentException(
55+
"Unsupported snapshot version: "
56+
+ snapshot.version()
57+
+ " (expected "
58+
+ CatalogSnapshot.CURRENT_VERSION
59+
+ ")");
60+
}
61+
62+
int created = 0;
63+
int skipped = 0;
64+
int overwritten = 0;
65+
66+
if (snapshot.namespaces() != null) {
67+
for (CatalogSnapshot.NamespaceEntry entry : snapshot.namespaces()) {
68+
EtcdCatalog.PutCatalogKvResult result =
69+
etcdCatalog.putCatalogKv(entry.key(), marshal(entry.value()), overwrite, dryRun);
70+
switch (result) {
71+
case CREATED -> created++;
72+
case SKIPPED -> skipped++;
73+
case OVERWRITTEN -> overwritten++;
74+
}
75+
}
76+
}
77+
if (snapshot.tables() != null) {
78+
for (CatalogSnapshot.TableEntry entry : snapshot.tables()) {
79+
EtcdCatalog.PutCatalogKvResult result =
80+
etcdCatalog.putCatalogKv(entry.key(), marshal(entry.value()), overwrite, dryRun);
81+
switch (result) {
82+
case CREATED -> created++;
83+
case SKIPPED -> skipped++;
84+
case OVERWRITTEN -> overwritten++;
85+
}
86+
}
87+
}
88+
89+
return new CatalogImportResult(
90+
created, skipped, overwritten, snapshot.catalogName(), snapshot.exportedAt());
91+
}
92+
93+
private static EtcdCatalog requireEtcdCatalog(Catalog catalog) {
94+
if (!(catalog instanceof EtcdCatalog etcdCatalog)) {
95+
throw new IllegalArgumentException(
96+
"Catalog export/import requires an etcd-backed catalog (uri: etcd:... in config)");
97+
}
98+
return etcdCatalog;
99+
}
100+
101+
private static boolean matchesNamespaceFilter(
102+
String namespaceKey, String namespaceFilter, EtcdCatalog catalog) {
103+
if (Strings.isNullOrEmpty(namespaceFilter)) {
104+
return true;
105+
}
106+
String prefix = catalogPrefixForFilter(catalog) + "n/";
107+
if (!namespaceKey.startsWith(prefix)) {
108+
return false;
109+
}
110+
String path = namespaceKey.substring(prefix.length());
111+
return path.equals(namespaceFilter) || path.startsWith(namespaceFilter + "/");
112+
}
113+
114+
private static String catalogPrefixForFilter(EtcdCatalog catalog) {
115+
if ("default".equals(catalog.name())) {
116+
return "";
117+
}
118+
return catalog.name() + "/";
119+
}
120+
121+
private static Map<String, String> parseJsonMap(String json) {
122+
try {
123+
return RESTObjectMapper.mapper().readValue(json, new TypeReference<>() {});
124+
} catch (JsonProcessingException e) {
125+
throw new RuntimeIOException(e);
126+
}
127+
}
128+
129+
private static String marshal(Map<String, String> value) {
130+
try {
131+
return RESTObjectMapper.mapper().writeValueAsString(value);
132+
} catch (JsonProcessingException e) {
133+
throw new RuntimeIOException(e);
134+
}
135+
}
136+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.altinity.ice.rest.catalog.internal.cmd;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
14+
/** Summary returned by catalog import. */
15+
public record CatalogImportResult(
16+
int created,
17+
int skipped,
18+
int overwritten,
19+
@JsonProperty("catalog_name") String catalogName,
20+
@JsonProperty("exported_at") String exportedAt) {}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.altinity.ice.rest.catalog.internal.cmd;
11+
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import java.util.List;
14+
import java.util.Map;
15+
16+
/** Portable JSON snapshot of etcd catalog registry keys ({@code n/} and {@code t/}). */
17+
public record CatalogSnapshot(
18+
int version,
19+
@JsonProperty("catalog_name") String catalogName,
20+
@JsonProperty("exported_at") String exportedAt,
21+
List<NamespaceEntry> namespaces,
22+
List<TableEntry> tables) {
23+
24+
public static final int CURRENT_VERSION = 1;
25+
26+
public record NamespaceEntry(String key, Map<String, String> value) {}
27+
28+
public record TableEntry(String key, Map<String, String> value) {}
29+
}

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.IOException;
3030
import java.nio.charset.StandardCharsets;
3131
import java.util.Arrays;
32+
import java.util.Comparator;
3233
import java.util.List;
3334
import java.util.Locale;
3435
import java.util.Map;
@@ -113,6 +114,64 @@ public Client etcdClient() {
113114
return client;
114115
}
115116

117+
/** UTF-8 etcd key and JSON value for catalog export/import. */
118+
public record CatalogKv(String key, String value) {}
119+
120+
/** All namespace entries under this catalog's {@code n/} prefix. */
121+
public List<CatalogKv> listAllNamespaceKvs() {
122+
return prefixScan(namespacePrefix());
123+
}
124+
125+
/**
126+
* All table entries under this catalog's {@code t/} prefix. When {@code namespacePath} is set,
127+
* only tables in that namespace (e.g. {@code flowers} or {@code parent/child}) are included.
128+
*/
129+
public List<CatalogKv> listAllTableKvs(String namespacePath) {
130+
String prefix = tablePrefix();
131+
if (namespacePath != null && !namespacePath.isBlank()) {
132+
prefix = prefix + namespacePath + "/";
133+
}
134+
return prefixScan(prefix);
135+
}
136+
137+
private List<CatalogKv> prefixScan(String prefix) {
138+
GetResponse res = unwrap(kv.get(byteSeq(prefix), GetOption.builder().isPrefix(true).build()));
139+
return res.getKvs().stream()
140+
.map(
141+
entry ->
142+
new CatalogKv(
143+
entry.getKey().toString(StandardCharsets.UTF_8),
144+
entry.getValue().toString(StandardCharsets.UTF_8)))
145+
.sorted(Comparator.comparing(CatalogKv::key))
146+
.toList();
147+
}
148+
149+
public enum PutCatalogKvResult {
150+
CREATED,
151+
SKIPPED,
152+
OVERWRITTEN
153+
}
154+
155+
/**
156+
* Writes a catalog key. When {@code overwrite} is false and the key exists, returns {@link
157+
* PutCatalogKvResult#SKIPPED} without writing. When {@code dryRun} is true, no write is
158+
* performed.
159+
*/
160+
public PutCatalogKvResult putCatalogKv(
161+
String key, String jsonValue, boolean overwrite, boolean dryRun) {
162+
ByteSequence k = byteSeq(key);
163+
GetResponse existing = unwrap(kv.get(k, GetOption.builder().withCountOnly(true).build()));
164+
boolean exists = existing.getCount() > 0;
165+
if (exists && !overwrite) {
166+
return PutCatalogKvResult.SKIPPED;
167+
}
168+
if (dryRun) {
169+
return exists ? PutCatalogKvResult.OVERWRITTEN : PutCatalogKvResult.CREATED;
170+
}
171+
unwrapCommit(kv.put(k, byteSeq(jsonValue)));
172+
return exists ? PutCatalogKvResult.OVERWRITTEN : PutCatalogKvResult.CREATED;
173+
}
174+
116175
// Used by EtcdCatalogTest to test concurrent modifications.
117176
protected Txn kvtx() {
118177
return kv.txn();

0 commit comments

Comments
 (0)