Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.common.avro.internal.json;

import static io.aklivity.zilla.runtime.common.avro.internal.json.AvroJsonUnion.branchName;
import static io.aklivity.zilla.runtime.common.avro.internal.json.AvroJsonUnion.nullableSingle;

import java.util.IdentityHashMap;
import java.util.List;
Expand Down Expand Up @@ -67,6 +68,7 @@ public final class AvroJsonGeneratorImpl implements AvroGenerator
private static final int RESERVE = 24;

private final JsonGeneratorEx json;
private final boolean canonical;
private final AvroType rootType;
private final Map<AvroType, List<AvroField>> fieldsByType;
private final Map<AvroType, List<AvroType>> branchesByType;
Expand All @@ -85,8 +87,17 @@ public final class AvroJsonGeneratorImpl implements AvroGenerator
public AvroJsonGeneratorImpl(
AvroSchema schema,
JsonGeneratorEx json)
{
this(schema, json, false);
}

public AvroJsonGeneratorImpl(
AvroSchema schema,
JsonGeneratorEx json,
boolean canonical)
{
this.json = json;
this.canonical = canonical;
this.rootType = schema.type();
this.fieldsByType = new IdentityHashMap<>();
this.branchesByType = new IdentityHashMap<>();
Expand Down Expand Up @@ -180,8 +191,9 @@ public void writeIndex(
int index)
{
value();
AvroType branch = branches(valueType).get(index);
boolean wrapped = branch.kind() != AvroKind.NULL;
List<AvroType> branches = branches(valueType);
AvroType branch = branches.get(index);
boolean wrapped = branch.kind() != AvroKind.NULL && !(canonical && nullableSingle(branches));
if (wrapped)
{
json.writeStartObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class AvroJsonParserImpl implements AvroParser
private static final int DONE = 2;

private final JsonParserEx json;
private final boolean canonical;
private final AvroType rootType;
private final UnsafeBuffer segment;
private final UnsafeBuffer segmentView;
Expand Down Expand Up @@ -109,8 +110,17 @@ public long getStreamOffset()
public AvroJsonParserImpl(
AvroSchema schema,
JsonParserEx json)
{
this(schema, json, false);
}

public AvroJsonParserImpl(
AvroSchema schema,
JsonParserEx json,
boolean canonical)
{
this.json = json;
this.canonical = canonical;
this.rootType = schema.type();
this.segment = new UnsafeBuffer(0, 0);
this.segmentView = new UnsafeBuffer(0, 0);
Expand Down Expand Up @@ -438,6 +448,14 @@ private AvroEvent stepUnion(
}
event = selectBranch(type, index, false);
}
else if (canonical && AvroJsonUnion.nullableSingle(branches))
{
if (next != null)
{
int index = AvroJsonUnion.nullBranchIndex(branches) ^ 1;
event = selectBranch(type, index, false);
}
}
else if (next == JsonEvent.START_OBJECT)
{
consume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,15 @@ static int nullBranchIndex(
}
return index;
}

/**
* {@code true} when {@code branches} is a union of {@code null} and exactly one other type — the
* nullable-single shape whose non-null branch the canonical JSON encoding represents as a bare value
* rather than the {@code {"<branch>": value}} wrapper.
*/
static boolean nullableSingle(
List<AvroType> branches)
{
return branches.size() == 2 && nullBranchIndex(branches) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ public static AvroParser parser(
return new AvroJsonParserImpl(schema, parser);
}

/**
* Variant of {@link #parser(AvroSchema, JsonParserEx)} that, when {@code canonical} is {@code true},
* reads a nullable-single union (a union of {@code null} and exactly one other type) from a bare JSON
* value — {@code null} or the unwrapped value — rather than the {@code {"<branch>": value}} wrapper.
* Every other union shape is unchanged.
*/
public static AvroParser parser(
AvroSchema schema,
JsonParserEx parser,
boolean canonical)
{
return new AvroJsonParserImpl(schema, parser, canonical);
}

/**
* Begins a <b>JSON → Avro</b> push pipeline driven by {@code parser}: the returned {@link AvroStream}
* walks {@code schema} in lockstep with the JSON pull events and emits the Avro event stream. Append
Expand All @@ -86,6 +100,18 @@ public static AvroStream stream(
return Avro.stream(parser(schema, parser));
}

/**
* Variant of {@link #stream(AvroSchema, JsonParserEx)} with the canonical nullable-single union reading
* of {@link #parser(AvroSchema, JsonParserEx, boolean)}.
*/
public static AvroStream stream(
AvroSchema schema,
JsonParserEx parser,
boolean canonical)
{
return Avro.stream(parser(schema, parser, canonical));
}

/**
* Returns a schema-bound <b>Avro → JSON</b> {@link AvroGenerator} that maps each positional Avro write
* onto {@code generator}, applying the documented type mapping. Wrap it over the target buffer via
Expand All @@ -98,4 +124,18 @@ public static AvroGenerator generator(
{
return new AvroJsonGeneratorImpl(schema, generator);
}

/**
* Variant of {@link #generator(AvroSchema, JsonGeneratorEx)} that, when {@code canonical} is
* {@code true}, writes a nullable-single union (a union of {@code null} and exactly one other type) as a
* bare JSON value — {@code null} or the unwrapped value — rather than the {@code {"<branch>": value}}
* wrapper. Every other union shape is unchanged.
*/
public static AvroGenerator generator(
AvroSchema schema,
JsonGeneratorEx generator,
boolean canonical)
{
return new AvroJsonGeneratorImpl(schema, generator, canonical);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,86 @@ public void shouldRejectUnionWithoutNullBranch()
assertRejected("[\"int\",\"string\"]", "null");
}

@Test
public void shouldEncodeCanonicalNullableUnionValue()
{
assertCanonicalJson("[\"null\",\"string\"]", new byte[] { 0x02, 0x02, 0x78 }, "\"x\"");
}

@Test
public void shouldEncodeCanonicalNullableUnionNull()
{
assertCanonicalJson("[\"null\",\"string\"]", new byte[] { 0x00 }, "null");
}

@Test
public void shouldEncodeCanonicalNullableUnionInRecord()
{
assertCanonicalJson("""
{"type":"record","name":"Event","fields":[
{"name":"id","type":"string"},
{"name":"status","type":["null","string"]}]}""",
new byte[] { 0x06, 0x69, 0x64, 0x30, 0x02, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65 },
"{\"id\":\"id0\",\"status\":\"positive\"}");
}

@Test
public void shouldWrapNonNullableSingleUnionInCanonicalMode()
{
// a union with more than one non-null branch keeps the wrapped encoding even in canonical mode
assertCanonicalJson("[\"null\",\"int\",\"string\"]", new byte[] { 0x04, 0x02, 0x78 }, "{\"string\":\"x\"}");
}

private void assertCanonicalJson(
String schemaText,
byte[] binary,
String expectedJson)
{
AvroSchema schema = Avro.schema(schemaText);
assertEquals(expectedJson, avroToJsonCanonical(schema, binary));
assertArrayEquals(binary, jsonToAvroCanonical(schema, expectedJson));
}

private static String avroToJsonCanonical(
AvroSchema schema,
byte[] binary)
{
MutableDirectBuffer out = new UnsafeBuffer(new byte[Math.max(256, binary.length * 8)]);
JsonGeneratorEx json = JsonEx.createGenerator();
AvroGenerator generator = AvroJson.generator(schema, json, true).wrap(out, 0, out.capacity());
AvroPipeline pipeline = Avro.stream(Avro.parser(schema)).into(AvroSink.of(generator));
pipeline.reset();
Status status = pipeline.feed(new UnsafeBuffer(binary), 0, binary.length);
if (status != Status.COMPLETED)
{
throw new AssertionError("avro -> json did not complete: " + status);
}
json.flush();
byte[] bytes = new byte[json.length()];
out.getBytes(0, bytes);
return new String(bytes, UTF_8);
}

private static byte[] jsonToAvroCanonical(
AvroSchema schema,
String json)
{
byte[] jsonBytes = json.getBytes(UTF_8);
MutableDirectBuffer out = new UnsafeBuffer(new byte[Math.max(256, jsonBytes.length * 4)]);
AvroGenerator generator = Avro.generator(schema, out, 0);
JsonParserEx parser = JsonEx.createParser();
AvroPipeline pipeline = AvroJson.stream(schema, parser, true).into(AvroSink.of(generator));
pipeline.reset();
Status status = pipeline.feed(new UnsafeBuffer(jsonBytes), 0, jsonBytes.length);
if (status != Status.COMPLETED)
{
throw new AssertionError("json -> avro did not complete: " + status);
}
byte[] avro = new byte[generator.length()];
out.getBytes(0, avro);
return avro;
}

private void assertRejected(
String schemaText,
String json)
Expand Down
13 changes: 4 additions & 9 deletions runtime/model-avro/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.

This project includes:
Apache Avro under Apache-2.0
Apache Commons Codec under Apache-2.0
Apache Commons Compress under Apache-2.0
Apache Commons IO under Apache License, Version 2.0
Apache Commons Lang under Apache License, Version 2.0
Jackson-annotations under The Apache Software License, Version 2.0
Jackson-core under The Apache Software License, Version 2.0
jackson-databind under The Apache Software License, Version 2.0
SLF4J API Module under MIT
agrona under The Apache License, Version 2.0
Jakarta JSON Processing API under Eclipse Public License 2.0 or GNU General Public License, version 2 with the GNU Classpath Exception
zilla::runtime::common-avro under Aklivity Community License Agreement
zilla::runtime::common-json under Aklivity Community License Agreement

49 changes: 9 additions & 40 deletions runtime/model-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,20 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>engine</artifactId>
<type>test-jar</type>
<artifactId>common-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<groupId>${project.groupId}</groupId>
<artifactId>common-json</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>${project.groupId}</groupId>
<artifactId>engine</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down Expand Up @@ -146,34 +144,6 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>org.apache.avro:avro</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>io.aklivity.zilla.runtime.model.avro.internal.avro</shadedPattern>
</relocation>
</relocations>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<useDependencyReducedPomInJar>true</useDependencyReducedPomInJar>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.moditect</groupId>
<artifactId>moditect-maven-plugin</artifactId>
Expand All @@ -199,7 +169,6 @@
<configuration>
<excludes>
<exclude>io/aklivity/zilla/runtime/model/avro/internal/types/**/*.class</exclude>
<exclude>org/apache/avro/**/*.class</exclude>
</excludes>
<rules>
<rule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,27 @@

import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW;

public class AvroField
public final class AvroField
{
private static final int INITIAL_CAPACITY = 24;

public final OctetsFW value;
public final MutableDirectBuffer buffer;

private MutableDirectBuffer buffer;

public AvroField()
{
this.value = new OctetsFW();
this.buffer = new UnsafeBuffer(new byte[24]);
this.buffer = new UnsafeBuffer(new byte[INITIAL_CAPACITY]);
}

public MutableDirectBuffer buffer(
int capacity)
{
if (capacity > buffer.capacity())
{
buffer = new UnsafeBuffer(new byte[Math.max(buffer.capacity() * 2, capacity)]);
}
return buffer;
}
}
Loading
Loading