Skip to content

Commit a6f6a04

Browse files
rustyconoverclaude
andcommitted
feat(secrets): scope- and type-aware Secrets class
Add a Secrets class (parse + field/namedField/ofType/secretType/forScope/ forScopeOfType/fieldFor) for resolved secrets now keyed by name, selecting among several secrets of one type by their scope and type fields. Update the example workers to select the vgi_example-typed secret (and skip the new scope protocol field). Unit-tested. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent f2fe4ac commit a6f6a04

5 files changed

Lines changed: 286 additions & 30 deletions

File tree

vgi-example-worker/src/main/java/farm/query/vgi/example/scalar/ReturnSecretValueFunction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public final class ReturnSecretValueFunction implements ScalarFunction {
3030

3131
private static final byte[] OUTPUT_SCHEMA_IPC = Schemas.singleResultIpc(Schemas.UTF8);
3232
private static final java.util.Set<String> PROTOCOL_FIELDS =
33-
java.util.Set.of("name", "type", "provider");
33+
java.util.Set.of("name", "type", "provider", "scope");
3434

3535
private static final FunctionSpec SPEC = FunctionSpec.builder("return_secret_value")
3636
.description("Return a secret's value")
@@ -73,6 +73,11 @@ private static String encodeSecretJson(byte[] bytes) {
7373
FieldVector vv = root.getVector(f.getName());
7474
if (vv == null || vv.isNull(0)) continue;
7575
if (vv instanceof org.apache.arrow.vector.complex.StructVector sv) {
76+
// Secrets are keyed by name; select the vgi_example-typed one.
77+
FieldVector typeCol = sv.getChild("type");
78+
String secretType = (typeCol != null && !typeCol.isNull(0))
79+
? String.valueOf(typeCol.getObject(0)) : "";
80+
if (!"vgi_example".equals(secretType)) continue;
7681
for (Field child : f.getChildren()) {
7782
String name = child.getName();
7883
if (PROTOCOL_FIELDS.contains(name)) continue;

vgi-example-worker/src/main/java/farm/query/vgi/example/scalar/SecretFieldFunction.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,20 @@
22

33
package farm.query.vgi.example.scalar;
44

5+
import farm.query.vgi.Secrets;
56
import farm.query.vgi.function.FunctionSpec;
67
import farm.query.vgi.protocol.BindResponse;
78
import farm.query.vgi.scalar.ScalarBindParams;
89
import farm.query.vgi.scalar.ScalarFunction;
910
import farm.query.vgi.scalar.ScalarProcessParams;
1011
import farm.query.vgi.types.Schemas;
11-
import farm.query.vgirpc.wire.Allocators;
1212
import org.apache.arrow.memory.BufferAllocator;
13-
import org.apache.arrow.vector.FieldVector;
1413
import org.apache.arrow.vector.VarCharVector;
1514
import org.apache.arrow.vector.VectorSchemaRoot;
16-
import org.apache.arrow.vector.complex.StructVector;
17-
import org.apache.arrow.vector.ipc.ArrowStreamReader;
18-
import org.apache.arrow.vector.types.pojo.Field;
1915
import org.apache.arrow.vector.util.Text;
2016

21-
import java.io.ByteArrayInputStream;
2217
import java.util.List;
18+
import java.util.Map;
2319

2420
/**
2521
* {@code secret_field()} — looks up individual fields on the resolved
@@ -57,27 +53,9 @@ public final class SecretFieldFunction implements ScalarFunction {
5753
}
5854

5955
private static String formatSecretField(byte[] bytes) {
60-
String port = "";
61-
String name = "";
62-
if (bytes != null && bytes.length > 0) {
63-
try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
64-
ArrowStreamReader r = new ArrowStreamReader(in, Allocators.root())) {
65-
if (r.loadNextBatch()) {
66-
VectorSchemaRoot root = r.getVectorSchemaRoot();
67-
for (Field f : root.getSchema().getFields()) {
68-
FieldVector vv = root.getVector(f.getName());
69-
if (vv instanceof StructVector sv) {
70-
FieldVector pc = sv.getChild("port");
71-
if (pc != null && !pc.isNull(0)) port = String.valueOf(pc.getObject(0));
72-
FieldVector nc = sv.getChild("secret_string");
73-
if (nc != null && !nc.isNull(0)) name = String.valueOf(nc.getObject(0));
74-
}
75-
}
76-
}
77-
} catch (Exception ignore) {
78-
// Resolve failure → empty fields, matching the python fixture.
79-
}
80-
}
81-
return "port=" + port + ";name=" + name;
56+
// Secrets are keyed by name; select the vgi_example-typed secret.
57+
Map<String, String> s = Secrets.parse(bytes).ofType("vgi_example")
58+
.stream().findFirst().orElse(Map.of());
59+
return "port=" + s.getOrDefault("port", "") + ";name=" + s.getOrDefault("secret_string", "");
8260
}
8361
}

vgi-example-worker/src/main/java/farm/query/vgi/example/table/SecretDemoFunction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ private static List<String[]> decodeSecret(byte[] bytes) {
7878
// Descend into its children, skipping protocol fields
7979
// that aren't user-supplied parameters.
8080
if (v instanceof org.apache.arrow.vector.complex.StructVector sv) {
81+
// Secrets are keyed by name; select the vgi_example-typed one.
82+
FieldVector typeCol = sv.getChild("type");
83+
String secretType = (typeCol != null && !typeCol.isNull(0))
84+
? String.valueOf(typeCol.getObject(0)) : "";
85+
if (!"vgi_example".equals(secretType)) continue;
8186
for (Field child : f.getChildren()) {
8287
String name = child.getName();
8388
if (PROTOCOL_FIELDS.contains(name)) continue;
@@ -104,7 +109,7 @@ private static List<String[]> decodeSecret(byte[] bytes) {
104109

105110
/** Protocol-supplied fields on a resolved secret struct (not user data). */
106111
private static final java.util.Set<String> PROTOCOL_FIELDS =
107-
java.util.Set.of("name", "type", "provider");
112+
java.util.Set.of("name", "type", "provider", "scope");
108113

109114
/** Friendly Arrow type name (Utf8 → "string", Int64 → "int64", etc.). */
110115
private static String arrowTypeName(Field f) {
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
// Copyright 2026 Query Farm LLC - https://query.farm
2+
3+
package farm.query.vgi;
4+
5+
import farm.query.vgirpc.wire.Allocators;
6+
import org.apache.arrow.vector.BigIntVector;
7+
import org.apache.arrow.vector.BitVector;
8+
import org.apache.arrow.vector.FieldVector;
9+
import org.apache.arrow.vector.IntVector;
10+
import org.apache.arrow.vector.VarCharVector;
11+
import org.apache.arrow.vector.VectorSchemaRoot;
12+
import org.apache.arrow.vector.complex.StructVector;
13+
import org.apache.arrow.vector.ipc.ArrowStreamReader;
14+
import org.apache.arrow.vector.types.pojo.Field;
15+
16+
import java.io.ByteArrayInputStream;
17+
import java.util.LinkedHashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.Optional;
21+
22+
/**
23+
* Resolved secrets passed to a worker, keyed by each secret's unique DuckDB
24+
* secret name (not by type) so several secrets of the same type (e.g. one per S3
25+
* bucket) coexist. Each secret carries its connector-serialized {@code type} (the
26+
* DuckDB secret type) and {@code scope} (newline-joined scope prefixes) fields,
27+
* plus type-specific fields like {@code key_id}.
28+
*
29+
* <p>Mirrors {@code vgi::Secrets} in the Rust SDK. Parse the {@code byte[]} blob
30+
* carried on the params with {@link #parse(byte[])}, then select by name, type,
31+
* or scope.</p>
32+
*/
33+
public final class Secrets {
34+
35+
/** name -> { field -> value-as-string }. */
36+
private final Map<String, Map<String, String>> byName;
37+
38+
private Secrets(Map<String, Map<String, String>> byName) {
39+
this.byName = byName;
40+
}
41+
42+
/**
43+
* Build directly from a name -> fields map (for tests / non-IPC callers).
44+
*
45+
* @param byName the resolved secrets keyed by secret name
46+
* @return a {@code Secrets} over a copy of {@code byName}
47+
*/
48+
public static Secrets of(Map<String, Map<String, String>> byName) {
49+
Map<String, Map<String, String>> copy = new LinkedHashMap<>();
50+
byName.forEach((k, v) -> copy.put(k, new LinkedHashMap<>(v)));
51+
return new Secrets(copy);
52+
}
53+
54+
/**
55+
* Parse the IPC secrets blob. Each column is a secret (named by its DuckDB
56+
* secret name) holding a struct of its fields, including {@code type} and
57+
* {@code scope}. Empty/null blob yields empty secrets.
58+
*
59+
* @param bytes the IPC batch, or {@code null}/empty
60+
* @return the parsed secrets
61+
*/
62+
public static Secrets parse(byte[] bytes) {
63+
Map<String, Map<String, String>> byName = new LinkedHashMap<>();
64+
if (bytes == null || bytes.length == 0) {
65+
return new Secrets(byName);
66+
}
67+
try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
68+
ArrowStreamReader r = new ArrowStreamReader(in, Allocators.root())) {
69+
if (r.loadNextBatch()) {
70+
VectorSchemaRoot root = r.getVectorSchemaRoot();
71+
for (Field f : root.getSchema().getFields()) {
72+
FieldVector col = root.getVector(f.getName());
73+
if (col == null || col.isNull(0)) {
74+
continue;
75+
}
76+
Map<String, String> fields = new LinkedHashMap<>();
77+
if (col instanceof StructVector sv) {
78+
for (Field child : f.getChildren()) {
79+
FieldVector cv = sv.getChild(child.getName());
80+
if (cv != null && !cv.isNull(0)) {
81+
fields.put(child.getName(), render(cv));
82+
}
83+
}
84+
} else {
85+
fields.put(f.getName(), render(col));
86+
}
87+
if (!fields.isEmpty()) {
88+
byName.put(f.getName(), fields);
89+
}
90+
}
91+
}
92+
} catch (Exception e) {
93+
throw new RuntimeException("Failed to parse secrets IPC batch", e);
94+
}
95+
return new Secrets(byName);
96+
}
97+
98+
/** A field value from the first secret carrying it (any name). */
99+
public Optional<String> field(String field) {
100+
return byName.values().stream()
101+
.map(m -> m.get(field))
102+
.filter(java.util.Objects::nonNull)
103+
.findFirst();
104+
}
105+
106+
/** A named secret's field. */
107+
public Optional<String> namedField(String name, String field) {
108+
Map<String, String> m = byName.get(name);
109+
return m == null ? Optional.empty() : Optional.ofNullable(m.get(field));
110+
}
111+
112+
/** Every resolved secret as (name -> fields). */
113+
public Map<String, Map<String, String>> byName() {
114+
return byName;
115+
}
116+
117+
/** The DuckDB secret type of the named secret (its {@code type} field). */
118+
public Optional<String> secretType(String name) {
119+
return namedField(name, "type");
120+
}
121+
122+
/** Every resolved secret whose {@code type} field matches {@code secretType}. */
123+
public List<Map<String, String>> ofType(String secretType) {
124+
return byName.values().stream()
125+
.filter(m -> secretType.equals(m.get("type")))
126+
.toList();
127+
}
128+
129+
/**
130+
* The fields of the secret whose {@code scope} is the longest prefix of
131+
* {@code path}. The connector serializes each secret's scope as a
132+
* newline-joined list of prefixes; a secret with no (or empty) scope matches
133+
* as a last-resort fallback. Empty only when there are no candidate secrets.
134+
*
135+
* @param path the path to match (e.g. {@code s3://bucket/data/x.dat})
136+
* @return the best-matching secret's fields
137+
*/
138+
public Optional<Map<String, String>> forScope(String path) {
139+
return selectForScope(path, null);
140+
}
141+
142+
/** Like {@link #forScope} but only over secrets of {@code secretType}. */
143+
public Optional<Map<String, String>> forScopeOfType(String path, String secretType) {
144+
return selectForScope(path, secretType);
145+
}
146+
147+
/** A field of the best scope-matching secret for {@code path}. */
148+
public Optional<String> fieldFor(String path, String field) {
149+
return forScope(path).map(m -> m.get(field)).filter(java.util.Objects::nonNull);
150+
}
151+
152+
private Optional<Map<String, String>> selectForScope(String path, String secretType) {
153+
Map<String, String> best = null;
154+
int bestLen = -1;
155+
Map<String, String> fallback = null;
156+
for (Map<String, String> fields : byName.values()) {
157+
if (secretType != null && !secretType.equals(fields.get("type"))) {
158+
continue;
159+
}
160+
String scope = fields.get("scope");
161+
if (scope == null || scope.isEmpty()) {
162+
if (fallback == null) {
163+
fallback = fields;
164+
}
165+
continue;
166+
}
167+
for (String prefix : scope.split("\n")) {
168+
if (!prefix.isEmpty() && path.startsWith(prefix) && prefix.length() > bestLen) {
169+
bestLen = prefix.length();
170+
best = fields;
171+
}
172+
}
173+
}
174+
return Optional.ofNullable(best != null ? best : fallback);
175+
}
176+
177+
private static String render(FieldVector v) {
178+
if (v.isNull(0)) {
179+
return "";
180+
}
181+
if (v instanceof VarCharVector vc) {
182+
return new String(vc.get(0), java.nio.charset.StandardCharsets.UTF_8);
183+
}
184+
if (v instanceof BigIntVector iv) {
185+
return String.valueOf(iv.get(0));
186+
}
187+
if (v instanceof IntVector iv) {
188+
return String.valueOf(iv.get(0));
189+
}
190+
if (v instanceof BitVector bv) {
191+
return String.valueOf(bv.get(0) != 0);
192+
}
193+
Object o = v.getObject(0);
194+
return o == null ? "" : o.toString();
195+
}
196+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2026 Query Farm LLC - https://query.farm
2+
3+
package farm.query.vgi;
4+
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.util.LinkedHashMap;
8+
import java.util.Map;
9+
10+
import static org.junit.jupiter.api.Assertions.assertEquals;
11+
import static org.junit.jupiter.api.Assertions.assertFalse;
12+
import static org.junit.jupiter.api.Assertions.assertTrue;
13+
14+
/** Type- and scope-aware selection over resolved secrets. */
15+
class SecretsTest {
16+
17+
private static Map<String, String> fields(String... kv) {
18+
Map<String, String> m = new LinkedHashMap<>();
19+
for (int i = 0; i < kv.length; i += 2) {
20+
m.put(kv[i], kv[i + 1]);
21+
}
22+
return m;
23+
}
24+
25+
private static Secrets sample() {
26+
Map<String, Map<String, String>> byName = new LinkedHashMap<>();
27+
byName.put("my_s3", fields("type", "s3", "key_id", "AAA", "scope", "s3://bucket-a"));
28+
byName.put("my_s3_b",
29+
fields("type", "s3", "key_id", "BBB", "scope", "s3://bucket-b\ns3://bucket-b2"));
30+
byName.put("my_gcs", fields("type", "gcs", "key_id", "G"));
31+
return Secrets.of(byName);
32+
}
33+
34+
@Test
35+
void typeAware() {
36+
Secrets s = sample();
37+
assertEquals("s3", s.secretType("my_s3").orElse(null));
38+
assertEquals("gcs", s.secretType("my_gcs").orElse(null));
39+
assertEquals(2, s.ofType("s3").size());
40+
assertEquals(1, s.ofType("gcs").size());
41+
assertTrue(s.ofType("azure").isEmpty());
42+
}
43+
44+
@Test
45+
void scopeSelectionPerBucket() {
46+
Secrets s = sample();
47+
assertEquals("AAA", s.forScopeOfType("s3://bucket-a/x.dat", "s3").orElseThrow().get("key_id"));
48+
assertEquals("BBB", s.forScopeOfType("s3://bucket-b2/y.dat", "s3").orElseThrow().get("key_id"));
49+
assertEquals("AAA", s.fieldFor("s3://bucket-a/x.dat", "key_id").orElse(null));
50+
}
51+
52+
@Test
53+
void longestPrefixAndFallback() {
54+
Map<String, Map<String, String>> byName = new LinkedHashMap<>();
55+
byName.put("broad", fields("type", "s3", "key_id", "broad", "scope", "s3://bucket"));
56+
byName.put("narrow", fields("type", "s3", "key_id", "narrow", "scope", "s3://bucket/data"));
57+
Secrets s = Secrets.of(byName);
58+
assertEquals("narrow", s.fieldFor("s3://bucket/data/x.dat", "key_id").orElse(null));
59+
assertEquals("broad", s.fieldFor("s3://bucket/other/x.dat", "key_id").orElse(null));
60+
61+
Secrets unscoped = Secrets.of(Map.of("only", fields("type", "s3", "key_id", "only")));
62+
assertEquals("only", unscoped.fieldFor("s3://any/x", "key_id").orElse(null));
63+
64+
assertFalse(s.forScope("s3://nope/x").isPresent());
65+
}
66+
67+
@Test
68+
void parseEmpty() {
69+
assertTrue(Secrets.parse(new byte[0]).byName().isEmpty());
70+
assertTrue(Secrets.parse(null).field("anything").isEmpty());
71+
}
72+
}

0 commit comments

Comments
 (0)