From 81ab6c3e5162189bb9def47d0aca37537853406a Mon Sep 17 00:00:00 2001
From: Claude
Date: Tue, 16 Jun 2026 22:15:06 +0000
Subject: [PATCH 1/4] feat(model-avro): re-platform converter onto common-avro
+ common-json
Re-platform the model-avro converter off the shaded Apache Avro + Jackson
whole-message GenericRecord/JSON path onto the format-native common-avro
library and the common-json transcoder, surfaced through the engine model SPI.
- AvroRead/AvroWrite converters drive the common-avro AvroJson streaming
pipelines (avro<->JSON) for view: json; schemas are resolved via
CatalogHandler and the compiled AvroSchema + parser cached per schemaId.
- Validation and field extraction run on the common-avro streaming pull
parser (zero-copy segments for string/bytes/fixed), aborting via engine
RESET on a decode/validation failure.
- Add a canonical (flattened nullable-single union) mode to the common-avro
AvroJson parser/stream/generator so view: json keeps the existing
key=value encoding for [null, T] unions; every other union shape is
unchanged.
- Drop the shaded Apache Avro + Jackson dependencies, the shade plugin, and
the org.apache.avro.io.* classes; depend on common-avro + common-json and
regenerate NOTICE.
- Preserve existing model: avro config, view: json semantics, padding, and
field extraction; the validation-failed event now carries the common-avro
decode error, mirroring the model-json re-platform.
Co-Authored-By: Claude Opus 4.8
Claude-Session: https://claude.ai/code/session_012CLYLdgT4B97uSUndTsG6N
---
.../internal/json/AvroJsonGeneratorImpl.java | 16 +-
.../internal/json/AvroJsonParserImpl.java | 18 +
.../avro/internal/json/AvroJsonUnion.java | 11 +
.../runtime/common/avro/json/AvroJson.java | 40 +
.../common/avro/json/AvroJsonTest.java | 80 ++
runtime/model-avro/NOTICE | 13 +-
runtime/model-avro/pom.xml | 49 +-
.../model/avro/internal/AvroField.java | 19 +-
.../model/avro/internal/AvroModelHandler.java | 458 ++++-----
.../internal/AvroReadConverterHandler.java | 129 ++-
.../avro/internal/AvroValidatorHandler.java | 45 +-
.../internal/AvroWriteConverterHandler.java | 110 +-
.../apache/avro/io/CanonicalJsonDecoder.java | 262 -----
.../apache/avro/io/CanonicalJsonEncoder.java | 99 --
.../java/org/apache/avro/io/JsonDecoder.java | 942 ------------------
.../java/org/apache/avro/io/JsonEncoder.java | 401 --------
.../src/main/moditect/module-info.java | 8 +-
.../model/avro/internal/AvroModelTest.java | 259 +++++
.../zilla/specs/model/avro/config/event.yaml | 2 +-
19 files changed, 785 insertions(+), 2176 deletions(-)
delete mode 100644 runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java
delete mode 100644 runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java
delete mode 100644 runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java
delete mode 100644 runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java
diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java
index 96ed25792c..09a30acfb4 100644
--- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java
+++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java
@@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.common.avro.internal.json;
import static io.aklivity.zilla.runtime.common.avro.internal.json.AvroJsonUnion.branchName;
+import static io.aklivity.zilla.runtime.common.avro.internal.json.AvroJsonUnion.nullableSingle;
import java.util.IdentityHashMap;
import java.util.List;
@@ -67,6 +68,7 @@ public final class AvroJsonGeneratorImpl implements AvroGenerator
private static final int RESERVE = 24;
private final JsonGeneratorEx json;
+ private final boolean canonical;
private final AvroType rootType;
private final Map> fieldsByType;
private final Map> branchesByType;
@@ -85,8 +87,17 @@ public final class AvroJsonGeneratorImpl implements AvroGenerator
public AvroJsonGeneratorImpl(
AvroSchema schema,
JsonGeneratorEx json)
+ {
+ this(schema, json, false);
+ }
+
+ public AvroJsonGeneratorImpl(
+ AvroSchema schema,
+ JsonGeneratorEx json,
+ boolean canonical)
{
this.json = json;
+ this.canonical = canonical;
this.rootType = schema.type();
this.fieldsByType = new IdentityHashMap<>();
this.branchesByType = new IdentityHashMap<>();
@@ -180,8 +191,9 @@ public void writeIndex(
int index)
{
value();
- AvroType branch = branches(valueType).get(index);
- boolean wrapped = branch.kind() != AvroKind.NULL;
+ List branches = branches(valueType);
+ AvroType branch = branches.get(index);
+ boolean wrapped = branch.kind() != AvroKind.NULL && !(canonical && nullableSingle(branches));
if (wrapped)
{
json.writeStartObject();
diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java
index 422d00350e..3676439750 100644
--- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java
+++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java
@@ -65,6 +65,7 @@ public final class AvroJsonParserImpl implements AvroParser, AvroLocation
private static final int DONE = 2;
private final JsonParserEx json;
+ private final boolean canonical;
private final AvroType rootType;
private final UnsafeBuffer segment;
private final UnsafeBuffer segmentView;
@@ -95,8 +96,17 @@ public final class AvroJsonParserImpl implements AvroParser, AvroLocation
public AvroJsonParserImpl(
AvroSchema schema,
JsonParserEx json)
+ {
+ this(schema, json, false);
+ }
+
+ public AvroJsonParserImpl(
+ AvroSchema schema,
+ JsonParserEx json,
+ boolean canonical)
{
this.json = json;
+ this.canonical = canonical;
this.rootType = schema.type();
this.segment = new UnsafeBuffer(0, 0);
this.segmentView = new UnsafeBuffer(0, 0);
@@ -430,6 +440,14 @@ private AvroEvent stepUnion(
}
event = selectBranch(type, index, false);
}
+ else if (canonical && AvroJsonUnion.nullableSingle(branches))
+ {
+ if (next != null)
+ {
+ int index = AvroJsonUnion.nullBranchIndex(branches) ^ 1;
+ event = selectBranch(type, index, false);
+ }
+ }
else if (next == JsonEvent.START_OBJECT)
{
consume();
diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java
index bdf31bcc4f..824969d43c 100644
--- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java
+++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java
@@ -119,4 +119,15 @@ static int nullBranchIndex(
}
return index;
}
+
+ /**
+ * {@code true} when {@code branches} is a union of {@code null} and exactly one other type — the
+ * nullable-single shape whose non-null branch the canonical JSON encoding represents as a bare value
+ * rather than the {@code {"": value}} wrapper.
+ */
+ static boolean nullableSingle(
+ List branches)
+ {
+ return branches.size() == 2 && nullBranchIndex(branches) >= 0;
+ }
}
diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java
index 8306d4aa25..a611359d40 100644
--- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java
+++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java
@@ -73,6 +73,20 @@ public static AvroParser parser(
return new AvroJsonParserImpl(schema, parser);
}
+ /**
+ * Variant of {@link #parser(AvroSchema, JsonParserEx)} that, when {@code canonical} is {@code true},
+ * reads a nullable-single union (a union of {@code null} and exactly one other type) from a bare JSON
+ * value — {@code null} or the unwrapped value — rather than the {@code {"": value}} wrapper.
+ * Every other union shape is unchanged.
+ */
+ public static AvroParser parser(
+ AvroSchema schema,
+ JsonParserEx parser,
+ boolean canonical)
+ {
+ return new AvroJsonParserImpl(schema, parser, canonical);
+ }
+
/**
* Begins a JSON → Avro push pipeline driven by {@code parser}: the returned {@link AvroStream}
* walks {@code schema} in lockstep with the JSON pull events and emits the Avro event stream. Append
@@ -86,6 +100,18 @@ public static AvroStream stream(
return Avro.stream(parser(schema, parser));
}
+ /**
+ * Variant of {@link #stream(AvroSchema, JsonParserEx)} with the canonical nullable-single union reading
+ * of {@link #parser(AvroSchema, JsonParserEx, boolean)}.
+ */
+ public static AvroStream stream(
+ AvroSchema schema,
+ JsonParserEx parser,
+ boolean canonical)
+ {
+ return Avro.stream(parser(schema, parser, canonical));
+ }
+
/**
* Returns a schema-bound Avro → JSON {@link AvroGenerator} that maps each positional Avro write
* onto {@code generator}, applying the documented type mapping. Wrap it over the target buffer via
@@ -98,4 +124,18 @@ public static AvroGenerator generator(
{
return new AvroJsonGeneratorImpl(schema, generator);
}
+
+ /**
+ * Variant of {@link #generator(AvroSchema, JsonGeneratorEx)} that, when {@code canonical} is
+ * {@code true}, writes a nullable-single union (a union of {@code null} and exactly one other type) as a
+ * bare JSON value — {@code null} or the unwrapped value — rather than the {@code {"": value}}
+ * wrapper. Every other union shape is unchanged.
+ */
+ public static AvroGenerator generator(
+ AvroSchema schema,
+ JsonGeneratorEx generator,
+ boolean canonical)
+ {
+ return new AvroJsonGeneratorImpl(schema, generator, canonical);
+ }
}
diff --git a/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java b/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java
index 02149d578f..241a8012d5 100644
--- a/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java
+++ b/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java
@@ -225,6 +225,86 @@ public void shouldRejectUnionWithoutNullBranch()
assertRejected("[\"int\",\"string\"]", "null");
}
+ @Test
+ public void shouldEncodeCanonicalNullableUnionValue()
+ {
+ assertCanonicalJson("[\"null\",\"string\"]", new byte[] { 0x02, 0x02, 0x78 }, "\"x\"");
+ }
+
+ @Test
+ public void shouldEncodeCanonicalNullableUnionNull()
+ {
+ assertCanonicalJson("[\"null\",\"string\"]", new byte[] { 0x00 }, "null");
+ }
+
+ @Test
+ public void shouldEncodeCanonicalNullableUnionInRecord()
+ {
+ assertCanonicalJson("""
+ {"type":"record","name":"Event","fields":[
+ {"name":"id","type":"string"},
+ {"name":"status","type":["null","string"]}]}""",
+ new byte[] { 0x06, 0x69, 0x64, 0x30, 0x02, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65 },
+ "{\"id\":\"id0\",\"status\":\"positive\"}");
+ }
+
+ @Test
+ public void shouldWrapNonNullableSingleUnionInCanonicalMode()
+ {
+ // a union with more than one non-null branch keeps the wrapped encoding even in canonical mode
+ assertCanonicalJson("[\"null\",\"int\",\"string\"]", new byte[] { 0x04, 0x02, 0x78 }, "{\"string\":\"x\"}");
+ }
+
+ private void assertCanonicalJson(
+ String schemaText,
+ byte[] binary,
+ String expectedJson)
+ {
+ AvroSchema schema = Avro.schema(schemaText);
+ assertEquals(expectedJson, avroToJsonCanonical(schema, binary));
+ assertArrayEquals(binary, jsonToAvroCanonical(schema, expectedJson));
+ }
+
+ private static String avroToJsonCanonical(
+ AvroSchema schema,
+ byte[] binary)
+ {
+ MutableDirectBuffer out = new UnsafeBuffer(new byte[Math.max(256, binary.length * 8)]);
+ JsonGeneratorEx json = JsonEx.createGenerator();
+ AvroGenerator generator = AvroJson.generator(schema, json, true).wrap(out, 0, out.capacity());
+ AvroPipeline pipeline = Avro.stream(Avro.parser(schema)).into(AvroSink.of(generator));
+ pipeline.reset();
+ Status status = pipeline.feed(new UnsafeBuffer(binary), 0, binary.length);
+ if (status != Status.COMPLETED)
+ {
+ throw new AssertionError("avro -> json did not complete: " + status);
+ }
+ json.flush();
+ byte[] bytes = new byte[json.length()];
+ out.getBytes(0, bytes);
+ return new String(bytes, UTF_8);
+ }
+
+ private static byte[] jsonToAvroCanonical(
+ AvroSchema schema,
+ String json)
+ {
+ byte[] jsonBytes = json.getBytes(UTF_8);
+ MutableDirectBuffer out = new UnsafeBuffer(new byte[Math.max(256, jsonBytes.length * 4)]);
+ AvroGenerator generator = Avro.generator(schema, out, 0);
+ JsonParserEx parser = JsonEx.createParser();
+ AvroPipeline pipeline = AvroJson.stream(schema, parser, true).into(AvroSink.of(generator));
+ pipeline.reset();
+ Status status = pipeline.feed(new UnsafeBuffer(jsonBytes), 0, jsonBytes.length);
+ if (status != Status.COMPLETED)
+ {
+ throw new AssertionError("json -> avro did not complete: " + status);
+ }
+ byte[] avro = new byte[generator.length()];
+ out.getBytes(0, avro);
+ return avro;
+ }
+
private void assertRejected(
String schemaText,
String json)
diff --git a/runtime/model-avro/NOTICE b/runtime/model-avro/NOTICE
index 4e7f359987..b607a04633 100644
--- a/runtime/model-avro/NOTICE
+++ b/runtime/model-avro/NOTICE
@@ -10,13 +10,8 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
This project includes:
- Apache Avro under Apache-2.0
- Apache Commons Codec under Apache-2.0
- Apache Commons Compress under Apache-2.0
- Apache Commons IO under Apache License, Version 2.0
- Apache Commons Lang under Apache License, Version 2.0
- Jackson-annotations under The Apache Software License, Version 2.0
- Jackson-core under The Apache Software License, Version 2.0
- jackson-databind under The Apache Software License, Version 2.0
- SLF4J API Module under MIT
+ agrona under The Apache License, Version 2.0
+ Jakarta JSON Processing API under Eclipse Public License 2.0 or GNU General Public License, version 2 with the GNU Classpath Exception
+ zilla::runtime::common-avro under Aklivity Community License Agreement
+ zilla::runtime::common-json under Aklivity Community License Agreement
diff --git a/runtime/model-avro/pom.xml b/runtime/model-avro/pom.xml
index 81aef3a62d..95387f0b32 100644
--- a/runtime/model-avro/pom.xml
+++ b/runtime/model-avro/pom.xml
@@ -43,22 +43,20 @@
${project.groupId}
- engine
- test-jar
+ common-avro
${project.version}
- test
- org.apache.avro
- avro
-
-
- com.fasterxml.jackson.core
- jackson-core
+ ${project.groupId}
+ common-json
+ ${project.version}
- com.fasterxml.jackson.core
- jackson-databind
+ ${project.groupId}
+ engine
+ test-jar
+ ${project.version}
+ test
org.mockito
@@ -146,34 +144,6 @@
-
- org.apache.maven.plugins
- maven-shade-plugin
-
-
- package
-
- shade
-
-
-
-
- org.apache.avro:avro
-
-
-
-
- org.apache.avro
- io.aklivity.zilla.runtime.model.avro.internal.avro
-
-
- true
- true
- true
-
-
-
-
org.moditect
moditect-maven-plugin
@@ -199,7 +169,6 @@
io/aklivity/zilla/runtime/model/avro/internal/types/**/*.class
- org/apache/avro/**/*.class
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java
index 4a5d6ebac4..2b75c097ab 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java
@@ -19,14 +19,27 @@
import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW;
-public class AvroField
+public final class AvroField
{
+ private static final int INITIAL_CAPACITY = 24;
+
public final OctetsFW value;
- public final MutableDirectBuffer buffer;
+
+ private MutableDirectBuffer buffer;
public AvroField()
{
this.value = new OctetsFW();
- this.buffer = new UnsafeBuffer(new byte[24]);
+ this.buffer = new UnsafeBuffer(new byte[INITIAL_CAPACITY]);
+ }
+
+ public MutableDirectBuffer buffer(
+ int capacity)
+ {
+ if (capacity > buffer.capacity())
+ {
+ buffer = new UnsafeBuffer(new byte[Math.max(buffer.capacity() * 2, capacity)]);
+ }
+ return buffer;
}
}
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
index 75bf85c98b..95e31c1715 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
@@ -14,13 +14,8 @@
*/
package io.aklivity.zilla.runtime.model.avro.internal;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.agrona.DirectBuffer;
@@ -28,81 +23,54 @@
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
-import org.agrona.io.DirectBufferInputStream;
-import org.agrona.io.ExpandableDirectBufferOutputStream;
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
+import io.aklivity.zilla.runtime.common.avro.Avro;
+import io.aklivity.zilla.runtime.common.avro.AvroEvent;
+import io.aklivity.zilla.runtime.common.avro.AvroKind;
+import io.aklivity.zilla.runtime.common.avro.AvroParser;
+import io.aklivity.zilla.runtime.common.avro.AvroSchema;
+import io.aklivity.zilla.runtime.common.avro.AvroType;
+import io.aklivity.zilla.runtime.common.avro.AvroValidationException;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroBooleanFW;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroBytesFW;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroDoubleFW;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroFloatFW;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroIntFW;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroLongFW;
-import io.aklivity.zilla.runtime.model.avro.internal.types.AvroUnionFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW;
public abstract class AvroModelHandler
{
protected static final String VIEW_JSON = "json";
- private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
- private static final OutputStream EMPTY_OUTPUT_STREAM = new ByteArrayOutputStream(0);
+ // headroom so the bounded output window admits any single scalar value (worst-case JSON string escaping is
+ // ~6x, base64 ~1.34x, a double is 8 bytes); a datum whose total exceeds the window drains in chunks
+ private static final int OUT_SCALE = 8;
+ private static final int OUT_SLACK = 1024;
+
private static final int JSON_FIELD_STRUCTURE_LENGTH = "\"\":\"\",".length();
private static final int JSON_FIELD_UNION_LENGTH = "\"\":{\"DATA_TYPE\":\"\"},".length();
- private static final int JSON_FIELD_ARRAY_LENGTH = "\"\":[]," .length();
+ private static final int JSON_FIELD_ARRAY_LENGTH = "\"\":[],".length();
private static final int JSON_FIELD_MAP_LENGTH = "\"\":{},".length();
protected final SchemaConfig catalog;
protected final CatalogHandler handler;
- protected final DecoderFactory decoderFactory;
- protected final EncoderFactory encoderFactory;
- protected final BinaryDecoder decoder;
- protected final BinaryEncoder encoder;
protected final String subject;
protected final String view;
- protected final ExpandableDirectBufferOutputStream expandable;
- protected final DirectBufferInputStream in;
protected final AvroModelEventContext event;
protected final Map extracted;
+ protected final MutableDirectBuffer out;
+ protected final MutableDirectBuffer accumulator;
- private final Int2ObjectCache schemas;
- private final Int2ObjectCache> readers;
- private final Int2ObjectCache> writers;
- private final Int2ObjectCache records;
+ private final Int2ObjectCache schemas;
+ private final Int2ObjectCache parsers;
private final Int2IntHashMap paddings;
- private final AvroBytesFW bytesRO;
- private final AvroIntFW intRO;
- private final AvroLongFW longRO;
- private final AvroFloatFW floatRO;
- private final AvroDoubleFW doubleRO;
- private final AvroUnionFW unionRO;
private final int paddingMaxItems;
- protected int progress;
-
protected AvroModelHandler(
AvroModelConfiguration config,
AvroModelConfig options,
EngineContext context)
{
- this.decoderFactory = DecoderFactory.get();
- this.decoder = decoderFactory.binaryDecoder(EMPTY_INPUT_STREAM, null);
- this.encoderFactory = EncoderFactory.get();
- this.encoder = encoderFactory.binaryEncoder(EMPTY_OUTPUT_STREAM, null);
CatalogedConfig cataloged = options.cataloged.get(0);
this.handler = context.supplyCatalog(cataloged.id);
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
@@ -111,23 +79,28 @@ protected AvroModelHandler(
? catalog.subject
: options.subject;
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
- this.readers = new Int2ObjectCache<>(1, 1024, i -> {});
- this.writers = new Int2ObjectCache<>(1, 1024, i -> {});
- this.records = new Int2ObjectCache<>(1, 1024, i -> {});
+ this.parsers = new Int2ObjectCache<>(1, 1024, i -> {});
this.paddings = new Int2IntHashMap(-1);
- this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
- this.in = new DirectBufferInputStream();
this.event = new AvroModelEventContext(context);
this.extracted = new HashMap<>();
- this.bytesRO = new AvroBytesFW();
- this.intRO = new AvroIntFW();
- this.longRO = new AvroLongFW();
- this.floatRO = new AvroFloatFW();
- this.doubleRO = new AvroDoubleFW();
- this.unionRO = new AvroUnionFW();
+ this.out = new ExpandableDirectByteBuffer();
+ this.accumulator = new ExpandableDirectByteBuffer();
this.paddingMaxItems = config.paddingMaxItems();
}
+ // the bounded output window for a datum decoded/encoded from a payload of the given length, grown so any
+ // single scalar value fits; a total exceeding the window drains across successive feeds
+ protected final int outLimit(
+ int length)
+ {
+ int limit = length * OUT_SCALE + OUT_SLACK;
+ if (out.capacity() < limit)
+ {
+ out.putByte(limit - 1, (byte) 0);
+ }
+ return limit;
+ }
+
protected final boolean validate(
long traceId,
long bindingId,
@@ -139,51 +112,23 @@ protected final boolean validate(
boolean status = false;
try
{
- Schema schema = supplySchema(schemaId);
- if (schema != null)
+ AvroParser parser = supplyParser(schemaId);
+ if (parser != null)
{
- switch (schema.getType())
- {
- case STRING:
- status = true;
- break;
- case RECORD:
- GenericRecord record = supplyRecord(schemaId);
- in.wrap(buffer, index, length);
- GenericDatumReader reader = supplyReader(schemaId);
- if (reader != null)
- {
- decoderFactory.binaryDecoder(in, decoder);
- reader.read(record, decoder);
- status = true;
- }
- progress = index;
- extractFields(buffer, index + length, schema);
- break;
- default:
- break;
- }
+ parser.reset();
+ parser.wrap(buffer, index, length, true);
+ walk(parser);
+ status = true;
}
}
- catch (IOException | AvroRuntimeException ex)
+ catch (AvroValidationException ex)
{
event.validationFailure(traceId, bindingId, ex.getMessage());
}
return status;
}
- protected void extractFields(
- DirectBuffer buffer,
- int limit,
- Schema schema)
- {
- for (Schema.Field field : schema.getFields())
- {
- extract(field.schema(), buffer, limit, extracted.get(field.name()));
- }
- }
-
- protected final Schema supplySchema(
+ protected final AvroSchema supplySchema(
int schemaId)
{
return schemas.computeIfAbsent(schemaId, this::resolveSchema);
@@ -192,247 +137,180 @@ protected final Schema supplySchema(
protected final int supplyPadding(
int schemaId)
{
- return paddings.computeIfAbsent(schemaId, id -> calculatePadding(supplySchema(id)));
- }
-
- protected final GenericDatumReader supplyReader(
- int schemaId)
- {
- return readers.computeIfAbsent(schemaId, this::createReader);
- }
-
- protected final GenericDatumWriter supplyWriter(
- int schemaId)
- {
- return writers.computeIfAbsent(schemaId, this::createWriter);
+ return paddings.computeIfAbsent(schemaId, id ->
+ {
+ AvroSchema schema = supplySchema(id);
+ return calculatePadding(schema != null ? schema.type() : null);
+ });
}
- protected final GenericRecord supplyRecord(
+ private AvroParser supplyParser(
int schemaId)
{
- return records.computeIfAbsent(schemaId, this::createRecord);
+ return parsers.computeIfAbsent(schemaId, id ->
+ {
+ AvroSchema schema = supplySchema(id);
+ return schema != null ? Avro.parser(schema) : null;
+ });
}
- private GenericDatumReader createReader(
+ private AvroSchema resolveSchema(
int schemaId)
{
- Schema schema = supplySchema(schemaId);
- GenericDatumReader reader = null;
- if (schema != null)
+ AvroSchema schema = null;
+ String schemaText = handler.resolve(schemaId);
+ if (schemaText != null)
{
- reader = new GenericDatumReader(schema);
+ schema = Avro.schema(schemaText);
}
- return reader;
+ return schema;
}
- private GenericDatumWriter createWriter(
- int schemaId)
+ private void walk(
+ AvroParser parser)
{
- Schema schema = supplySchema(schemaId);
- GenericDatumWriter writer = null;
- if (schema != null)
+ boolean extracting = !extracted.isEmpty();
+ int depth = 0;
+ AvroField current = null;
+ while (parser.hasNext())
{
- writer = new GenericDatumWriter(schema);
+ AvroEvent next = parser.nextEvent();
+ if (next == null)
+ {
+ break;
+ }
+ switch (next)
+ {
+ case START_RECORD:
+ case START_ARRAY:
+ case START_MAP:
+ depth++;
+ current = null;
+ break;
+ case END_RECORD:
+ case END_ARRAY:
+ case END_MAP:
+ depth--;
+ current = null;
+ break;
+ case FIELD_NAME:
+ current = extracting && depth == 1 ? extracted.get(parser.getField()) : null;
+ break;
+ case UNION_BRANCH:
+ case MAP_KEY:
+ case START_MESSAGE:
+ case END_MESSAGE:
+ break;
+ default:
+ if (current != null)
+ {
+ writeExtract(current, next, parser);
+ }
+ current = null;
+ break;
+ }
}
- return writer;
}
- private GenericRecord createRecord(
- int schemaId)
- {
- Schema schema = supplySchema(schemaId);
- return schema != null && schema.getType() == Schema.Type.RECORD
- ? new GenericData.Record(schema)
- : null;
- }
-
- private Schema resolveSchema(
- int schemaId)
+ private void writeExtract(
+ AvroField field,
+ AvroEvent next,
+ AvroParser parser)
{
- Schema schema = null;
- String schemaText = handler.resolve(schemaId);
- if (schemaText != null)
+ OctetsFW value = field.value;
+ switch (next)
+ {
+ case STRING:
+ case BYTES:
+ case FIXED:
{
- schema = new Schema.Parser().parse(schemaText);
+ DirectBuffer segment = parser.getSegment();
+ int length = segment.capacity();
+ MutableDirectBuffer buffer = field.buffer(length);
+ buffer.putBytes(0, segment, 0, length);
+ value.wrap(buffer, 0, length);
+ break;
+ }
+ case ENUM:
+ case INT:
+ {
+ MutableDirectBuffer buffer = field.buffer(32);
+ int length = buffer.putIntAscii(0, parser.getInt());
+ value.wrap(buffer, 0, length);
+ break;
+ }
+ case LONG:
+ {
+ MutableDirectBuffer buffer = field.buffer(32);
+ int length = buffer.putLongAscii(0, parser.getLong());
+ value.wrap(buffer, 0, length);
+ break;
+ }
+ case FLOAT:
+ {
+ String text = String.valueOf(parser.getFloat());
+ MutableDirectBuffer buffer = field.buffer(text.length());
+ int length = buffer.putStringWithoutLengthAscii(0, text);
+ value.wrap(buffer, 0, length);
+ break;
+ }
+ case DOUBLE:
+ {
+ String text = String.valueOf(parser.getDouble());
+ MutableDirectBuffer buffer = field.buffer(text.length());
+ int length = buffer.putStringWithoutLengthAscii(0, text);
+ value.wrap(buffer, 0, length);
+ break;
+ }
+ case BOOLEAN:
+ {
+ String text = String.valueOf(parser.getBoolean());
+ MutableDirectBuffer buffer = field.buffer(text.length());
+ int length = buffer.putStringWithoutLengthAscii(0, text);
+ value.wrap(buffer, 0, length);
+ break;
+ }
+ default:
+ break;
}
- return schema;
}
private int calculatePadding(
- Schema schema)
+ AvroType type)
{
int padding = 0;
- if (schema != null)
+ if (type != null)
{
padding = 10;
- if (schema.getType().equals(Schema.Type.RECORD))
+ if (type.kind() == AvroKind.RECORD)
{
- for (Schema.Field field : schema.getFields())
+ for (io.aklivity.zilla.runtime.common.avro.AvroField field : type.fields())
{
- padding += field.name().getBytes().length;
+ padding += field.name().getBytes(StandardCharsets.UTF_8).length;
- switch (field.schema().getType())
+ AvroType fieldType = field.type();
+ switch (fieldType.kind())
{
case RECORD:
- {
- padding += calculatePadding(field.schema());
+ padding += calculatePadding(fieldType);
break;
- }
case UNION:
- {
padding += JSON_FIELD_UNION_LENGTH;
break;
- }
case MAP:
- {
- padding += JSON_FIELD_MAP_LENGTH + paddingMaxItems +
- calculatePadding(field.schema().getValueType());
+ padding += JSON_FIELD_MAP_LENGTH + paddingMaxItems + calculatePadding(fieldType.values());
break;
- }
case ARRAY:
- {
- padding += JSON_FIELD_ARRAY_LENGTH + paddingMaxItems +
- calculatePadding(field.schema().getElementType());
+ padding += JSON_FIELD_ARRAY_LENGTH + paddingMaxItems + calculatePadding(fieldType.items());
break;
- }
default:
- {
padding += JSON_FIELD_STRUCTURE_LENGTH;
break;
}
- }
}
}
}
return padding;
}
-
- private void extract(
- Schema schema,
- DirectBuffer data,
- int limit,
- AvroField field)
- {
- switch (schema.getType())
- {
- case RECORD:
- extractFields(data, limit, schema);
- break;
- case BYTES:
- case STRING:
- AvroBytesFW bytes = bytesRO.wrap(data, progress, limit);
- OctetsFW value = bytes.value();
- progress = bytes.limit();
- if (field != null)
- {
- OctetsFW octets = field.value;
- octets.wrap(value.buffer(), value.offset(), value.limit());
- }
- break;
- case ENUM:
- case INT:
- AvroIntFW int32 = intRO.wrap(data, progress, limit);
- int intValue = int32.value();
- progress = int32.limit();
- if (field != null)
- {
- MutableDirectBuffer text = field.buffer;
- int length = text.putIntAscii(0, intValue);
- field.value.wrap(text, 0, length);
- }
- break;
- case FLOAT:
- AvroFloatFW avroFloat = floatRO.wrap(data, progress, limit);
- int len = 0;
- DirectBuffer buffer = avroFloat.value().value();
- float floatValue = Float.intBitsToFloat(decodeNumberBytes(len, buffer));
- progress = avroFloat.limit();
- if (field != null)
- {
- MutableDirectBuffer text = field.buffer;
- int length = text.putStringWithoutLengthAscii(0, String.valueOf(floatValue));
- field.value.wrap(text, 0, length);
- }
- break;
- case LONG:
- AvroLongFW avroLong = longRO.wrap(data, progress, limit);
- long longValue = avroLong.value();
- progress = avroLong.limit();
- if (field != null)
- {
- MutableDirectBuffer text = field.buffer;
- int length = text.putLongAscii(0, longValue);
- field.value.wrap(text, 0, length);
- }
- break;
- case DOUBLE:
- AvroDoubleFW avroDouble = doubleRO.wrap(data, progress, limit);
- len = 0;
- buffer = avroDouble.value().value();
- int decoded = (buffer.getByte(len++) & 0xff) |
- ((buffer.getByte(len++) & 0xff) << 8) |
- ((buffer.getByte(len++) & 0xff) << 16) |
- ((buffer.getByte(len++) & 0xff) << 24);
-
- double doubleValue = Double.longBitsToDouble((((long) decoded) & 0xffffffffL) |
- (((long) decodeNumberBytes(len, buffer)) << 32));
- progress = avroDouble.limit();
- if (field != null)
- {
- MutableDirectBuffer text = field.buffer;
- int length = text.putStringWithoutLengthAscii(0, String.valueOf(doubleValue));
- field.value.wrap(text, 0, length);
- }
- break;
- case BOOLEAN:
- AvroBooleanFW avroBoolean = new AvroBooleanFW().wrap(data, progress, limit);
- value = avroBoolean.value();
- progress = avroBoolean.limit();
- if (field != null)
- {
- field.value.wrap(value.buffer(), value.offset(), value.limit());
- }
- break;
- case FIXED:
- int fixedSize = schema.getFixedSize();
- if (field != null)
- {
- field.value.wrap(data, progress, progress + fixedSize);
- }
- progress += fixedSize;
- break;
- case UNION:
- List types = schema.getTypes();
- Integer nullIndex = schema.getIndexNamed("null");
- if (nullIndex != null && types.size() == 2)
- {
- AvroUnionFW avroUnion = unionRO.wrap(data, progress, limit);
- int index = avroUnion.index();
-
- if (index != nullIndex)
- {
- progress = avroUnion.limit();
-
- int nonNullIndex = nullIndex ^ 1;
- Schema nonNull = types.get(nonNullIndex);
-
- extract(nonNull, data, limit, field);
- }
- }
- break;
- default:
- break;
- }
- }
-
- private static int decodeNumberBytes(
- int len,
- DirectBuffer buffer)
- {
- return (buffer.getByte(len++) & 0xff) |
- ((buffer.getByte(len++) & 0xff) << 8) |
- ((buffer.getByte(len++) & 0xff) << 16) |
- ((buffer.getByte(len) & 0xff) << 24);
- }
}
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
index 62acccdf63..13c0793525 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
@@ -16,20 +16,22 @@
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_SCHEMA_ID;
-import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.agrona.DirectBuffer;
+import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.CanonicalJsonEncoder;
-import org.apache.avro.io.JsonEncoder;
+import io.aklivity.zilla.runtime.common.avro.Avro;
+import io.aklivity.zilla.runtime.common.avro.AvroGenerator;
+import io.aklivity.zilla.runtime.common.avro.AvroPipeline;
+import io.aklivity.zilla.runtime.common.avro.AvroPipeline.Status;
+import io.aklivity.zilla.runtime.common.avro.AvroSchema;
+import io.aklivity.zilla.runtime.common.avro.AvroSink;
+import io.aklivity.zilla.runtime.common.avro.json.AvroJson;
+import io.aklivity.zilla.runtime.common.json.JsonEx;
+import io.aklivity.zilla.runtime.common.json.JsonGeneratorEx;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
@@ -43,6 +45,7 @@ public class AvroReadConverterHandler extends AvroModelHandler implements Conver
private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer();
private final Matcher matcher;
+ private final Int2ObjectCache pipelines;
public AvroReadConverterHandler(
AvroModelConfiguration config,
@@ -51,6 +54,7 @@ public AvroReadConverterHandler(
{
super(config, options, context);
this.matcher = PATH_PATTERN.matcher("");
+ this.pipelines = new Int2ObjectCache<>(1, 1024, i -> {});
}
@Override
@@ -99,7 +103,7 @@ public int convert(
int length,
ValueConsumer next)
{
- for (AvroField field: extracted.values())
+ for (AvroField field : extracted.values())
{
field.value.wrap(EMPTY_BUFFER, 0, 0);
}
@@ -158,12 +162,10 @@ private int decodePayload(
if (VIEW_JSON.equals(view))
{
- deserializeRecord(traceId, bindingId, schemaId, data, index, length);
- int recordLength = expandable.position();
- if (recordLength > 0)
+ valLength = deserializeRecord(traceId, bindingId, schemaId, data, index, length, next);
+ if (valLength != -1 && !extracted.isEmpty())
{
- next.accept(expandable.buffer(), 0, recordLength);
- valLength = recordLength;
+ validate(traceId, bindingId, schemaId, data, index, length);
}
}
else if (validate(traceId, bindingId, schemaId, data, index, length))
@@ -174,36 +176,95 @@ else if (validate(traceId, bindingId, schemaId, data, index, length))
return valLength;
}
- private void deserializeRecord(
+ private int deserializeRecord(
long traceId,
long bindingId,
int schemaId,
- DirectBuffer buffer,
+ DirectBuffer data,
int index,
- int length)
+ int length,
+ ValueConsumer next)
{
- try
+ int valLength = -1;
+
+ AvroToJson pipeline = supplyPipeline(schemaId);
+ if (pipeline != null)
{
- GenericDatumReader reader = supplyReader(schemaId);
- GenericDatumWriter writer = supplyWriter(schemaId);
- if (reader != null)
+ int limit = outLimit(length);
+ int accumulated = 0;
+ pipeline.generator.wrap(out, 0, limit);
+ pipeline.pipeline.reset();
+ Status status = pipeline.pipeline.feed(data, index, length, true);
+ while (status == Status.SUSPENDED)
+ {
+ pipeline.json.flush();
+ int chunk = pipeline.json.length();
+ accumulator.putBytes(accumulated, out, 0, chunk);
+ accumulated += chunk;
+ pipeline.generator.wrap(out, 0, limit);
+ status = pipeline.pipeline.feed(data, index, length, true);
+ }
+
+ if (status == Status.COMPLETED)
+ {
+ pipeline.json.flush();
+ int chunk = pipeline.json.length();
+ if (accumulated == 0)
+ {
+ next.accept(out, 0, chunk);
+ valLength = chunk;
+ }
+ else
+ {
+ accumulator.putBytes(accumulated, out, 0, chunk);
+ accumulated += chunk;
+ next.accept(accumulator, 0, accumulated);
+ valLength = accumulated;
+ }
+ }
+ else
{
- GenericRecord record = supplyRecord(schemaId);
- in.wrap(buffer, index, length);
- expandable.wrap(expandable.buffer());
- record = reader.read(record, decoderFactory.binaryDecoder(in, decoder));
- Schema schema = record.getSchema();
- JsonEncoder out = new CanonicalJsonEncoder(schema, expandable);
- writer.write(record, out);
- out.flush();
-
- progress = index;
- extractFields(buffer, index + length, schema);
+ event.validationFailure(traceId, bindingId, "Invalid Avro encoding");
}
}
- catch (IOException | AvroRuntimeException ex)
+ return valLength;
+ }
+
+ private AvroToJson supplyPipeline(
+ int schemaId)
+ {
+ return pipelines.computeIfAbsent(schemaId, this::newPipeline);
+ }
+
+ private AvroToJson newPipeline(
+ int schemaId)
+ {
+ AvroToJson pipeline = null;
+ AvroSchema schema = supplySchema(schemaId);
+ if (schema != null)
+ {
+ JsonGeneratorEx json = JsonEx.createGenerator();
+ AvroGenerator generator = AvroJson.generator(schema, json, true);
+ AvroPipeline avro = Avro.stream(Avro.parser(schema)).into(AvroSink.of(generator));
+ pipeline = new AvroToJson(avro, json, generator);
+ }
+ return pipeline;
+ }
+
+ private static final class AvroToJson
+ {
+ private final AvroPipeline pipeline;
+ private final JsonGeneratorEx json;
+ private final AvroGenerator generator;
+
+ private AvroToJson(
+ AvroPipeline pipeline,
+ JsonGeneratorEx json,
+ AvroGenerator generator)
{
- event.validationFailure(traceId, bindingId, ex.getMessage());
+ this.pipeline = pipeline;
+ this.json = json;
+ this.generator = generator;
}
}
}
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java
index c94f649ec5..eace94458f 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java
@@ -14,15 +14,8 @@
*/
package io.aklivity.zilla.runtime.model.avro.internal;
-import java.io.IOException;
-import java.io.InputStream;
-
import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.model.ValidatorHandler;
@@ -33,6 +26,8 @@ public class AvroValidatorHandler extends AvroModelHandler implements ValidatorH
{
private final ExpandableDirectByteBuffer buffer;
+ private int progress;
+
public AvroValidatorHandler(
AvroModelConfiguration config,
AvroModelConfig options,
@@ -72,13 +67,11 @@ public boolean validate(
}
else
{
- in.wrap(buffer, 0, progress);
-
int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);
- status = validate(traceId, bindingId, schemaId, in);
+ status = validate(traceId, bindingId, schemaId, buffer, 0, progress);
}
}
}
@@ -100,36 +93,6 @@ private boolean validatePayload(
int length,
ValueConsumer next)
{
- in.wrap(data, index, length);
- return validate(traceId, bindingId, schemaId, in);
- }
-
- private boolean validate(
- long traceId,
- long bindingId,
- int schemaId,
- InputStream in)
- {
- boolean status = false;
- try
- {
- Schema schema = supplySchema(schemaId);
- if (schema != null)
- {
- GenericRecord record = supplyRecord(schemaId);
- GenericDatumReader reader = supplyReader(schemaId);
- if (reader != null)
- {
- decoderFactory.binaryDecoder(in, decoder);
- reader.read(record, decoder);
- status = true;
- }
- }
- }
- catch (IOException | AvroRuntimeException ex)
- {
- event.validationFailure(traceId, bindingId, ex.getMessage());
- }
- return status;
+ return validate(traceId, bindingId, schemaId, data, index, length);
}
}
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
index 6b8e3e0ac6..e6e2e31efc 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
@@ -14,16 +14,18 @@
*/
package io.aklivity.zilla.runtime.model.avro.internal;
-import java.io.IOException;
-
import org.agrona.DirectBuffer;
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.CanonicalJsonDecoder;
+import org.agrona.collections.Int2ObjectCache;
+import io.aklivity.zilla.runtime.common.avro.Avro;
+import io.aklivity.zilla.runtime.common.avro.AvroGenerator;
+import io.aklivity.zilla.runtime.common.avro.AvroPipeline;
+import io.aklivity.zilla.runtime.common.avro.AvroPipeline.Status;
+import io.aklivity.zilla.runtime.common.avro.AvroSchema;
+import io.aklivity.zilla.runtime.common.avro.AvroSink;
+import io.aklivity.zilla.runtime.common.avro.json.AvroJson;
+import io.aklivity.zilla.runtime.common.json.JsonEx;
+import io.aklivity.zilla.runtime.common.json.JsonParserEx;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;
@@ -32,12 +34,15 @@
public class AvroWriteConverterHandler extends AvroModelHandler implements ConverterHandler
{
+ private final Int2ObjectCache pipelines;
+
public AvroWriteConverterHandler(
AvroModelConfiguration config,
AvroModelConfig options,
EngineContext context)
{
super(config, options, context);
+ this.pipelines = new Int2ObjectCache<>(1, 1024, i -> {});
}
@Override
@@ -79,54 +84,67 @@ private int serializeJsonRecord(
long traceId,
long bindingId,
int schemaId,
- DirectBuffer buffer,
+ DirectBuffer data,
int index,
int length,
ValueConsumer next)
{
int valLength = -1;
- try
- {
- Schema schema = supplySchema(schemaId);
- if (schema != null)
+ JsonToAvro pipeline = supplyPipeline(schemaId);
+ if (pipeline != null)
+ {
+ // Avro binary is never more than OUT_SCALE times the JSON it encodes (a double is 8 bytes from a
+ // one-character number), so the bounded window admits the whole datum and the feed completes once
+ pipeline.generator.wrap(out, 0, outLimit(length));
+ pipeline.pipeline.reset();
+ Status status = pipeline.pipeline.feed(data, index, length, true);
+ if (status == Status.COMPLETED)
{
- switch (schema.getType())
- {
- case STRING:
- next.accept(buffer, index, length);
- valLength = length;
- break;
- case RECORD:
- GenericDatumReader reader = supplyReader(schemaId);
- GenericDatumWriter writer = supplyWriter(schemaId);
- if (reader != null)
- {
- GenericRecord record = supplyRecord(schemaId);
- in.wrap(buffer, index, length);
- expandable.wrap(expandable.buffer());
- CanonicalJsonDecoder decoder = new CanonicalJsonDecoder(schema, in);
- record = reader.read(record, decoder);
- encoderFactory.binaryEncoder(expandable, encoder);
- writer.write(record, encoder);
- encoder.flush();
- int position = expandable.position();
- if (position > 0)
- {
- next.accept(expandable.buffer(), 0, position);
- valLength = position;
- }
- }
- break;
- default:
- break;
- }
+ int chunk = pipeline.generator.length();
+ next.accept(out, 0, chunk);
+ valLength = chunk;
+ }
+ else
+ {
+ event.validationFailure(traceId, bindingId, "Invalid JSON encoding");
}
}
- catch (IOException | AvroRuntimeException ex)
+ return valLength;
+ }
+
+ private JsonToAvro supplyPipeline(
+ int schemaId)
+ {
+ return pipelines.computeIfAbsent(schemaId, this::newPipeline);
+ }
+
+ private JsonToAvro newPipeline(
+ int schemaId)
+ {
+ JsonToAvro pipeline = null;
+ AvroSchema schema = supplySchema(schemaId);
+ if (schema != null)
{
- event.validationFailure(traceId, bindingId, ex.getMessage());
+ JsonParserEx parser = JsonEx.createParser();
+ AvroGenerator generator = Avro.generator(schema, out, 0);
+ AvroPipeline avro = AvroJson.stream(schema, parser, true).into(AvroSink.of(generator));
+ pipeline = new JsonToAvro(avro, generator);
+ }
+ return pipeline;
+ }
+
+ private static final class JsonToAvro
+ {
+ private final AvroPipeline pipeline;
+ private final AvroGenerator generator;
+
+ private JsonToAvro(
+ AvroPipeline pipeline,
+ AvroGenerator generator)
+ {
+ this.pipeline = pipeline;
+ this.generator = generator;
}
- return valLength;
}
}
diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java
deleted file mode 100644
index 3656e1d44d..0000000000
--- a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Copyright 2021-2024 Aklivity Inc
- *
- * Licensed under the Aklivity Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * https://www.aklivity.io/aklivity-community-license/
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.avro.io;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.io.parsing.Symbol;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-
-public final class CanonicalJsonDecoder extends JsonDecoder
-{
- private static final Method ADVANCE;
- private static final Method ERROR;
- private static final Field IN;
-
- static
- {
- try
- {
- ADVANCE = JsonDecoder.class.getDeclaredMethod("advance", Symbol.class);
- ERROR = JsonDecoder.class.getDeclaredMethod("error", String.class);
- IN = JsonDecoder.class.getDeclaredField("in");
- ADVANCE.setAccessible(true);
- ERROR.setAccessible(true);
- IN.setAccessible(true);
- }
- catch (NoSuchMethodException ex)
- {
- throw new RuntimeException(ex);
- }
- catch (SecurityException ex)
- {
- throw new RuntimeException(ex);
- }
- catch (NoSuchFieldException ex)
- {
- throw new RuntimeException(ex);
- }
- }
-
-
- public CanonicalJsonDecoder(final Schema schema, final InputStream in)
- throws IOException
- {
- super(schema, in);
- }
-
- public CanonicalJsonDecoder(final Schema schema, final String in)
- throws IOException
- {
- this(schema, new ByteArrayInputStream(in.getBytes(Charset.forName("UTF-8"))));
- }
-
- /**
- * Overwrite this function to optime json decoding of union {null, type}.
- *
- * @return
- * @throws IOException
- */
- @Override
- public int readIndex() throws IOException
- {
- try
- {
- ADVANCE.invoke(this, Symbol.UNION);
- JsonParser lin = getParser();
- Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol();
-
- String label;
- final JsonToken currentToken = lin.getCurrentToken();
- if (currentToken == JsonToken.VALUE_NULL)
- {
- label = "null";
- }
- else if (CanonicalJsonEncoder.isNullableSingle(a))
- {
- label = CanonicalJsonEncoder.getNullableSingle(a);
- }
- else if (currentToken == JsonToken.START_OBJECT &&
- lin.nextToken() == JsonToken.FIELD_NAME)
- {
- label = lin.getText();
- lin.nextToken();
- parser.pushSymbol(Symbol.UNION_END);
- }
- else
- {
- throw (AvroTypeException) ERROR.invoke(this, "start-union");
- }
- int n = a.findLabel(label);
- if (n < 0)
- {
- throw new AvroTypeException("Unknown union branch " + label);
- }
- parser.pushSymbol(a.getSymbol(n));
- return n;
- }
- catch (IllegalAccessException ex)
- {
- throw new RuntimeException(ex);
- }
- catch (IllegalArgumentException ex)
- {
- throw new RuntimeException(ex);
- }
- catch (InvocationTargetException ex)
- {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Overwrite to inject default values.
- *
- * @param input
- * @param top
- * @return
- * @throws IOException
- */
-
- @Override
- public Symbol doAction(final Symbol input, final Symbol top) throws IOException
- {
- try
- {
- JsonParser in = getParser();
- if (top instanceof Symbol.FieldAdjustAction)
- {
- Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
- String name = fa.fname;
- if (currentReorderBuffer != null)
- {
- List node = currentReorderBuffer.savedFields.get(name);
- if (node != null)
- {
- currentReorderBuffer.savedFields.remove(name);
- currentReorderBuffer.origParser = in;
- setParser(makeParser(node));
- return null;
- }
- }
- if (in.getCurrentToken() == JsonToken.FIELD_NAME)
- {
- do
- {
- String fn = in.getText();
- in.nextToken();
- if (name.equals(fn))
- {
- return null;
- }
- else
- {
- if (currentReorderBuffer == null)
- {
- currentReorderBuffer = new JsonDecoder.ReorderBuffer();
- }
- currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in));
- }
- }
- while (in.getCurrentToken() == JsonToken.FIELD_NAME);
- }
- }
- else if (top == Symbol.FIELD_END)
- {
- if (currentReorderBuffer != null && currentReorderBuffer.origParser != null)
- {
- setParser(currentReorderBuffer.origParser);
- currentReorderBuffer.origParser = null;
- }
- }
- else if (top == Symbol.RECORD_START)
- {
- if (in.getCurrentToken() == JsonToken.START_OBJECT)
- {
- in.nextToken();
- reorderBuffers.push(currentReorderBuffer);
- currentReorderBuffer = null;
- }
- else
- {
- throw error("record-start");
- }
- }
- else if (top == Symbol.RECORD_END || top == Symbol.UNION_END)
- {
- if (in.getCurrentToken() == JsonToken.END_OBJECT)
- {
- in.nextToken();
- if (top == Symbol.RECORD_END)
- {
- if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty())
- {
- throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet());
- }
- currentReorderBuffer = reorderBuffers.pop();
- }
- }
- else
- {
- throw error(top == Symbol.RECORD_END ? "record-end" : "union-end");
- }
- }
- else
- {
- throw new AvroTypeException("Unknown action symbol " + top);
- }
- return null;
- }
- catch (IllegalAccessException ex)
- {
- throw new RuntimeException(ex);
- }
- }
-
- private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null);
-
- private JsonParser getParser() throws IllegalAccessException
- {
- return (JsonParser) IN.get(this);
- }
-
- private void setParser(final JsonParser parser) throws IllegalAccessException
- {
- IN.set(this, parser);
- }
-}
diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java
deleted file mode 100644
index d9a67d8adb..0000000000
--- a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2021-2024 Aklivity Inc
- *
- * Licensed under the Aklivity Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * https://www.aklivity.io/aklivity-community-license/
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.avro.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.parsing.Parser;
-import org.apache.avro.io.parsing.Symbol;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-/**
- * A derived encoder that does the skipping of fields that match the index. It also encodes unions of null and a single
- * type as a more normal key=value rather than key={type=value}.
- *
- * @author zfarkas
- */
-public final class CanonicalJsonEncoder extends JsonEncoder
-{
-
-
- public CanonicalJsonEncoder(
- final Schema sc,
- final OutputStream out) throws IOException
- {
- super(sc, out);
- }
-
- public CanonicalJsonEncoder(
- final Schema sc,
- final OutputStream out,
- final boolean pretty) throws IOException
- {
- super(sc, out, pretty);
- }
-
- public CanonicalJsonEncoder(
- final Schema sc,
- final JsonGenerator out) throws IOException
- {
- super(sc, out);
- }
-
- public Parser getParser()
- {
- return parser;
- }
-
- public static boolean isNullableSingle(
- final Symbol.Alternative top)
- {
- return top.size() == 2 && ("null".equals(top.getLabel(0)) || "null".equals(top.getLabel(1)));
- }
-
- public static String getNullableSingle(
- final Symbol.Alternative top)
- {
- final String label = top.getLabel(0);
- return "null".equals(label) ? top.getLabel(1) : label;
- }
-
- /**
- * Overwrite this function to optime json decoding of union {null, type}.
- *
- * @param unionIndex
- * @throws IOException
- */
-
- @Override
- public void writeIndex(
- final int unionIndex) throws IOException
- {
- parser.advance(Symbol.UNION);
- Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol();
- Symbol symbol = top.getSymbol(unionIndex);
- if (symbol != Symbol.NULL && !isNullableSingle(top))
- {
- out.writeStartObject();
- out.writeFieldName(top.getLabel(unionIndex));
- parser.pushSymbol(Symbol.UNION_END);
- }
- parser.pushSymbol(symbol);
- }
-
-}
diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java
deleted file mode 100644
index 9de5203574..0000000000
--- a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java
+++ /dev/null
@@ -1,942 +0,0 @@
-/*
- * Copyright 2021-2024 Aklivity Inc
- *
- * Licensed under the Aklivity Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * https://www.aklivity.io/aklivity-community-license/
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.avro.io;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.io.parsing.JsonGrammarGenerator;
-import org.apache.avro.io.parsing.Parser;
-import org.apache.avro.io.parsing.Symbol;
-import org.apache.avro.util.Utf8;
-
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonLocation;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonStreamContext;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.ObjectCodec;
-import com.fasterxml.jackson.core.Version;
-
-/** A {@link Decoder} for Avro's JSON data encoding.
- *
- * Construct using {@link DecoderFactory}.
- *
- * JsonDecoder is not thread-safe.
- * */
-public class JsonDecoder extends ParsingDecoder
- implements Parser.ActionHandler
-{
- private JsonParser in;
- private static JsonFactory jsonFactory = new JsonFactory();
- Stack reorderBuffers = new Stack();
- ReorderBuffer currentReorderBuffer;
-
- static class ReorderBuffer
- {
- public Map> savedFields = new HashMap>();
- public JsonParser origParser = null;
- }
-
- static final String CHARSET = "ISO-8859-1";
-
- private JsonDecoder(Symbol root, InputStream in) throws IOException
- {
- super(root);
- configure(in);
- }
-
- private JsonDecoder(Symbol root, String in) throws IOException
- {
- super(root);
- configure(in);
- }
-
- JsonDecoder(Schema schema, InputStream in) throws IOException
- {
- this(getSymbol(schema), in);
- }
-
- JsonDecoder(Schema schema, String in) throws IOException
- {
- this(getSymbol(schema), in);
- }
-
- private static Symbol getSymbol(Schema schema)
- {
- if (null == schema)
- {
- throw new NullPointerException("Schema cannot be null!");
- }
- return new JsonGrammarGenerator().generate(schema);
- }
-
- /**
- * Reconfigures this JsonDecoder to use the InputStream provided.
- *
- * If the InputStream provided is null, a NullPointerException is thrown.
- *
- * Otherwise, this JsonDecoder will reset its state and then
- * reconfigure its input.
- * @param in
- * The IntputStream to read from. Cannot be null.
- * @throws IOException
- * @return this JsonDecoder
- */
- public JsonDecoder configure(InputStream in) throws IOException
- {
- if (null == in)
- {
- throw new NullPointerException("InputStream to read from cannot be null!");
- }
- parser.reset();
- this.in = jsonFactory.createJsonParser(in);
- this.in.nextToken();
- return this;
- }
-
- /**
- * Reconfigures this JsonDecoder to use the String provided for input.
- *
- * If the String provided is null, a NullPointerException is thrown.
- *
- * Otherwise, this JsonDecoder will reset its state and then
- * reconfigure its input.
- * @param in
- * The String to read from. Cannot be null.
- * @throws IOException
- * @return this JsonDecoder
- */
- public JsonDecoder configure(String in) throws IOException
- {
- if (null == in)
- {
- throw new NullPointerException("String to read from cannot be null!");
- }
- parser.reset();
- this.in = new JsonFactory().createJsonParser(in);
- this.in.nextToken();
- return this;
- }
-
- private void advance(Symbol symbol) throws IOException
- {
- this.parser.processTrailingImplicitActions();
- if (in.getCurrentToken() == null && this.parser.depth() == 1)
- throw new EOFException();
- parser.advance(symbol);
- }
-
- @Override
- public void readNull() throws IOException
- {
- advance(Symbol.NULL);
- if (in.getCurrentToken() == JsonToken.VALUE_NULL)
- {
- in.nextToken();
- }
- else
- {
- throw error("null");
- }
- }
-
- @Override
- public boolean readBoolean() throws IOException
- {
- advance(Symbol.BOOLEAN);
- JsonToken t = in.getCurrentToken();
- if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE)
- {
- in.nextToken();
- return t == JsonToken.VALUE_TRUE;
- }
- else
- {
- throw error("boolean");
- }
- }
-
- @Override
- public int readInt() throws IOException
- {
- advance(Symbol.INT);
- if (in.getCurrentToken().isNumeric())
- {
- int result = in.getIntValue();
- in.nextToken();
- return result;
- }
- else
- {
- throw error("int");
- }
- }
-
- @Override
- public long readLong() throws IOException
- {
- advance(Symbol.LONG);
- if (in.getCurrentToken().isNumeric())
- {
- long result = in.getLongValue();
- in.nextToken();
- return result;
- }
- else
- {
- throw error("long");
- }
- }
-
- @Override
- public float readFloat() throws IOException
- {
- advance(Symbol.FLOAT);
- if (in.getCurrentToken().isNumeric())
- {
- float result = in.getFloatValue();
- in.nextToken();
- return result;
- }
- else
- {
- throw error("float");
- }
- }
-
- @Override
- public double readDouble() throws IOException
- {
- advance(Symbol.DOUBLE);
- if (in.getCurrentToken().isNumeric())
- {
- double result = in.getDoubleValue();
- in.nextToken();
- return result;
- }
- else
- {
- throw error("double");
- }
- }
-
- @Override
- public Utf8 readString(Utf8 old) throws IOException
- {
- return new Utf8(readString());
- }
-
- @Override
- public String readString() throws IOException
- {
- advance(Symbol.STRING);
- if (parser.topSymbol() == Symbol.MAP_KEY_MARKER)
- {
- parser.advance(Symbol.MAP_KEY_MARKER);
- if (in.getCurrentToken() != JsonToken.FIELD_NAME)
- {
- throw error("map-key");
- }
- }
- else
- {
- if (in.getCurrentToken() != JsonToken.VALUE_STRING)
- {
- throw error("string");
- }
- }
- String result = in.getText();
- in.nextToken();
- return result;
- }
-
- @Override
- public void skipString() throws IOException
- {
- advance(Symbol.STRING);
- if (parser.topSymbol() == Symbol.MAP_KEY_MARKER)
- {
- parser.advance(Symbol.MAP_KEY_MARKER);
- if (in.getCurrentToken() != JsonToken.FIELD_NAME)
- {
- throw error("map-key");
- }
- }
- else
- {
- if (in.getCurrentToken() != JsonToken.VALUE_STRING)
- {
- throw error("string");
- }
- }
- in.nextToken();
- }
-
- @Override
- public ByteBuffer readBytes(ByteBuffer old) throws IOException
- {
- advance(Symbol.BYTES);
- if (in.getCurrentToken() == JsonToken.VALUE_STRING)
- {
- byte[] result = readByteArray();
- in.nextToken();
- return ByteBuffer.wrap(result);
- }
- else
- {
- throw error("bytes");
- }
- }
-
- private byte[] readByteArray() throws IOException
- {
- byte[] result = in.getText().getBytes(CHARSET);
- return result;
- }
-
- @Override
- public void skipBytes() throws IOException
- {
- advance(Symbol.BYTES);
- if (in.getCurrentToken() == JsonToken.VALUE_STRING)
- {
- in.nextToken();
- }
- else
- {
- throw error("bytes");
- }
- }
-
- private void checkFixed(int size) throws IOException
- {
- advance(Symbol.FIXED);
- Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
- if (size != top.size)
- {
- throw new AvroTypeException(
- "Incorrect length for fixed binary: expected " +
- top.size + " but received " + size + " bytes.");
- }
- }
-
- @Override
- public void readFixed(byte[] bytes, int start, int len) throws IOException
- {
- checkFixed(len);
- if (in.getCurrentToken() == JsonToken.VALUE_STRING)
- {
- byte[] result = readByteArray();
- in.nextToken();
- if (result.length != len)
- {
- throw new AvroTypeException("Expected fixed length " + len
- + ", but got" + result.length);
- }
- System.arraycopy(result, 0, bytes, start, len);
- }
- else
- {
- throw error("fixed");
- }
- }
-
- @Override
- public void skipFixed(int length) throws IOException
- {
- checkFixed(length);
- doSkipFixed(length);
- }
-
- private void doSkipFixed(int length) throws IOException
- {
- if (in.getCurrentToken() == JsonToken.VALUE_STRING)
- {
- byte[] result = readByteArray();
- in.nextToken();
- if (result.length != length)
- {
- throw new AvroTypeException("Expected fixed length " + length
- + ", but got" + result.length);
- }
- }
- else
- {
- throw error("fixed");
- }
- }
-
- @Override
- protected void skipFixed() throws IOException
- {
- advance(Symbol.FIXED);
- Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
- doSkipFixed(top.size);
- }
-
- @Override
- public int readEnum() throws IOException
- {
- advance(Symbol.ENUM);
- Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
- if (in.getCurrentToken() == JsonToken.VALUE_STRING)
- {
- in.getText();
- int n = top.findLabel(in.getText());
- if (n >= 0)
- {
- in.nextToken();
- return n;
- }
- throw new AvroTypeException("Unknown symbol in enum " + in.getText());
- }
- else
- {
- throw error("fixed");
- }
- }
-
- @Override
- public long readArrayStart() throws IOException
- {
- advance(Symbol.ARRAY_START);
- if (in.getCurrentToken() == JsonToken.START_ARRAY)
- {
- in.nextToken();
- return doArrayNext();
- }
- else
- {
- throw error("array-start");
- }
- }
-
- @Override
- public long arrayNext() throws IOException
- {
- advance(Symbol.ITEM_END);
- return doArrayNext();
- }
-
- private long doArrayNext() throws IOException
- {
- if (in.getCurrentToken() == JsonToken.END_ARRAY)
- {
- parser.advance(Symbol.ARRAY_END);
- in.nextToken();
- return 0;
- }
- else
- {
- return 1;
- }
- }
-
- @Override
- public long skipArray() throws IOException
- {
- advance(Symbol.ARRAY_START);
- if (in.getCurrentToken() == JsonToken.START_ARRAY)
- {
- in.skipChildren();
- in.nextToken();
- advance(Symbol.ARRAY_END);
- }
- else
- {
- throw error("array-start");
- }
- return 0;
- }
-
- @Override
- public long readMapStart() throws IOException
- {
- advance(Symbol.MAP_START);
- if (in.getCurrentToken() == JsonToken.START_OBJECT)
- {
- in.nextToken();
- return doMapNext();
- }
- else
- {
- throw error("map-start");
- }
- }
-
- @Override
- public long mapNext() throws IOException
- {
- advance(Symbol.ITEM_END);
- return doMapNext();
- }
-
- private long doMapNext() throws IOException
- {
- if (in.getCurrentToken() == JsonToken.END_OBJECT)
- {
- in.nextToken();
- advance(Symbol.MAP_END);
- return 0;
- }
- else
- {
- return 1;
- }
- }
-
- @Override
- public long skipMap() throws IOException
- {
- advance(Symbol.MAP_START);
- if (in.getCurrentToken() == JsonToken.START_OBJECT)
- {
- in.skipChildren();
- in.nextToken();
- advance(Symbol.MAP_END);
- }
- else
- {
- throw error("map-start");
- }
- return 0;
- }
-
- @Override
- public int readIndex() throws IOException
- {
- advance(Symbol.UNION);
- Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol();
-
- String label;
- if (in.getCurrentToken() == JsonToken.VALUE_NULL)
- {
- label = "null";
- }
- else if (in.getCurrentToken() == JsonToken.START_OBJECT &&
- in.nextToken() == JsonToken.FIELD_NAME)
- {
- label = in.getText();
- in.nextToken();
- parser.pushSymbol(Symbol.UNION_END);
- }
- else
- {
- throw error("start-union");
- }
- int n = a.findLabel(label);
- if (n < 0)
- throw new AvroTypeException("Unknown union branch " + label);
- parser.pushSymbol(a.getSymbol(n));
- return n;
- }
-
- @Override
- public Symbol doAction(Symbol input, Symbol top) throws IOException
- {
- if (top instanceof Symbol.FieldAdjustAction)
- {
- Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
- String name = fa.fname;
- if (currentReorderBuffer != null)
- {
- List node = currentReorderBuffer.savedFields.get(name);
- if (node != null)
- {
- currentReorderBuffer.savedFields.remove(name);
- currentReorderBuffer.origParser = in;
- in = makeParser(node);
- return null;
- }
- }
- if (in.getCurrentToken() == JsonToken.FIELD_NAME)
- {
- do
- {
- String fn = in.getText();
- in.nextToken();
- if (name.equals(fn))
- {
- return null;
- }
- else
- {
- if (currentReorderBuffer == null)
- {
- currentReorderBuffer = new ReorderBuffer();
- }
- currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in));
- }
- }
- while (in.getCurrentToken() == JsonToken.FIELD_NAME);
- throw new AvroTypeException("Expected field name not found: " + fa.fname);
- }
- }
- else if (top == Symbol.FIELD_END)
- {
- if (currentReorderBuffer != null && currentReorderBuffer.origParser != null)
- {
- in = currentReorderBuffer.origParser;
- currentReorderBuffer.origParser = null;
- }
- }
- else if (top == Symbol.RECORD_START)
- {
- if (in.getCurrentToken() == JsonToken.START_OBJECT)
- {
- in.nextToken();
- reorderBuffers.push(currentReorderBuffer);
- currentReorderBuffer = null;
- }
- else
- {
- throw error("record-start");
- }
- }
- else if (top == Symbol.RECORD_END || top == Symbol.UNION_END)
- {
- if (in.getCurrentToken() == JsonToken.END_OBJECT)
- {
- in.nextToken();
- if (top == Symbol.RECORD_END)
- {
- if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty())
- {
- throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet());
- }
- currentReorderBuffer = reorderBuffers.pop();
- }
- }
- else
- {
- throw error(top == Symbol.RECORD_END ? "record-end" : "union-end");
- }
- }
- else
- {
- throw new AvroTypeException("Unknown action symbol " + top);
- }
- return null;
- }
-
- static class JsonElement
- {
- public final JsonToken token;
- public final String value;
-
- public JsonElement(JsonToken t, String value)
- {
- this.token = t;
- this.value = value;
- }
-
- public JsonElement(JsonToken t)
- {
- this(t, null);
- }
- }
-
- static List getVaueAsTree(JsonParser in) throws IOException
- {
- int level = 0;
- List result = new ArrayList();
- do
- {
- JsonToken t = in.getCurrentToken();
- switch (t)
- {
- case START_OBJECT:
- case START_ARRAY:
- level++;
- result.add(new JsonElement(t));
- break;
- case END_OBJECT:
- case END_ARRAY:
- level--;
- result.add(new JsonElement(t));
- break;
- case FIELD_NAME:
- case VALUE_STRING:
- case VALUE_NUMBER_INT:
- case VALUE_NUMBER_FLOAT:
- case VALUE_TRUE:
- case VALUE_FALSE:
- case VALUE_NULL:
- result.add(new JsonElement(t, in.getText()));
- break;
- }
- in.nextToken();
- }
- while (level != 0);
- result.add(new JsonElement(null));
- return result;
- }
-
- JsonParser makeParser(final List elements) throws IOException
- {
- return new JsonParser()
- {
- int pos = 0;
-
- @Override
- public JsonToken nextValue() throws IOException
- {
- throw new UnsupportedOperationException();
- };
-
- @Override
- public ObjectCodec getCodec()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setCodec(ObjectCodec c)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JsonToken nextToken() throws IOException
- {
- pos++;
- return elements.get(pos).token;
- }
-
- @Override
- public JsonParser skipChildren() throws IOException
- {
- int level = 0;
- do
- {
- switch (elements.get(pos++).token)
- {
- case START_ARRAY:
- case START_OBJECT:
- level++;
- break;
- case END_ARRAY:
- case END_OBJECT:
- level--;
- break;
- }
- }
- while (level > 0);
- return this;
- }
-
- @Override
- public boolean isClosed()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getCurrentName() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JsonStreamContext getParsingContext()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JsonLocation getTokenLocation()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public JsonLocation getCurrentLocation()
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getText() throws IOException
- {
- return elements.get(pos).value;
- }
-
- @Override
- public char[] getTextCharacters() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getTextLength() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getTextOffset() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean hasTextCharacters()
- {
- return false;
- }
-
- @Override
- public Number getNumberValue() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public NumberType getNumberType() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getIntValue() throws IOException
- {
- return Integer.parseInt(getText());
- }
-
- @Override
- public long getLongValue() throws IOException
- {
- return Long.parseLong(getText());
- }
-
- @Override
- public BigInteger getBigIntegerValue() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public float getFloatValue() throws IOException
- {
- return Float.parseFloat(getText());
- }
-
- @Override
- public double getDoubleValue() throws IOException
- {
- return Double.parseDouble(getText());
- }
-
- @Override
- public BigDecimal getDecimalValue() throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public byte[] getBinaryValue(Base64Variant b64variant)
- throws IOException
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getValueAsString(String s) throws IOException
- {
- return "";
- }
-
- @Override
- public JsonToken getCurrentToken()
- {
- return elements.get(pos).token;
- }
-
- @Override
- public int getCurrentTokenId()
- {
- return 0;
- }
-
- @Override
- public boolean hasCurrentToken()
- {
- return false;
- }
-
- @Override
- public boolean hasTokenId(int i)
- {
- return false;
- }
-
- @Override
- public boolean hasToken(JsonToken jsonToken)
- {
- return false;
- }
-
- @Override
- public void clearCurrentToken()
- {
-
- }
-
- @Override
- public JsonToken getLastClearedToken()
- {
- return null;
- }
-
- @Override
- public void overrideCurrentName(String s)
- {
-
- }
-
- @Override
- public Version version()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- AvroTypeException error(String type)
- {
- return new AvroTypeException("Expected " + type +
- ". Got " + in.getCurrentToken());
- }
-
-}
diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java
deleted file mode 100644
index 62a4e799e6..0000000000
--- a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Copyright 2021-2024 Aklivity Inc
- *
- * Licensed under the Aklivity Community License (the "License"); you may not use
- * this file except in compliance with the License. You may obtain a copy of the
- * License at
- *
- * https://www.aklivity.io/aklivity-community-license/
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.avro.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.BitSet;
-import java.util.Objects;
-
-import org.apache.avro.AvroTypeException;
-import org.apache.avro.Schema;
-import org.apache.avro.io.parsing.JsonGrammarGenerator;
-import org.apache.avro.io.parsing.Parser;
-import org.apache.avro.io.parsing.Symbol;
-import org.apache.avro.util.Utf8;
-
-import com.fasterxml.jackson.core.JsonEncoding;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
-
-/**
- * An {@link Encoder} for Avro's JSON data encoding.
- *
- * Construct using {@link EncoderFactory}.
- *
- * JsonEncoder buffers output, and data may not appear on the output until
- * {@link Encoder#flush()} is called.
- *
- * JsonEncoder is not thread-safe.
- */
-public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler
-{
- private static final String LINE_SEPARATOR = System.getProperty("line.separator");
- final Parser parser;
- private boolean includeNamespace = true;
- JsonGenerator out;
-
- /**
- * Has anything been written into the collections?
- */
- protected BitSet isEmpty = new BitSet();
-
- JsonEncoder(
- Schema sc,
- OutputStream out) throws IOException
- {
- this(sc, getJsonGenerator(out, false));
- }
-
- JsonEncoder(
- Schema sc,
- OutputStream out,
- boolean pretty) throws IOException
- {
- this(sc, getJsonGenerator(out, pretty));
- }
-
- JsonEncoder(Schema sc, JsonGenerator out) throws IOException
- {
- configure(out);
- this.parser = new Parser(new JsonGrammarGenerator().generate(sc), this);
- }
-
- @Override
- public void flush() throws IOException
- {
- parser.processImplicitActions();
- if (out != null)
- {
- out.flush();
- }
- }
-
- // by default, one object per line.
- // with pretty option use default pretty printer with root line separator.
- private static JsonGenerator getJsonGenerator(
- OutputStream out,
- boolean pretty) throws IOException
- {
- Objects.requireNonNull(out, "OutputStream cannot be null");
- JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
- if (pretty)
- {
- DefaultPrettyPrinter pp = new DefaultPrettyPrinter()
- {
- @Override
- public void writeRootValueSeparator(JsonGenerator jg) throws IOException
- {
- jg.writeRaw(LINE_SEPARATOR);
- }
- };
- g.setPrettyPrinter(pp);
- }
- else
- {
- MinimalPrettyPrinter pp = new MinimalPrettyPrinter();
- pp.setRootValueSeparator(LINE_SEPARATOR);
- g.setPrettyPrinter(pp);
- }
- return g;
- }
-
- public boolean isIncludeNamespace()
- {
- return includeNamespace;
- }
-
- public void setIncludeNamespace(
- final boolean includeNamespace)
- {
- this.includeNamespace = includeNamespace;
- }
-
- /**
- * Reconfigures this JsonEncoder to use the output stream provided.
- *
- * If the OutputStream provided is null, a NullPointerException is thrown.
- *
- * Otherwise, this JsonEncoder will flush its current output and then
- * reconfigure its output to use a default UTF8 JsonGenerator that writes to the
- * provided OutputStream.
- *
- * @param out The OutputStream to direct output to. Cannot be null.
- * @return this JsonEncoder
- * @throws IOException
- * @throws NullPointerException if {@code out} is {@code null}
- */
- public JsonEncoder configure(
- OutputStream out) throws IOException
- {
- this.configure(getJsonGenerator(out, false));
- return this;
- }
-
- /**
- * Reconfigures this JsonEncoder to output to the JsonGenerator provided.
- *
- * If the JsonGenerator provided is null, a NullPointerException is thrown.
- *
- * Otherwise, this JsonEncoder will flush its current output and then
- * reconfigure its output to use the provided JsonGenerator.
- *
- * @param generator The JsonGenerator to direct output to. Cannot be null.
- * @return this JsonEncoder
- * @throws IOException
- * @throws NullPointerException if {@code generator} is {@code null}
- */
- private JsonEncoder configure(
- JsonGenerator generator) throws IOException
- {
- Objects.requireNonNull(generator, "JsonGenerator cannot be null");
- if (null != parser)
- {
- flush();
- }
- this.out = generator;
- return this;
- }
-
- @Override
- public void writeNull() throws IOException
- {
- parser.advance(Symbol.NULL);
- out.writeNull();
- }
-
- @Override
- public void writeBoolean(
- boolean b) throws IOException
- {
- parser.advance(Symbol.BOOLEAN);
- out.writeBoolean(b);
- }
-
- @Override
- public void writeInt(
- int n) throws IOException
- {
- parser.advance(Symbol.INT);
- out.writeNumber(n);
- }
-
- @Override
- public void writeLong(
- long n) throws IOException
- {
- parser.advance(Symbol.LONG);
- out.writeNumber(n);
- }
-
- @Override
- public void writeFloat(
- float f) throws IOException
- {
- parser.advance(Symbol.FLOAT);
- out.writeNumber(f);
- }
-
- @Override
- public void writeDouble(
- double d) throws IOException
- {
- parser.advance(Symbol.DOUBLE);
- out.writeNumber(d);
- }
-
- @Override
- public void writeString(
- Utf8 utf8) throws IOException
- {
- writeString(utf8.toString());
- }
-
- @Override
- public void writeString(
- String str) throws IOException
- {
- parser.advance(Symbol.STRING);
- if (parser.topSymbol() == Symbol.MAP_KEY_MARKER)
- {
- parser.advance(Symbol.MAP_KEY_MARKER);
- out.writeFieldName(str);
- }
- else
- {
- out.writeString(str);
- }
- }
-
- @Override
- public void writeBytes(ByteBuffer bytes) throws IOException
- {
- if (bytes.hasArray())
- {
- writeBytes(bytes.array(), bytes.position(), bytes.remaining());
- }
- else
- {
- byte[] b = new byte[bytes.remaining()];
- bytes.duplicate().get(b);
- writeBytes(b);
- }
- }
-
- @Override
- public void writeBytes(
- byte[] bytes,
- int start,
- int len) throws IOException
- {
- parser.advance(Symbol.BYTES);
- writeByteArray(bytes, start, len);
- }
-
- private void writeByteArray(
- byte[] bytes,
- int start,
- int len) throws IOException
- {
- out.writeString(new String(bytes, start, len, StandardCharsets.ISO_8859_1));
- }
-
- @Override
- public void writeFixed(
- byte[] bytes,
- int start,
- int len) throws IOException
- {
- parser.advance(Symbol.FIXED);
- Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
- if (len != top.size)
- {
- throw new AvroTypeException(
- "Incorrect length for fixed binary: expected " + top.size + " but received " + len + " bytes.");
- }
- writeByteArray(bytes, start, len);
- }
-
- @Override
- public void writeEnum(
- int e) throws IOException
- {
- parser.advance(Symbol.ENUM);
- Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
- if (e < 0 || e >= top.size)
- {
- throw new AvroTypeException("Enumeration out of range: max is " + top.size + " but received " + e);
- }
- out.writeString(top.getLabel(e));
- }
-
- @Override
- public void writeArrayStart() throws IOException
- {
- parser.advance(Symbol.ARRAY_START);
- out.writeStartArray();
- push();
- isEmpty.set(depth());
- }
-
- @Override
- public void writeArrayEnd() throws IOException
- {
- if (!isEmpty.get(pos))
- {
- parser.advance(Symbol.ITEM_END);
- }
- pop();
- parser.advance(Symbol.ARRAY_END);
- out.writeEndArray();
- }
-
- @Override
- public void writeMapStart() throws IOException
- {
- push();
- isEmpty.set(depth());
-
- parser.advance(Symbol.MAP_START);
- out.writeStartObject();
- }
-
- @Override
- public void writeMapEnd() throws IOException
- {
- if (!isEmpty.get(pos))
- {
- parser.advance(Symbol.ITEM_END);
- }
- pop();
-
- parser.advance(Symbol.MAP_END);
- out.writeEndObject();
- }
-
- @Override
- public void startItem() throws IOException
- {
- if (!isEmpty.get(pos))
- {
- parser.advance(Symbol.ITEM_END);
- }
- super.startItem();
- isEmpty.clear(depth());
- }
-
- @Override
- public void writeIndex(
- int unionIndex) throws IOException
- {
- parser.advance(Symbol.UNION);
- Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol();
- Symbol symbol = top.getSymbol(unionIndex);
- if (symbol != Symbol.NULL && includeNamespace)
- {
- out.writeStartObject();
- out.writeFieldName(top.getLabel(unionIndex));
- parser.pushSymbol(Symbol.UNION_END);
- }
- parser.pushSymbol(symbol);
- }
-
- @Override
- public Symbol doAction(Symbol input, Symbol top) throws IOException
- {
- if (top instanceof Symbol.FieldAdjustAction)
- {
- Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
- out.writeFieldName(fa.fname);
- }
- else if (top == Symbol.RECORD_START)
- {
- out.writeStartObject();
- }
- else if (top == Symbol.RECORD_END || top == Symbol.UNION_END)
- {
- out.writeEndObject();
- }
- else if (top != Symbol.FIELD_END)
- {
- throw new AvroTypeException("Unknown action symbol " + top);
- }
- return null;
- }
-}
diff --git a/runtime/model-avro/src/main/moditect/module-info.java b/runtime/model-avro/src/main/moditect/module-info.java
index 48c85cbf4a..d132b1200a 100644
--- a/runtime/model-avro/src/main/moditect/module-info.java
+++ b/runtime/model-avro/src/main/moditect/module-info.java
@@ -14,16 +14,12 @@
*/
module io.aklivity.zilla.runtime.model.avro
{
- requires com.fasterxml.jackson.core;
- requires com.fasterxml.jackson.databind;
requires io.aklivity.zilla.runtime.engine;
- requires org.slf4j;
+ requires io.aklivity.zilla.runtime.common.avro;
+ requires io.aklivity.zilla.runtime.common.json;
exports io.aklivity.zilla.runtime.model.avro.config;
- uses io.aklivity.zilla.runtime.model.avro.internal.avro.Conversion;
- uses io.aklivity.zilla.runtime.model.avro.internal.avro.LogicalTypes$LogicalTypeFactory;
-
provides io.aklivity.zilla.runtime.engine.config.ModelConfigAdapterSpi
with io.aklivity.zilla.runtime.model.avro.internal.config.AvroModelConfigAdapter;
diff --git a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java
index e7cfb52f07..10df7331f1 100644
--- a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java
+++ b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java
@@ -15,12 +15,14 @@
package io.aklivity.zilla.runtime.model.avro.internal;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Clock;
import org.agrona.DirectBuffer;
+import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
import org.junit.Test;
@@ -424,4 +426,261 @@ public void shouldExtract()
};
converter.extracted(floatPath, floatVisitor);
}
+
+ @Test
+ public void shouldRoundTripComplexJson()
+ {
+ String schema = "{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"io.aklivity.example\"," +
+ "\"fields\":[" +
+ "{\"name\":\"id\",\"type\":\"long\"}," +
+ "{\"name\":\"name\",\"type\":\"string\"}," +
+ "{\"name\":\"active\",\"type\":\"boolean\"}," +
+ "{\"name\":\"score\",\"type\":\"double\"}," +
+ "{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}," +
+ "{\"name\":\"props\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}," +
+ "{\"name\":\"status\",\"type\":[\"null\",\"string\"]}," +
+ "{\"name\":\"meta\",\"type\":{\"type\":\"record\",\"name\":\"Meta\"," +
+ "\"fields\":[{\"name\":\"m\",\"type\":\"int\"}]}}]}";
+
+ TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
+ .namespace("test")
+ .name("test0")
+ .type("test")
+ .options(TestCatalogOptionsConfig::builder)
+ .id(9)
+ .schema(schema)
+ .build()
+ .build();
+ when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
+
+ AvroModelConfig writeModel = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+ AvroModelConfig readModel = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+
+ AvroWriteConverterHandler writer = new AvroWriteConverterHandler(config, writeModel, context);
+ AvroReadConverterHandler reader = new AvroReadConverterHandler(config, readModel, context);
+
+ // a large value forces the bounded output to suspend mid-datum, so the streamed output must equal
+ // the whole-buffer output
+ String name = "x".repeat(1000);
+ String json = "{\"id\":7,\"name\":\"" + name + "\",\"active\":true,\"score\":0.5," +
+ "\"tags\":[\"a\",\"b\"],\"props\":{\"k\":\"v\"},\"status\":\"ok\",\"meta\":{\"m\":3}}";
+
+ byte[] avro = convertToBytes(writer, json.getBytes());
+ assertNotNull(avro);
+
+ byte[] roundTrip = convertToBytes(reader, avro);
+ assertNotNull(roundTrip);
+ assertEquals(json, new String(roundTrip));
+ }
+
+ @Test
+ public void shouldExtractBinaryTypes()
+ {
+ String schema = "{\"type\":\"record\",\"name\":\"Obj\",\"fields\":[" +
+ "{\"name\":\"b\",\"type\":\"bytes\"}," +
+ "{\"name\":\"e\",\"type\":{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"A\",\"B\",\"C\"]}}," +
+ "{\"name\":\"f\",\"type\":\"boolean\"}," +
+ "{\"name\":\"x\",\"type\":{\"type\":\"fixed\",\"name\":\"F\",\"size\":2}}," +
+ "{\"name\":\"s\",\"type\":\"string\"}]}";
+
+ TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
+ .namespace("test")
+ .name("test0")
+ .type("test")
+ .options(TestCatalogOptionsConfig::builder)
+ .id(9)
+ .schema(schema)
+ .build()
+ .build();
+ when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
+
+ AvroModelConfig writeModel = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+ AvroModelConfig readModel = AvroModelConfig.builder()
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+
+ AvroWriteConverterHandler writer = new AvroWriteConverterHandler(config, writeModel, context);
+ String json = "{\"b\":\"/wA=\",\"e\":\"C\",\"f\":true,\"x\":\"AQI=\",\"s\":\"hello\"}";
+ byte[] avro = convertToBytes(writer, json.getBytes());
+ assertNotNull(avro);
+
+ AvroReadConverterHandler reader = new AvroReadConverterHandler(config, readModel, context);
+ reader.extract("$.b");
+ reader.extract("$.e");
+ reader.extract("$.f");
+ reader.extract("$.x");
+ reader.extract("$.s");
+
+ DirectBuffer data = new UnsafeBuffer(avro);
+ assertEquals(avro.length, reader.convert(0L, 0L, data, 0, avro.length, ValueConsumer.NOP));
+
+ reader.extracted("$.s", (buffer, index, length) ->
+ assertEquals("hello", buffer.getStringWithoutLengthUtf8(index, length)));
+ reader.extracted("$.e", (buffer, index, length) ->
+ assertEquals("2", buffer.getStringWithoutLengthUtf8(index, length)));
+ reader.extracted("$.f", (buffer, index, length) ->
+ assertEquals("true", buffer.getStringWithoutLengthUtf8(index, length)));
+ reader.extracted("$.b", (buffer, index, length) ->
+ {
+ assertEquals(2, length);
+ assertEquals((byte) 0xff, buffer.getByte(index));
+ assertEquals((byte) 0x00, buffer.getByte(index + 1));
+ });
+ reader.extracted("$.x", (buffer, index, length) ->
+ {
+ assertEquals(2, length);
+ assertEquals((byte) 0x01, buffer.getByte(index));
+ assertEquals((byte) 0x02, buffer.getByte(index + 1));
+ });
+ }
+
+ @Test
+ public void shouldRoundTripLargeArrayJson()
+ {
+ String schema = "{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"R\"," +
+ "\"fields\":[{\"name\":\"valueWithAQuiteLongFieldName\",\"type\":\"int\"}]}}";
+
+ TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
+ .namespace("test")
+ .name("test0")
+ .type("test")
+ .options(TestCatalogOptionsConfig::builder)
+ .id(9)
+ .schema(schema)
+ .build()
+ .build();
+ when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
+
+ AvroModelConfig writeModel = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+ AvroModelConfig readModel = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+
+ // the repeated field name makes the JSON far larger than the Avro, exceeding the bounded output
+ // window so the decode drains across several feeds and must reassemble identically
+ StringBuilder builder = new StringBuilder("[");
+ for (int i = 0; i < 60; i++)
+ {
+ builder.append(i > 0 ? "," : "").append("{\"valueWithAQuiteLongFieldName\":").append(i).append("}");
+ }
+ builder.append("]");
+ String json = builder.toString();
+
+ AvroWriteConverterHandler writer = new AvroWriteConverterHandler(config, writeModel, context);
+ AvroReadConverterHandler reader = new AvroReadConverterHandler(config, readModel, context);
+
+ byte[] avro = convertToBytes(writer, json.getBytes());
+ assertNotNull(avro);
+
+ byte[] roundTrip = convertToBytes(reader, avro);
+ assertNotNull(roundTrip);
+ assertEquals(json, new String(roundTrip));
+ }
+
+ @Test
+ public void shouldReadInvalidAvroExpectJson()
+ {
+ TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new)
+ .namespace("test")
+ .name("test0")
+ .type("test")
+ .options(TestCatalogOptionsConfig::builder)
+ .id(9)
+ .schema(SCHEMA)
+ .build()
+ .build();
+ AvroModelConfig model = AvroModelConfig.builder()
+ .view("json")
+ .catalog()
+ .name("test0")
+ .schema()
+ .strategy("topic")
+ .version("latest")
+ .subject("test-value")
+ .build()
+ .build()
+ .build();
+
+ when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options));
+ when(context.clock()).thenReturn(Clock.systemUTC());
+ when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class));
+ AvroReadConverterHandler converter = new AvroReadConverterHandler(config, model, context);
+
+ DirectBuffer data = new UnsafeBuffer();
+ byte[] bytes = {0x06, 0x69, 0x64, 0x30, 0x10};
+ data.wrap(bytes, 0, bytes.length);
+ assertEquals(-1, converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
+ }
+
+ private static byte[] convertToBytes(
+ ConverterHandler converter,
+ byte[] input)
+ {
+ DirectBuffer data = new UnsafeBuffer(input);
+ MutableDirectBuffer destination = new UnsafeBuffer(new byte[64 * 1024]);
+ int progress = converter.convert(0L, 0L, data, 0, input.length,
+ (buffer, index, length) -> destination.putBytes(0, buffer, index, length));
+ byte[] result = null;
+ if (progress > 0)
+ {
+ result = new byte[progress];
+ destination.getBytes(0, result);
+ }
+ return result;
+ }
}
diff --git a/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml b/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml
index 1178154944..4e0ce6ecd4 100644
--- a/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml
+++ b/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml
@@ -28,7 +28,7 @@ telemetry:
- qname: test:net0
id: model.avro.validation.failed
name: MODEL_AVRO_VALIDATION_FAILED
- message: A message payload failed validation.
+ message: A message payload failed validation. truncated datum
catalogs:
test0:
type: test
From ced9409755457a916d8dddc1136af0fd80822ed8 Mon Sep 17 00:00:00 2001
From: Claude
Date: Tue, 16 Jun 2026 23:36:01 +0000
Subject: [PATCH 2/4] ci: retrigger build after transient Docker Hub pull rate
limit
Co-Authored-By: Claude Opus 4.8
Claude-Session: https://claude.ai/code/session_012CLYLdgT4B97uSUndTsG6N
From 3fadce96cce1a0a60cad95bfee893f22fb53fdcf Mon Sep 17 00:00:00 2001
From: Claude
Date: Thu, 18 Jun 2026 00:24:33 +0000
Subject: [PATCH 3/4] fix(model-avro): adopt (offset, limit) streaming window
API
Adapt to the common-json/avro window API refactor (#1916), which changed
AvroParser.wrap and AvroPipeline.feed from (offset, length) to (offset, limit).
Pass index + length so the encoded-strategy path (and any framed payload at a
non-zero offset) presents the full window instead of truncating.
Co-Authored-By: Claude Opus 4.8
Claude-Session: https://claude.ai/code/session_012CLYLdgT4B97uSUndTsG6N
---
.../zilla/runtime/model/avro/internal/AvroModelHandler.java | 2 +-
.../runtime/model/avro/internal/AvroReadConverterHandler.java | 4 ++--
.../model/avro/internal/AvroWriteConverterHandler.java | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
index 95e31c1715..a227119f52 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java
@@ -116,7 +116,7 @@ protected final boolean validate(
if (parser != null)
{
parser.reset();
- parser.wrap(buffer, index, length, true);
+ parser.wrap(buffer, index, index + length, true);
walk(parser);
status = true;
}
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
index 13c0793525..7a2768059c 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java
@@ -194,7 +194,7 @@ private int deserializeRecord(
int accumulated = 0;
pipeline.generator.wrap(out, 0, limit);
pipeline.pipeline.reset();
- Status status = pipeline.pipeline.feed(data, index, length, true);
+ Status status = pipeline.pipeline.feed(data, index, index + length, true);
while (status == Status.SUSPENDED)
{
pipeline.json.flush();
@@ -202,7 +202,7 @@ private int deserializeRecord(
accumulator.putBytes(accumulated, out, 0, chunk);
accumulated += chunk;
pipeline.generator.wrap(out, 0, limit);
- status = pipeline.pipeline.feed(data, index, length, true);
+ status = pipeline.pipeline.feed(data, index, index + length, true);
}
if (status == Status.COMPLETED)
diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
index e6e2e31efc..ee7be64040 100644
--- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
+++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java
@@ -98,7 +98,7 @@ private int serializeJsonRecord(
// one-character number), so the bounded window admits the whole datum and the feed completes once
pipeline.generator.wrap(out, 0, outLimit(length));
pipeline.pipeline.reset();
- Status status = pipeline.pipeline.feed(data, index, length, true);
+ Status status = pipeline.pipeline.feed(data, index, index + length, true);
if (status == Status.COMPLETED)
{
int chunk = pipeline.generator.length();
From f25c48d89784c271a00237305d12fd93af70dd96 Mon Sep 17 00:00:00 2001
From: Claude
Date: Thu, 18 Jun 2026 02:01:22 +0000
Subject: [PATCH 4/4] ci: retrigger build after transient manager/ZpmInstall
network flake
Co-Authored-By: Claude Opus 4.8
Claude-Session: https://claude.ai/code/session_012CLYLdgT4B97uSUndTsG6N