Skip to content

Commit 2508520

Browse files
rustyconover and claude committed
feat: union_varargs fixture — decode union-typed table varargs
Add union decoding to VectorScalarCodec (sparse UnionVector + DenseUnionVector cases) returning a new TaggedUnion(tag, value) that preserves the active member's field name, and add the union_varargs example table function echoing (idx, tag, value) per UNION(i BIGINT, s VARCHAR) vararg. Makes test/sql/integration/table/union_varargs.test pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent a6f6a04 commit 2508520

5 files changed

Lines changed: 263 additions & 1 deletion

File tree

vgi-example-worker/src/main/java/farm/query/vgi/example/Main.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,7 @@ private static void registerTables(Worker w) {
489489
new farm.query.vgi.example.table.BrokenPartitionColumnsFunctions.BrokenPartitionColumnAbsentFromBatch(),
490490
new farm.query.vgi.example.table.TypedProbeFunction(),
491491
new farm.query.vgi.example.table.FilteredColumnsEchoFunction(),
492+
new farm.query.vgi.example.table.UnionVarargsFunction(),
492493
new farm.query.vgi.example.table.TxCachedValueFunction()));
493494
}
494495

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2026 Query Farm LLC - https://query.farm
2+
3+
package farm.query.vgi.example.table;
4+
5+
import farm.query.vgi.function.ArgSpec;
6+
import farm.query.vgi.function.FunctionSpec;
7+
import farm.query.vgi.function.TaggedUnion;
8+
import farm.query.vgi.internal.BatchUtil;
9+
import farm.query.vgi.internal.SchemaUtil;
10+
import farm.query.vgi.protocol.BindResponse;
11+
import farm.query.vgi.table.TableBindParams;
12+
import farm.query.vgi.table.TableFunction;
13+
import farm.query.vgi.table.TableInitParams;
14+
import farm.query.vgi.table.TableProducerState;
15+
import farm.query.vgi.types.Schemas;
16+
import farm.query.vgirpc.CallContext;
17+
import farm.query.vgirpc.OutputCollector;
18+
import org.apache.arrow.vector.BigIntVector;
19+
import org.apache.arrow.vector.VarCharVector;
20+
import org.apache.arrow.vector.types.UnionMode;
21+
import org.apache.arrow.vector.types.pojo.ArrowType;
22+
import org.apache.arrow.vector.types.pojo.Field;
23+
import org.apache.arrow.vector.types.pojo.Schema;
24+
import org.apache.arrow.vector.util.Text;
25+
26+
import java.io.Serializable;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
/**
31+
* {@code union_varargs(values...)} — union-typed variadic table function.
32+
*
33+
* <p>Each argument is a SQL {@code UNION(i BIGINT, s VARCHAR)}. DuckDB
34+
* serializes a SQL UNION as a <em>sparse</em> Arrow union; the worker declares
35+
* the vararg's Arrow type as a sparse union with members {@code [i BIGINT,
36+
* s VARCHAR]} so DuckDB renders the parameter type as exactly
37+
* {@code UNION(i BIGINT, s VARCHAR)}. Each vararg is decoded SDK-side into a
38+
* {@link TaggedUnion}, which preserves the active member discriminator that a
39+
* plain {@code getObject} would drop.
40+
*
41+
* <p>Emits one row per vararg in positional order:
42+
* <pre>
43+
* idx BIGINT — 0-based positional index
44+
* tag VARCHAR — active union member name ('i' or 's')
45+
* value VARCHAR — active member value stringified
46+
* </pre>
47+
*
48+
* <p>Mirrors the Python fixture {@code UnionVarargsFunction} in
49+
* {@code vgi/_test_fixtures/table/pairs.py}.
50+
*/
51+
public final class UnionVarargsFunction implements TableFunction {
52+
53+
/** Sparse union shared by every {@code union_varargs} argument. */
54+
private static final ArrowType UNION_TYPE =
55+
new ArrowType.Union(UnionMode.Sparse, new int[]{0, 1});
56+
57+
/** Member fields of the union, in type-id order. */
58+
private static final List<Field> UNION_MEMBERS = List.of(
59+
Schemas.nullable("i", Schemas.INT64),
60+
Schemas.nullable("s", Schemas.UTF8));
61+
62+
/** Fixed output schema: idx BIGINT, tag VARCHAR, value VARCHAR. */
63+
private static final Schema OUTPUT_SCHEMA = new Schema(List.of(
64+
Schemas.nullable("idx", Schemas.INT64),
65+
Schemas.nullable("tag", Schemas.UTF8),
66+
Schemas.nullable("value", Schemas.UTF8)));
67+
68+
private static final byte[] OUTPUT_SCHEMA_IPC =
69+
SchemaUtil.serializeSchema(OUTPUT_SCHEMA);
70+
71+
private static final FunctionSpec SPEC = FunctionSpec.builder("union_varargs")
72+
.description("Echo the active member tag and value of each union vararg")
73+
.arg(ArgSpec.nested("values", 0, UNION_TYPE, UNION_MEMBERS, /*varargs=*/true))
74+
.build();
75+
76+
@Override public FunctionSpec spec() { return SPEC; }
77+
78+
@Override public BindResponse onBind(TableBindParams p) {
79+
return BindResponse.forSchema(OUTPUT_SCHEMA_IPC);
80+
}
81+
82+
@Override public TableProducerState createProducer(TableInitParams p) {
83+
List<Object> positionals = p.arguments().positional();
84+
List<String> tags = new ArrayList<>(positionals.size());
85+
List<String> values = new ArrayList<>(positionals.size());
86+
for (Object o : positionals) {
87+
if (o instanceof TaggedUnion tu) {
88+
tags.add(tu.tag());
89+
values.add(tu.value() == null ? null : String.valueOf(tu.value()));
90+
} else {
91+
// Defensive: a non-union arg (shouldn't happen for this fixture).
92+
tags.add(null);
93+
values.add(o == null ? null : String.valueOf(o));
94+
}
95+
}
96+
return new State(tags, values);
97+
}
98+
99+
/** One-shot producer: emits a single batch of (idx, tag, value) rows. */
100+
public static final class State extends TableProducerState implements Serializable {
101+
private static final long serialVersionUID = 1L;
102+
public List<String> tags;
103+
public List<String> values;
104+
public boolean done;
105+
106+
public State() {}
107+
State(List<String> tags, List<String> values) {
108+
this.tags = tags;
109+
this.values = values;
110+
}
111+
112+
@Override public void produceTick(OutputCollector out, CallContext ctx) {
113+
if (done) { out.finish(); return; }
114+
done = true;
115+
int n = tags.size();
116+
BatchUtil.emit(OUTPUT_SCHEMA, n, out, (root, rows, start) -> {
117+
BigIntVector idx = (BigIntVector) root.getVector("idx");
118+
VarCharVector tag = (VarCharVector) root.getVector("tag");
119+
VarCharVector value = (VarCharVector) root.getVector("value");
120+
for (int i = 0; i < rows; i++) {
121+
idx.setSafe(i, i);
122+
if (tags.get(i) == null) tag.setNull(i);
123+
else tag.setSafe(i, new Text(tags.get(i)));
124+
if (values.get(i) == null) value.setNull(i);
125+
else value.setSafe(i, new Text(values.get(i)));
126+
}
127+
});
128+
}
129+
}
130+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright 2026 Query Farm LLC - https://query.farm
2+
3+
package farm.query.vgi.function;
4+
5+
/**
 * An Arrow union cell decoded together with the name of its active member.
 *
 * <p>{@code UnionVector.getObject(row)} yields just the active member's value;
 * the type id saying <em>which</em> member was active is lost. A
 * {@code TaggedUnion} keeps that information by carrying the active member's
 * field name ({@code tag}) next to the decoded value, so the discriminator is
 * still available after the Arrow round-trip.
 *
 * <p>Mirrors the Python framework's {@code TaggedUnion}. Instances are
 * returned by {@link farm.query.vgi.internal.VectorScalarCodec#read} for cells
 * of a {@link org.apache.arrow.vector.complex.UnionVector} (sparse, the form
 * DuckDB emits) and of a
 * {@link org.apache.arrow.vector.complex.DenseUnionVector}.
 *
 * @param tag   field name of the active union member (e.g. {@code "i"} or {@code "s"}).
 * @param value decoded value of the active member, or {@code null} when the active member is null.
 */
public record TaggedUnion(String tag, Object value) {}

vgi/src/main/java/farm/query/vgi/internal/VectorScalarCodec.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,17 @@
2424
import org.apache.arrow.vector.UInt8Vector;
2525
import org.apache.arrow.vector.VarBinaryVector;
2626
import org.apache.arrow.vector.VarCharVector;
27+
import org.apache.arrow.vector.complex.DenseUnionVector;
2728
import org.apache.arrow.vector.complex.FixedSizeListVector;
2829
import org.apache.arrow.vector.complex.ListVector;
2930
import org.apache.arrow.vector.complex.StructVector;
31+
import org.apache.arrow.vector.complex.UnionVector;
32+
import org.apache.arrow.vector.types.pojo.ArrowType;
3033
import org.apache.arrow.vector.types.pojo.Field;
3134
import org.apache.arrow.vector.util.Text;
3235

36+
import farm.query.vgi.function.TaggedUnion;
37+
3338
import java.math.BigDecimal;
3439
import java.math.RoundingMode;
3540
import java.nio.charset.StandardCharsets;
@@ -68,7 +73,8 @@ private VectorScalarCodec() {}
6873
* @return the normalised Java value, or {@code null} for a null cell (structs excepted)
6974
*/
7075
public static Object read(FieldVector v, int row) {
71-
if (!(v instanceof StructVector) && v.isNull(row)) return null;
76+
if (!(v instanceof StructVector) && !(v instanceof UnionVector)
77+
&& !(v instanceof DenseUnionVector) && v.isNull(row)) return null;
7278
if (v instanceof BigIntVector b) return b.get(row);
7379
if (v instanceof IntVector i) return (long) i.get(row);
7480
if (v instanceof SmallIntVector s) return (long) s.get(row);
@@ -119,9 +125,60 @@ public static Object read(FieldVector v, int row) {
119125
for (int i = 0; i < width; i++) out.add(read(inner, start + i));
120126
return out;
121127
}
128+
if (v instanceof DenseUnionVector du) {
129+
byte typeId = du.getTypeId(row);
130+
FieldVector child = (FieldVector) du.getVectorByType(typeId);
131+
String tag = unionMemberName(du.getField(), typeId, child);
132+
int offset = du.getOffset(row);
133+
Object value = child == null ? null : read(child, offset);
134+
return new TaggedUnion(tag, value);
135+
}
136+
if (v instanceof UnionVector uv) {
137+
// Sparse union (DuckDB's wire form): the active member's value lives
138+
// at the same row index in the type-id-selected child sub-vector.
139+
int typeValue = uv.getTypeValue(row);
140+
FieldVector child = (FieldVector) uv.getVectorByType(typeValue);
141+
String tag = unionMemberName(uv.getField(), typeValue, child);
142+
Object value = child == null ? null : read(child, row);
143+
return new TaggedUnion(tag, value);
144+
}
122145
return v.getObject(row);
123146
}
124147

148+
/**
149+
* Resolve the active union member's field name (the discriminator tag) for
150+
* a given runtime type id.
151+
*
152+
* <p>The union {@link Field}'s declared children carry the authoritative
153+
* member names (e.g. {@code "i"} / {@code "s"} for DuckDB's
154+
* {@code UNION(i BIGINT, s VARCHAR)}); the {@link ArrowType.Union}'s
155+
* {@code typeIds} array maps each declared child to its id, which we invert.
156+
* The runtime child sub-vector's name is only a fallback — Arrow's
157+
* canonical {@code UnionVector} renames children by minor type
158+
* ({@code "bigint"}, {@code "varchar"}) when built outside an IPC round
159+
* trip, so it is not reliable on its own.
160+
*
161+
* @param unionField the union's declared {@link Field}
162+
* @param typeId the active runtime type id
163+
* @param child the active child sub-vector (may be {@code null})
164+
* @return the active member's field name
165+
*/
166+
private static String unionMemberName(Field unionField, int typeId, FieldVector child) {
167+
List<Field> children = unionField.getChildren();
168+
if (unionField.getType() instanceof ArrowType.Union ut) {
169+
int[] ids = ut.getTypeIds();
170+
if (ids != null) {
171+
for (int i = 0; i < ids.length; i++) {
172+
if (ids[i] == typeId && i < children.size()) {
173+
return children.get(i).getName();
174+
}
175+
}
176+
}
177+
}
178+
if (typeId >= 0 && typeId < children.size()) return children.get(typeId).getName();
179+
return child != null ? child.getName() : String.valueOf(typeId);
180+
}
181+
125182
/**
126183
* Write {@code value} into row {@code row} of {@code v}.
127184
*

vgi/src/test/java/farm/query/vgi/internal/VectorScalarCodecTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,56 @@ void writeVarCharAcceptsTextOrString() {
143143
assertEquals("def", VectorScalarCodec.read(vc, 1));
144144
}
145145
}
146+
147+
@Test
148+
void readsSparseUnionAsTaggedUnionPreservingMemberName() {
149+
// Sparse UNION(i BIGINT, s VARCHAR) — the wire form DuckDB emits for
150+
// a SQL UNION. Row 0 has member 'i' active (type id 0), row 1 has
151+
// member 's' active (type id 1).
152+
List<Field> members = List.of(
153+
new Field("i", new FieldType(true, new ArrowType.Int(64, true), null), null),
154+
new Field("s", new FieldType(true, new ArrowType.Utf8(), null), null));
155+
ArrowType.Union unionType = new ArrowType.Union(
156+
org.apache.arrow.vector.types.UnionMode.Sparse, new int[]{0, 1});
157+
FieldType ft = new FieldType(true, unionType, null);
158+
try (org.apache.arrow.vector.complex.UnionVector uv =
159+
new org.apache.arrow.vector.complex.UnionVector(
160+
"u", alloc, ft, /*callBack=*/null)) {
161+
uv.initializeChildrenFromFields(members);
162+
uv.allocateNew();
163+
// Resolve children through the same accessor production code uses
164+
// (getVectorByType), so the type ids and child sub-vectors stay
165+
// consistent regardless of Arrow's internal child ordering.
166+
BigIntVector iChild = (BigIntVector) uv.getVectorByType(0);
167+
VarCharVector sChild = (VarCharVector) uv.getVectorByType(1);
168+
iChild.setInitialCapacity(2);
169+
sChild.setInitialCapacity(2);
170+
171+
// Row 0: member i active (type id 0).
172+
uv.getTypeBuffer().setByte(
173+
0L * org.apache.arrow.vector.complex.UnionVector.TYPE_WIDTH, (byte) 0);
174+
iChild.setSafe(0, 1L);
175+
// Row 1: member s active (type id 1).
176+
uv.getTypeBuffer().setByte(
177+
1L * org.apache.arrow.vector.complex.UnionVector.TYPE_WIDTH, (byte) 1);
178+
sChild.setSafe(1, new Text("x"));
179+
180+
iChild.setValueCount(2);
181+
sChild.setValueCount(2);
182+
uv.setValueCount(2);
183+
184+
Object r0 = VectorScalarCodec.read(uv, 0);
185+
Object r1 = VectorScalarCodec.read(uv, 1);
186+
assertTrue(r0 instanceof farm.query.vgi.function.TaggedUnion,
187+
"expected TaggedUnion, got " + r0);
188+
assertTrue(r1 instanceof farm.query.vgi.function.TaggedUnion,
189+
"expected TaggedUnion, got " + r1);
190+
farm.query.vgi.function.TaggedUnion t0 = (farm.query.vgi.function.TaggedUnion) r0;
191+
farm.query.vgi.function.TaggedUnion t1 = (farm.query.vgi.function.TaggedUnion) r1;
192+
assertEquals("i", t0.tag());
193+
assertEquals(1L, t0.value());
194+
assertEquals("s", t1.tag());
195+
assertEquals("x", t1.value());
196+
}
197+
}
146198
}

0 commit comments

Comments
 (0)