diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java index 96ed25792c..09a30acfb4 100644 --- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java +++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonGeneratorImpl.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.common.avro.internal.json; import static io.aklivity.zilla.runtime.common.avro.internal.json.AvroJsonUnion.branchName; +import static io.aklivity.zilla.runtime.common.avro.internal.json.AvroJsonUnion.nullableSingle; import java.util.IdentityHashMap; import java.util.List; @@ -67,6 +68,7 @@ public final class AvroJsonGeneratorImpl implements AvroGenerator private static final int RESERVE = 24; private final JsonGeneratorEx json; + private final boolean canonical; private final AvroType rootType; private final Map> fieldsByType; private final Map> branchesByType; @@ -85,8 +87,17 @@ public final class AvroJsonGeneratorImpl implements AvroGenerator public AvroJsonGeneratorImpl( AvroSchema schema, JsonGeneratorEx json) + { + this(schema, json, false); + } + + public AvroJsonGeneratorImpl( + AvroSchema schema, + JsonGeneratorEx json, + boolean canonical) { this.json = json; + this.canonical = canonical; this.rootType = schema.type(); this.fieldsByType = new IdentityHashMap<>(); this.branchesByType = new IdentityHashMap<>(); @@ -180,8 +191,9 @@ public void writeIndex( int index) { value(); - AvroType branch = branches(valueType).get(index); - boolean wrapped = branch.kind() != AvroKind.NULL; + List branches = branches(valueType); + AvroType branch = branches.get(index); + boolean wrapped = branch.kind() != AvroKind.NULL && !(canonical && nullableSingle(branches)); if (wrapped) { json.writeStartObject(); diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java index e62ca1acc1..7ec443cb6c 100644 --- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java +++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonParserImpl.java @@ -65,6 +65,7 @@ public final class AvroJsonParserImpl implements AvroParser private static final int DONE = 2; private final JsonParserEx json; + private final boolean canonical; private final AvroType rootType; private final UnsafeBuffer segment; private final UnsafeBuffer segmentView; @@ -109,8 +110,17 @@ public long getStreamOffset() public AvroJsonParserImpl( AvroSchema schema, JsonParserEx json) + { + this(schema, json, false); + } + + public AvroJsonParserImpl( + AvroSchema schema, + JsonParserEx json, + boolean canonical) { this.json = json; + this.canonical = canonical; this.rootType = schema.type(); this.segment = new UnsafeBuffer(0, 0); this.segmentView = new UnsafeBuffer(0, 0); @@ -438,6 +448,14 @@ private AvroEvent stepUnion( } event = selectBranch(type, index, false); } + else if (canonical && AvroJsonUnion.nullableSingle(branches)) + { + if (next != null) + { + int index = AvroJsonUnion.nullBranchIndex(branches) ^ 1; + event = selectBranch(type, index, false); + } + } else if (next == JsonEvent.START_OBJECT) { consume(); diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java index bdf31bcc4f..824969d43c 100644 --- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java +++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/internal/json/AvroJsonUnion.java @@ -119,4 +119,15 @@ static int nullBranchIndex( } return index; } + + /** + * {@code true} when {@code branches} is a union of {@code null} and exactly one other type — the + * nullable-single shape whose non-null branch the canonical JSON encoding represents as a bare value + * rather than the {@code {"": value}} wrapper. + */ + static boolean nullableSingle( + List branches) + { + return branches.size() == 2 && nullBranchIndex(branches) >= 0; + } } diff --git a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java index 8306d4aa25..a611359d40 100644 --- a/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java +++ b/runtime/common-avro/src/main/java/io/aklivity/zilla/runtime/common/avro/json/AvroJson.java @@ -73,6 +73,20 @@ public static AvroParser parser( return new AvroJsonParserImpl(schema, parser); } + /** + * Variant of {@link #parser(AvroSchema, JsonParserEx)} that, when {@code canonical} is {@code true}, + * reads a nullable-single union (a union of {@code null} and exactly one other type) from a bare JSON + * value — {@code null} or the unwrapped value — rather than the {@code {"": value}} wrapper. + * Every other union shape is unchanged. + */ + public static AvroParser parser( + AvroSchema schema, + JsonParserEx parser, + boolean canonical) + { + return new AvroJsonParserImpl(schema, parser, canonical); + } + /** * Begins a JSON → Avro push pipeline driven by {@code parser}: the returned {@link AvroStream} * walks {@code schema} in lockstep with the JSON pull events and emits the Avro event stream. Append @@ -86,6 +100,18 @@ public static AvroStream stream( return Avro.stream(parser(schema, parser)); } + /** + * Variant of {@link #stream(AvroSchema, JsonParserEx)} with the canonical nullable-single union reading + * of {@link #parser(AvroSchema, JsonParserEx, boolean)}. + */ + public static AvroStream stream( + AvroSchema schema, + JsonParserEx parser, + boolean canonical) + { + return Avro.stream(parser(schema, parser, canonical)); + } + /** * Returns a schema-bound Avro → JSON {@link AvroGenerator} that maps each positional Avro write * onto {@code generator}, applying the documented type mapping. Wrap it over the target buffer via @@ -98,4 +124,18 @@ public static AvroGenerator generator( { return new AvroJsonGeneratorImpl(schema, generator); } + + /** + * Variant of {@link #generator(AvroSchema, JsonGeneratorEx)} that, when {@code canonical} is + * {@code true}, writes a nullable-single union (a union of {@code null} and exactly one other type) as a + * bare JSON value — {@code null} or the unwrapped value — rather than the {@code {"": value}} + * wrapper. Every other union shape is unchanged. + */ + public static AvroGenerator generator( + AvroSchema schema, + JsonGeneratorEx generator, + boolean canonical) + { + return new AvroJsonGeneratorImpl(schema, generator, canonical); + } } diff --git a/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java b/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java index 02149d578f..241a8012d5 100644 --- a/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java +++ b/runtime/common-avro/src/test/java/io/aklivity/zilla/runtime/common/avro/json/AvroJsonTest.java @@ -225,6 +225,86 @@ public void shouldRejectUnionWithoutNullBranch() assertRejected("[\"int\",\"string\"]", "null"); } + @Test + public void shouldEncodeCanonicalNullableUnionValue() + { + assertCanonicalJson("[\"null\",\"string\"]", new byte[] { 0x02, 0x02, 0x78 }, "\"x\""); + } + + @Test + public void shouldEncodeCanonicalNullableUnionNull() + { + assertCanonicalJson("[\"null\",\"string\"]", new byte[] { 0x00 }, "null"); + } + + @Test + public void shouldEncodeCanonicalNullableUnionInRecord() + { + assertCanonicalJson(""" + {"type":"record","name":"Event","fields":[ + {"name":"id","type":"string"}, + {"name":"status","type":["null","string"]}]}""", + new byte[] { 0x06, 0x69, 0x64, 0x30, 0x02, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65 }, + "{\"id\":\"id0\",\"status\":\"positive\"}"); + } + + @Test + public void shouldWrapNonNullableSingleUnionInCanonicalMode() + { + // a union with more than one non-null branch keeps the wrapped encoding even in canonical mode + assertCanonicalJson("[\"null\",\"int\",\"string\"]", new byte[] { 0x04, 0x02, 0x78 }, "{\"string\":\"x\"}"); + } + + private void assertCanonicalJson( + String schemaText, + byte[] binary, + String expectedJson) + { + AvroSchema schema = Avro.schema(schemaText); + assertEquals(expectedJson, avroToJsonCanonical(schema, binary)); + assertArrayEquals(binary, jsonToAvroCanonical(schema, expectedJson)); + } + + private static String avroToJsonCanonical( + AvroSchema schema, + byte[] binary) + { + MutableDirectBuffer out = new UnsafeBuffer(new byte[Math.max(256, binary.length * 8)]); + JsonGeneratorEx json = JsonEx.createGenerator(); + AvroGenerator generator = AvroJson.generator(schema, json, true).wrap(out, 0, out.capacity()); + AvroPipeline pipeline = Avro.stream(Avro.parser(schema)).into(AvroSink.of(generator)); + pipeline.reset(); + Status status = pipeline.feed(new UnsafeBuffer(binary), 0, binary.length); + if (status != Status.COMPLETED) + { + throw new AssertionError("avro -> json did not complete: " + status); + } + json.flush(); + byte[] bytes = new byte[json.length()]; + out.getBytes(0, bytes); + return new String(bytes, UTF_8); + } + + private static byte[] jsonToAvroCanonical( + AvroSchema schema, + String json) + { + byte[] jsonBytes = json.getBytes(UTF_8); + MutableDirectBuffer out = new UnsafeBuffer(new byte[Math.max(256, jsonBytes.length * 4)]); + AvroGenerator generator = Avro.generator(schema, out, 0); + JsonParserEx parser = JsonEx.createParser(); + AvroPipeline pipeline = AvroJson.stream(schema, parser, true).into(AvroSink.of(generator)); + pipeline.reset(); + Status status = pipeline.feed(new UnsafeBuffer(jsonBytes), 0, jsonBytes.length); + if (status != Status.COMPLETED) + { + throw new AssertionError("json -> avro did not complete: " + status); + } + byte[] avro = new byte[generator.length()]; + out.getBytes(0, avro); + return avro; + } + private void assertRejected( String schemaText, String json) diff --git a/runtime/model-avro/NOTICE b/runtime/model-avro/NOTICE index 4e7f359987..b607a04633 100644 --- a/runtime/model-avro/NOTICE +++ b/runtime/model-avro/NOTICE @@ -10,13 +10,8 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. This project includes: - Apache Avro under Apache-2.0 - Apache Commons Codec under Apache-2.0 - Apache Commons Compress under Apache-2.0 - Apache Commons IO under Apache License, Version 2.0 - Apache Commons Lang under Apache License, Version 2.0 - Jackson-annotations under The Apache Software License, Version 2.0 - Jackson-core under The Apache Software License, Version 2.0 - jackson-databind under The Apache Software License, Version 2.0 - SLF4J API Module under MIT + agrona under The Apache License, Version 2.0 + Jakarta JSON Processing API under Eclipse Public License 2.0 or GNU General Public License, version 2 with the GNU Classpath Exception + zilla::runtime::common-avro under Aklivity Community License Agreement + zilla::runtime::common-json under Aklivity Community License Agreement diff --git a/runtime/model-avro/pom.xml b/runtime/model-avro/pom.xml index 81aef3a62d..95387f0b32 100644 --- a/runtime/model-avro/pom.xml +++ b/runtime/model-avro/pom.xml @@ -43,22 +43,20 @@ ${project.groupId} - engine - test-jar + common-avro ${project.version} - test - org.apache.avro - avro - - - com.fasterxml.jackson.core - jackson-core + ${project.groupId} + common-json + ${project.version} - com.fasterxml.jackson.core - jackson-databind + ${project.groupId} + engine + test-jar + ${project.version} + test org.mockito @@ -146,34 +144,6 @@ - - org.apache.maven.plugins - maven-shade-plugin - - - package - - shade - - - - - org.apache.avro:avro - - - - - org.apache.avro - io.aklivity.zilla.runtime.model.avro.internal.avro - - - true - true - true - - - - org.moditect moditect-maven-plugin @@ -199,7 +169,6 @@ io/aklivity/zilla/runtime/model/avro/internal/types/**/*.class - org/apache/avro/**/*.class diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java index 4a5d6ebac4..2b75c097ab 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroField.java @@ -19,14 +19,27 @@ import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW; -public class AvroField +public final class AvroField { + private static final int INITIAL_CAPACITY = 24; + public final OctetsFW value; - public final MutableDirectBuffer buffer; + + private MutableDirectBuffer buffer; public AvroField() { this.value = new OctetsFW(); - this.buffer = new UnsafeBuffer(new byte[24]); + this.buffer = new UnsafeBuffer(new byte[INITIAL_CAPACITY]); + } + + public MutableDirectBuffer buffer( + int capacity) + { + if (capacity > buffer.capacity()) + { + buffer = new UnsafeBuffer(new byte[Math.max(buffer.capacity() * 2, capacity)]); + } + return buffer; } } diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java index 75bf85c98b..a227119f52 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java @@ -14,13 +14,8 @@ */ package io.aklivity.zilla.runtime.model.avro.internal; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.agrona.DirectBuffer; @@ -28,81 +23,54 @@ import org.agrona.MutableDirectBuffer; import org.agrona.collections.Int2IntHashMap; import org.agrona.collections.Int2ObjectCache; -import org.agrona.io.DirectBufferInputStream; -import org.agrona.io.ExpandableDirectBufferOutputStream; -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; +import io.aklivity.zilla.runtime.common.avro.Avro; +import io.aklivity.zilla.runtime.common.avro.AvroEvent; +import io.aklivity.zilla.runtime.common.avro.AvroKind; +import io.aklivity.zilla.runtime.common.avro.AvroParser; +import io.aklivity.zilla.runtime.common.avro.AvroSchema; +import io.aklivity.zilla.runtime.common.avro.AvroType; +import io.aklivity.zilla.runtime.common.avro.AvroValidationException; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.CatalogedConfig; import io.aklivity.zilla.runtime.engine.config.SchemaConfig; import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroBooleanFW; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroBytesFW; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroDoubleFW; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroFloatFW; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroIntFW; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroLongFW; -import io.aklivity.zilla.runtime.model.avro.internal.types.AvroUnionFW; import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW; public abstract class AvroModelHandler { protected static final String VIEW_JSON = "json"; - private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); - private static final OutputStream EMPTY_OUTPUT_STREAM = new ByteArrayOutputStream(0); + // headroom so the bounded output window admits any single scalar value (worst-case JSON string escaping is + // ~6x, base64 ~1.34x, a double is 8 bytes); a datum whose total exceeds the window drains in chunks + private static final int OUT_SCALE = 8; + private static final int OUT_SLACK = 1024; + private static final int JSON_FIELD_STRUCTURE_LENGTH = "\"\":\"\",".length(); private static final int JSON_FIELD_UNION_LENGTH = "\"\":{\"DATA_TYPE\":\"\"},".length(); - private static final int JSON_FIELD_ARRAY_LENGTH = "\"\":[]," .length(); + private static final int JSON_FIELD_ARRAY_LENGTH = "\"\":[],".length(); private static final int JSON_FIELD_MAP_LENGTH = "\"\":{},".length(); protected final SchemaConfig catalog; protected final CatalogHandler handler; - protected final DecoderFactory decoderFactory; - protected final EncoderFactory encoderFactory; - protected final BinaryDecoder decoder; - protected final BinaryEncoder encoder; protected final String subject; protected final String view; - protected final ExpandableDirectBufferOutputStream expandable; - protected final DirectBufferInputStream in; protected final AvroModelEventContext event; protected final Map extracted; + protected final MutableDirectBuffer out; + protected final MutableDirectBuffer accumulator; - private final Int2ObjectCache schemas; - private final Int2ObjectCache> readers; - private final Int2ObjectCache> writers; - private final Int2ObjectCache records; + private final Int2ObjectCache schemas; + private final Int2ObjectCache parsers; private final Int2IntHashMap paddings; - private final AvroBytesFW bytesRO; - private final AvroIntFW intRO; - private final AvroLongFW longRO; - private final AvroFloatFW floatRO; - private final AvroDoubleFW doubleRO; - private final AvroUnionFW unionRO; private final int paddingMaxItems; - protected int progress; - protected AvroModelHandler( AvroModelConfiguration config, AvroModelConfig options, EngineContext context) { - this.decoderFactory = DecoderFactory.get(); - this.decoder = decoderFactory.binaryDecoder(EMPTY_INPUT_STREAM, null); - this.encoderFactory = EncoderFactory.get(); - this.encoder = encoderFactory.binaryEncoder(EMPTY_OUTPUT_STREAM, null); CatalogedConfig cataloged = options.cataloged.get(0); this.handler = context.supplyCatalog(cataloged.id); this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null; @@ -111,23 +79,28 @@ protected AvroModelHandler( ? catalog.subject : options.subject; this.schemas = new Int2ObjectCache<>(1, 1024, i -> {}); - this.readers = new Int2ObjectCache<>(1, 1024, i -> {}); - this.writers = new Int2ObjectCache<>(1, 1024, i -> {}); - this.records = new Int2ObjectCache<>(1, 1024, i -> {}); + this.parsers = new Int2ObjectCache<>(1, 1024, i -> {}); this.paddings = new Int2IntHashMap(-1); - this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer()); - this.in = new DirectBufferInputStream(); this.event = new AvroModelEventContext(context); this.extracted = new HashMap<>(); - this.bytesRO = new AvroBytesFW(); - this.intRO = new AvroIntFW(); - this.longRO = new AvroLongFW(); - this.floatRO = new AvroFloatFW(); - this.doubleRO = new AvroDoubleFW(); - this.unionRO = new AvroUnionFW(); + this.out = new ExpandableDirectByteBuffer(); + this.accumulator = new ExpandableDirectByteBuffer(); this.paddingMaxItems = config.paddingMaxItems(); } + // the bounded output window for a datum decoded/encoded from a payload of the given length, grown so any + // single scalar value fits; a total exceeding the window drains across successive feeds + protected final int outLimit( + int length) + { + int limit = length * OUT_SCALE + OUT_SLACK; + if (out.capacity() < limit) + { + out.putByte(limit - 1, (byte) 0); + } + return limit; + } + protected final boolean validate( long traceId, long bindingId, @@ -139,51 +112,23 @@ protected final boolean validate( boolean status = false; try { - Schema schema = supplySchema(schemaId); - if (schema != null) + AvroParser parser = supplyParser(schemaId); + if (parser != null) { - switch (schema.getType()) - { - case STRING: - status = true; - break; - case RECORD: - GenericRecord record = supplyRecord(schemaId); - in.wrap(buffer, index, length); - GenericDatumReader reader = supplyReader(schemaId); - if (reader != null) - { - decoderFactory.binaryDecoder(in, decoder); - reader.read(record, decoder); - status = true; - } - progress = index; - extractFields(buffer, index + length, schema); - break; - default: - break; - } + parser.reset(); + parser.wrap(buffer, index, index + length, true); + walk(parser); + status = true; } } - catch (IOException | AvroRuntimeException ex) + catch (AvroValidationException ex) { event.validationFailure(traceId, bindingId, ex.getMessage()); } return status; } - protected void extractFields( - DirectBuffer buffer, - int limit, - Schema schema) - { - for (Schema.Field field : schema.getFields()) - { - extract(field.schema(), buffer, limit, extracted.get(field.name())); - } - } - - protected final Schema supplySchema( + protected final AvroSchema supplySchema( int schemaId) { return schemas.computeIfAbsent(schemaId, this::resolveSchema); @@ -192,247 +137,180 @@ protected final Schema supplySchema( protected final int supplyPadding( int schemaId) { - return paddings.computeIfAbsent(schemaId, id -> calculatePadding(supplySchema(id))); - } - - protected final GenericDatumReader supplyReader( - int schemaId) - { - return readers.computeIfAbsent(schemaId, this::createReader); - } - - protected final GenericDatumWriter supplyWriter( - int schemaId) - { - return writers.computeIfAbsent(schemaId, this::createWriter); + return paddings.computeIfAbsent(schemaId, id -> + { + AvroSchema schema = supplySchema(id); + return calculatePadding(schema != null ? schema.type() : null); + }); } - protected final GenericRecord supplyRecord( + private AvroParser supplyParser( int schemaId) { - return records.computeIfAbsent(schemaId, this::createRecord); + return parsers.computeIfAbsent(schemaId, id -> + { + AvroSchema schema = supplySchema(id); + return schema != null ? Avro.parser(schema) : null; + }); } - private GenericDatumReader createReader( + private AvroSchema resolveSchema( int schemaId) { - Schema schema = supplySchema(schemaId); - GenericDatumReader reader = null; - if (schema != null) + AvroSchema schema = null; + String schemaText = handler.resolve(schemaId); + if (schemaText != null) { - reader = new GenericDatumReader(schema); + schema = Avro.schema(schemaText); } - return reader; + return schema; } - private GenericDatumWriter createWriter( - int schemaId) + private void walk( + AvroParser parser) { - Schema schema = supplySchema(schemaId); - GenericDatumWriter writer = null; - if (schema != null) + boolean extracting = !extracted.isEmpty(); + int depth = 0; + AvroField current = null; + while (parser.hasNext()) { - writer = new GenericDatumWriter(schema); + AvroEvent next = parser.nextEvent(); + if (next == null) + { + break; + } + switch (next) + { + case START_RECORD: + case START_ARRAY: + case START_MAP: + depth++; + current = null; + break; + case END_RECORD: + case END_ARRAY: + case END_MAP: + depth--; + current = null; + break; + case FIELD_NAME: + current = extracting && depth == 1 ? extracted.get(parser.getField()) : null; + break; + case UNION_BRANCH: + case MAP_KEY: + case START_MESSAGE: + case END_MESSAGE: + break; + default: + if (current != null) + { + writeExtract(current, next, parser); + } + current = null; + break; + } } - return writer; } - private GenericRecord createRecord( - int schemaId) - { - Schema schema = supplySchema(schemaId); - return schema != null && schema.getType() == Schema.Type.RECORD - ? new GenericData.Record(schema) - : null; - } - - private Schema resolveSchema( - int schemaId) + private void writeExtract( + AvroField field, + AvroEvent next, + AvroParser parser) { - Schema schema = null; - String schemaText = handler.resolve(schemaId); - if (schemaText != null) + OctetsFW value = field.value; + switch (next) + { + case STRING: + case BYTES: + case FIXED: { - schema = new Schema.Parser().parse(schemaText); + DirectBuffer segment = parser.getSegment(); + int length = segment.capacity(); + MutableDirectBuffer buffer = field.buffer(length); + buffer.putBytes(0, segment, 0, length); + value.wrap(buffer, 0, length); + break; + } + case ENUM: + case INT: + { + MutableDirectBuffer buffer = field.buffer(32); + int length = buffer.putIntAscii(0, parser.getInt()); + value.wrap(buffer, 0, length); + break; + } + case LONG: + { + MutableDirectBuffer buffer = field.buffer(32); + int length = buffer.putLongAscii(0, parser.getLong()); + value.wrap(buffer, 0, length); + break; + } + case FLOAT: + { + String text = String.valueOf(parser.getFloat()); + MutableDirectBuffer buffer = field.buffer(text.length()); + int length = buffer.putStringWithoutLengthAscii(0, text); + value.wrap(buffer, 0, length); + break; + } + case DOUBLE: + { + String text = String.valueOf(parser.getDouble()); + MutableDirectBuffer buffer = field.buffer(text.length()); + int length = buffer.putStringWithoutLengthAscii(0, text); + value.wrap(buffer, 0, length); + break; + } + case BOOLEAN: + { + String text = String.valueOf(parser.getBoolean()); + MutableDirectBuffer buffer = field.buffer(text.length()); + int length = buffer.putStringWithoutLengthAscii(0, text); + value.wrap(buffer, 0, length); + break; + } + default: + break; } - return schema; } private int calculatePadding( - Schema schema) + AvroType type) { int padding = 0; - if (schema != null) + if (type != null) { padding = 10; - if (schema.getType().equals(Schema.Type.RECORD)) + if (type.kind() == AvroKind.RECORD) { - for (Schema.Field field : schema.getFields()) + for (io.aklivity.zilla.runtime.common.avro.AvroField field : type.fields()) { - padding += field.name().getBytes().length; + padding += field.name().getBytes(StandardCharsets.UTF_8).length; - switch (field.schema().getType()) + AvroType fieldType = field.type(); + switch (fieldType.kind()) { case RECORD: - { - padding += calculatePadding(field.schema()); + padding += calculatePadding(fieldType); break; - } case UNION: - { padding += JSON_FIELD_UNION_LENGTH; break; - } case MAP: - { - padding += JSON_FIELD_MAP_LENGTH + paddingMaxItems + - calculatePadding(field.schema().getValueType()); + padding += JSON_FIELD_MAP_LENGTH + paddingMaxItems + calculatePadding(fieldType.values()); break; - } case ARRAY: - { - padding += JSON_FIELD_ARRAY_LENGTH + paddingMaxItems + - calculatePadding(field.schema().getElementType()); + padding += JSON_FIELD_ARRAY_LENGTH + paddingMaxItems + calculatePadding(fieldType.items()); break; - } default: - { padding += JSON_FIELD_STRUCTURE_LENGTH; break; } - } } } } return padding; } - - private void extract( - Schema schema, - DirectBuffer data, - int limit, - AvroField field) - { - switch (schema.getType()) - { - case RECORD: - extractFields(data, limit, schema); - break; - case BYTES: - case STRING: - AvroBytesFW bytes = bytesRO.wrap(data, progress, limit); - OctetsFW value = bytes.value(); - progress = bytes.limit(); - if (field != null) - { - OctetsFW octets = field.value; - octets.wrap(value.buffer(), value.offset(), value.limit()); - } - break; - case ENUM: - case INT: - AvroIntFW int32 = intRO.wrap(data, progress, limit); - int intValue = int32.value(); - progress = int32.limit(); - if (field != null) - { - MutableDirectBuffer text = field.buffer; - int length = text.putIntAscii(0, intValue); - field.value.wrap(text, 0, length); - } - break; - case FLOAT: - AvroFloatFW avroFloat = floatRO.wrap(data, progress, limit); - int len = 0; - DirectBuffer buffer = avroFloat.value().value(); - float floatValue = Float.intBitsToFloat(decodeNumberBytes(len, buffer)); - progress = avroFloat.limit(); - if (field != null) - { - MutableDirectBuffer text = field.buffer; - int length = text.putStringWithoutLengthAscii(0, String.valueOf(floatValue)); - field.value.wrap(text, 0, length); - } - break; - case LONG: - AvroLongFW avroLong = longRO.wrap(data, progress, limit); - long longValue = avroLong.value(); - progress = avroLong.limit(); - if (field != null) - { - MutableDirectBuffer text = field.buffer; - int length = text.putLongAscii(0, longValue); - field.value.wrap(text, 0, length); - } - break; - case DOUBLE: - AvroDoubleFW avroDouble = doubleRO.wrap(data, progress, limit); - len = 0; - buffer = avroDouble.value().value(); - int decoded = (buffer.getByte(len++) & 0xff) | - ((buffer.getByte(len++) & 0xff) << 8) | - ((buffer.getByte(len++) & 0xff) << 16) | - ((buffer.getByte(len++) & 0xff) << 24); - - double doubleValue = Double.longBitsToDouble((((long) decoded) & 0xffffffffL) | - (((long) decodeNumberBytes(len, buffer)) << 32)); - progress = avroDouble.limit(); - if (field != null) - { - MutableDirectBuffer text = field.buffer; - int length = text.putStringWithoutLengthAscii(0, String.valueOf(doubleValue)); - field.value.wrap(text, 0, length); - } - break; - case BOOLEAN: - AvroBooleanFW avroBoolean = new AvroBooleanFW().wrap(data, progress, limit); - value = avroBoolean.value(); - progress = avroBoolean.limit(); - if (field != null) - { - field.value.wrap(value.buffer(), value.offset(), value.limit()); - } - break; - case FIXED: - int fixedSize = schema.getFixedSize(); - if (field != null) - { - field.value.wrap(data, progress, progress + fixedSize); - } - progress += fixedSize; - break; - case UNION: - List types = schema.getTypes(); - Integer nullIndex = schema.getIndexNamed("null"); - if (nullIndex != null && types.size() == 2) - { - AvroUnionFW avroUnion = unionRO.wrap(data, progress, limit); - int index = avroUnion.index(); - - if (index != nullIndex) - { - progress = avroUnion.limit(); - - int nonNullIndex = nullIndex ^ 1; - Schema nonNull = types.get(nonNullIndex); - - extract(nonNull, data, limit, field); - } - } - break; - default: - break; - } - } - - private static int decodeNumberBytes( - int len, - DirectBuffer buffer) - { - return (buffer.getByte(len++) & 0xff) | - ((buffer.getByte(len++) & 0xff) << 8) | - ((buffer.getByte(len++) & 0xff) << 16) | - ((buffer.getByte(len) & 0xff) << 24); - } } diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java index 62acccdf63..7a2768059c 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java @@ -16,20 +16,22 @@ import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_SCHEMA_ID; -import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.agrona.DirectBuffer; +import org.agrona.collections.Int2ObjectCache; import org.agrona.concurrent.UnsafeBuffer; -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.CanonicalJsonEncoder; -import org.apache.avro.io.JsonEncoder; +import io.aklivity.zilla.runtime.common.avro.Avro; +import io.aklivity.zilla.runtime.common.avro.AvroGenerator; +import io.aklivity.zilla.runtime.common.avro.AvroPipeline; +import io.aklivity.zilla.runtime.common.avro.AvroPipeline.Status; +import io.aklivity.zilla.runtime.common.avro.AvroSchema; +import io.aklivity.zilla.runtime.common.avro.AvroSink; +import io.aklivity.zilla.runtime.common.avro.json.AvroJson; +import io.aklivity.zilla.runtime.common.json.JsonEx; +import io.aklivity.zilla.runtime.common.json.JsonGeneratorEx; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.model.ConverterHandler; import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; @@ -43,6 +45,7 @@ public class AvroReadConverterHandler extends AvroModelHandler implements Conver private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer(); private final Matcher matcher; + private final Int2ObjectCache pipelines; public AvroReadConverterHandler( AvroModelConfiguration config, @@ -51,6 +54,7 @@ public AvroReadConverterHandler( { super(config, options, context); this.matcher = PATH_PATTERN.matcher(""); + this.pipelines = new Int2ObjectCache<>(1, 1024, i -> {}); } @Override @@ -99,7 +103,7 @@ public int convert( int length, ValueConsumer next) { - for (AvroField field: extracted.values()) + for (AvroField field : extracted.values()) { field.value.wrap(EMPTY_BUFFER, 0, 0); } @@ -158,12 +162,10 @@ private int decodePayload( if (VIEW_JSON.equals(view)) { - deserializeRecord(traceId, bindingId, schemaId, data, index, length); - int recordLength = expandable.position(); - if (recordLength > 0) + valLength = deserializeRecord(traceId, bindingId, schemaId, data, index, length, next); + if (valLength != -1 && !extracted.isEmpty()) { - next.accept(expandable.buffer(), 0, recordLength); - valLength = recordLength; + validate(traceId, bindingId, schemaId, data, index, length); } } else if (validate(traceId, bindingId, schemaId, data, index, length)) @@ -174,36 +176,95 @@ else if (validate(traceId, bindingId, schemaId, data, index, length)) return valLength; } - private void deserializeRecord( + private int deserializeRecord( long traceId, long bindingId, int schemaId, - DirectBuffer buffer, + DirectBuffer data, int index, - int length) + int length, + ValueConsumer next) { - try + int valLength = -1; + + AvroToJson pipeline = supplyPipeline(schemaId); + if (pipeline != null) { - GenericDatumReader reader = supplyReader(schemaId); - GenericDatumWriter writer = supplyWriter(schemaId); - if (reader != null) + int limit = outLimit(length); + int accumulated = 0; + pipeline.generator.wrap(out, 0, limit); + pipeline.pipeline.reset(); + Status status = pipeline.pipeline.feed(data, index, index + length, true); + while (status == Status.SUSPENDED) + { + pipeline.json.flush(); + int chunk = pipeline.json.length(); + accumulator.putBytes(accumulated, out, 0, chunk); + accumulated += chunk; + pipeline.generator.wrap(out, 0, limit); + status = pipeline.pipeline.feed(data, index, index + length, true); + } + + if (status == Status.COMPLETED) + { + pipeline.json.flush(); + int chunk = pipeline.json.length(); + if (accumulated == 0) + { + next.accept(out, 0, chunk); + valLength = chunk; + } + else + { + accumulator.putBytes(accumulated, out, 0, chunk); + accumulated += chunk; + next.accept(accumulator, 0, accumulated); + valLength = accumulated; + } + } + else { - GenericRecord record = supplyRecord(schemaId); - in.wrap(buffer, index, length); - expandable.wrap(expandable.buffer()); - record = reader.read(record, decoderFactory.binaryDecoder(in, decoder)); - Schema schema = record.getSchema(); - JsonEncoder out = new CanonicalJsonEncoder(schema, expandable); - writer.write(record, out); - out.flush(); - - progress = index; - extractFields(buffer, index + length, schema); + event.validationFailure(traceId, bindingId, "Invalid Avro encoding"); } } - catch (IOException | AvroRuntimeException ex) + return valLength; + } + + private AvroToJson supplyPipeline( + int schemaId) + { + return pipelines.computeIfAbsent(schemaId, this::newPipeline); + } + + private AvroToJson newPipeline( + int schemaId) + { + AvroToJson pipeline = null; + AvroSchema schema = supplySchema(schemaId); + if (schema != null) + { + JsonGeneratorEx json = JsonEx.createGenerator(); + AvroGenerator generator = AvroJson.generator(schema, json, true); + AvroPipeline avro = Avro.stream(Avro.parser(schema)).into(AvroSink.of(generator)); + pipeline = new AvroToJson(avro, json, generator); + } + return pipeline; + } + + private static final class AvroToJson + { + private final AvroPipeline pipeline; + private final JsonGeneratorEx json; + private final AvroGenerator generator; + + private AvroToJson( + AvroPipeline pipeline, + JsonGeneratorEx json, + AvroGenerator generator) { - event.validationFailure(traceId, bindingId, ex.getMessage()); + this.pipeline = pipeline; + this.json = json; + this.generator = generator; } } } diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java index c94f649ec5..eace94458f 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroValidatorHandler.java @@ -14,15 +14,8 @@ */ package io.aklivity.zilla.runtime.model.avro.internal; -import java.io.IOException; -import java.io.InputStream; - import org.agrona.DirectBuffer; import org.agrona.ExpandableDirectByteBuffer; -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.model.ValidatorHandler; @@ -33,6 +26,8 @@ public class AvroValidatorHandler extends AvroModelHandler implements ValidatorH { private final ExpandableDirectByteBuffer buffer; + private int progress; + public AvroValidatorHandler( AvroModelConfiguration config, AvroModelConfig options, @@ -72,13 +67,11 @@ public boolean validate( } else { - in.wrap(buffer, 0, progress); - int schemaId = catalog != null && catalog.id > 0 ? catalog.id : handler.resolve(subject, catalog.version); - status = validate(traceId, bindingId, schemaId, in); + status = validate(traceId, bindingId, schemaId, buffer, 0, progress); } } } @@ -100,36 +93,6 @@ private boolean validatePayload( int length, ValueConsumer next) { - in.wrap(data, index, length); - return validate(traceId, bindingId, schemaId, in); - } - - private boolean validate( - long traceId, - long bindingId, - int schemaId, - InputStream in) - { - boolean status = false; - try - { - Schema schema = supplySchema(schemaId); - if (schema != null) - { - GenericRecord record = supplyRecord(schemaId); - GenericDatumReader reader = supplyReader(schemaId); - if (reader != null) - { - decoderFactory.binaryDecoder(in, decoder); - reader.read(record, decoder); - status = true; - } - } - } - catch (IOException | AvroRuntimeException ex) - { - event.validationFailure(traceId, bindingId, ex.getMessage()); - } - return status; + return validate(traceId, bindingId, schemaId, data, index, length); } } diff --git a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java index 6b8e3e0ac6..ee7be64040 100644 --- a/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java +++ b/runtime/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java @@ -14,16 +14,18 @@ */ package io.aklivity.zilla.runtime.model.avro.internal; -import java.io.IOException; - import org.agrona.DirectBuffer; -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.CanonicalJsonDecoder; +import org.agrona.collections.Int2ObjectCache; +import io.aklivity.zilla.runtime.common.avro.Avro; +import io.aklivity.zilla.runtime.common.avro.AvroGenerator; +import io.aklivity.zilla.runtime.common.avro.AvroPipeline; +import io.aklivity.zilla.runtime.common.avro.AvroPipeline.Status; +import io.aklivity.zilla.runtime.common.avro.AvroSchema; +import io.aklivity.zilla.runtime.common.avro.AvroSink; +import io.aklivity.zilla.runtime.common.avro.json.AvroJson; +import io.aklivity.zilla.runtime.common.json.JsonEx; +import io.aklivity.zilla.runtime.common.json.JsonParserEx; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.model.ConverterHandler; @@ -32,12 +34,15 @@ public class AvroWriteConverterHandler extends AvroModelHandler implements ConverterHandler { + private final Int2ObjectCache pipelines; + public AvroWriteConverterHandler( AvroModelConfiguration config, AvroModelConfig options, EngineContext context) { super(config, options, context); + this.pipelines = new Int2ObjectCache<>(1, 1024, i -> {}); } @Override @@ -79,54 +84,67 @@ private int serializeJsonRecord( long traceId, long bindingId, int schemaId, - DirectBuffer buffer, + DirectBuffer data, int index, int length, ValueConsumer next) { int valLength = -1; - try - { - Schema schema = supplySchema(schemaId); - if (schema != null) + JsonToAvro pipeline = supplyPipeline(schemaId); + if (pipeline != null) + { + // Avro binary is never more than OUT_SCALE times the JSON it encodes (a double is 8 bytes from a + // one-character number), so the bounded window admits the whole datum and the feed completes once + pipeline.generator.wrap(out, 0, outLimit(length)); + pipeline.pipeline.reset(); + Status status = pipeline.pipeline.feed(data, index, index + length, true); + if (status == Status.COMPLETED) { - switch (schema.getType()) - { - case STRING: - next.accept(buffer, index, length); - valLength = length; - break; - case RECORD: - GenericDatumReader reader = supplyReader(schemaId); - GenericDatumWriter writer = supplyWriter(schemaId); - if (reader != null) - { - GenericRecord record = supplyRecord(schemaId); - in.wrap(buffer, index, length); - expandable.wrap(expandable.buffer()); - CanonicalJsonDecoder decoder = new CanonicalJsonDecoder(schema, in); - record = reader.read(record, decoder); - encoderFactory.binaryEncoder(expandable, encoder); - writer.write(record, encoder); - encoder.flush(); - int position = expandable.position(); - if (position > 0) - { - next.accept(expandable.buffer(), 0, position); - valLength = position; - } - } - break; - default: - break; - } + int chunk = pipeline.generator.length(); + next.accept(out, 0, chunk); + valLength = chunk; + } + else + { + event.validationFailure(traceId, bindingId, "Invalid JSON encoding"); } } - catch (IOException | AvroRuntimeException ex) + return valLength; + } + + private JsonToAvro supplyPipeline( + int schemaId) + { + return pipelines.computeIfAbsent(schemaId, this::newPipeline); + } + + private JsonToAvro newPipeline( + int schemaId) + { + JsonToAvro pipeline = null; + AvroSchema schema = supplySchema(schemaId); + if (schema != null) { - event.validationFailure(traceId, bindingId, ex.getMessage()); + JsonParserEx parser = JsonEx.createParser(); + AvroGenerator generator = Avro.generator(schema, out, 0); + AvroPipeline avro = AvroJson.stream(schema, parser, true).into(AvroSink.of(generator)); + pipeline = new JsonToAvro(avro, generator); + } + return pipeline; + } + + private static final class JsonToAvro + { + private final AvroPipeline pipeline; + private final AvroGenerator generator; + + private JsonToAvro( + AvroPipeline pipeline, + AvroGenerator generator) + { + this.pipeline = pipeline; + this.generator = generator; } - return valLength; } } diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java deleted file mode 100644 index 3656e1d44d..0000000000 --- a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonDecoder.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc - * - * Licensed under the Aklivity Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://www.aklivity.io/aklivity-community-license/ - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.avro.io; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.nio.charset.Charset; -import java.util.List; - -import org.apache.avro.AvroTypeException; -import org.apache.avro.Schema; -import org.apache.avro.io.parsing.Symbol; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -public final class CanonicalJsonDecoder extends JsonDecoder -{ - private static final Method ADVANCE; - private static final Method ERROR; - private static final Field IN; - - static - { - try - { - ADVANCE = JsonDecoder.class.getDeclaredMethod("advance", Symbol.class); - ERROR = JsonDecoder.class.getDeclaredMethod("error", String.class); - IN = JsonDecoder.class.getDeclaredField("in"); - ADVANCE.setAccessible(true); - ERROR.setAccessible(true); - IN.setAccessible(true); - } - catch (NoSuchMethodException ex) - { - throw new RuntimeException(ex); - } - catch (SecurityException ex) - { - throw new RuntimeException(ex); - } - catch (NoSuchFieldException ex) - { - throw new RuntimeException(ex); - } - } - - - public CanonicalJsonDecoder(final Schema schema, final InputStream in) - throws IOException - { - super(schema, in); - } - - public CanonicalJsonDecoder(final Schema schema, final String in) - throws IOException - { - this(schema, new ByteArrayInputStream(in.getBytes(Charset.forName("UTF-8")))); - } - - /** - * Overwrite this function to optime json decoding of union {null, type}. - * - * @return - * @throws IOException - */ - @Override - public int readIndex() throws IOException - { - try - { - ADVANCE.invoke(this, Symbol.UNION); - JsonParser lin = getParser(); - Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); - - String label; - final JsonToken currentToken = lin.getCurrentToken(); - if (currentToken == JsonToken.VALUE_NULL) - { - label = "null"; - } - else if (CanonicalJsonEncoder.isNullableSingle(a)) - { - label = CanonicalJsonEncoder.getNullableSingle(a); - } - else if (currentToken == JsonToken.START_OBJECT && - lin.nextToken() == JsonToken.FIELD_NAME) - { - label = lin.getText(); - lin.nextToken(); - parser.pushSymbol(Symbol.UNION_END); - } - else - { - throw (AvroTypeException) ERROR.invoke(this, "start-union"); - } - int n = a.findLabel(label); - if (n < 0) - { - throw new AvroTypeException("Unknown union branch " + label); - } - parser.pushSymbol(a.getSymbol(n)); - return n; - } - catch (IllegalAccessException ex) - { - throw new RuntimeException(ex); - } - catch (IllegalArgumentException ex) - { - throw new RuntimeException(ex); - } - catch (InvocationTargetException ex) - { - throw new RuntimeException(ex); - } - } - - /** - * Overwrite to inject default values. - * - * @param input - * @param top - * @return - * @throws IOException - */ - - @Override - public Symbol doAction(final Symbol input, final Symbol top) throws IOException - { - try - { - JsonParser in = getParser(); - if (top instanceof Symbol.FieldAdjustAction) - { - Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; - String name = fa.fname; - if (currentReorderBuffer != null) - { - List node = currentReorderBuffer.savedFields.get(name); - if (node != null) - { - currentReorderBuffer.savedFields.remove(name); - currentReorderBuffer.origParser = in; - setParser(makeParser(node)); - return null; - } - } - if (in.getCurrentToken() == JsonToken.FIELD_NAME) - { - do - { - String fn = in.getText(); - in.nextToken(); - if (name.equals(fn)) - { - return null; - } - else - { - if (currentReorderBuffer == null) - { - currentReorderBuffer = new JsonDecoder.ReorderBuffer(); - } - currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in)); - } - } - while (in.getCurrentToken() == JsonToken.FIELD_NAME); - } - } - else if (top == Symbol.FIELD_END) - { - if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) - { - setParser(currentReorderBuffer.origParser); - currentReorderBuffer.origParser = null; - } - } - else if (top == Symbol.RECORD_START) - { - if (in.getCurrentToken() == JsonToken.START_OBJECT) - { - in.nextToken(); - reorderBuffers.push(currentReorderBuffer); - currentReorderBuffer = null; - } - else - { - throw error("record-start"); - } - } - else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) - { - if (in.getCurrentToken() == JsonToken.END_OBJECT) - { - in.nextToken(); - if (top == Symbol.RECORD_END) - { - if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) - { - throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet()); - } - currentReorderBuffer = reorderBuffers.pop(); - } - } - else - { - throw error(top == Symbol.RECORD_END ? "record-end" : "union-end"); - } - } - else - { - throw new AvroTypeException("Unknown action symbol " + top); - } - return null; - } - catch (IllegalAccessException ex) - { - throw new RuntimeException(ex); - } - } - - private static final JsonElement NULL_JSON_ELEMENT = new JsonElement(null); - - private JsonParser getParser() throws IllegalAccessException - { - return (JsonParser) IN.get(this); - } - - private void setParser(final JsonParser parser) throws IllegalAccessException - { - IN.set(this, parser); - } -} diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java deleted file mode 100644 index d9a67d8adb..0000000000 --- a/runtime/model-avro/src/main/java/org/apache/avro/io/CanonicalJsonEncoder.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc - * - * Licensed under the Aklivity Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://www.aklivity.io/aklivity-community-license/ - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.avro.io; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.avro.Schema; -import org.apache.avro.io.parsing.Parser; -import org.apache.avro.io.parsing.Symbol; - -import com.fasterxml.jackson.core.JsonGenerator; - -/** - * A derived encoder that does the skipping of fields that match the index. It also encodes unions of null and a single - * type as a more normal key=value rather than key={type=value}. - * - * @author zfarkas - */ -public final class CanonicalJsonEncoder extends JsonEncoder -{ - - - public CanonicalJsonEncoder( - final Schema sc, - final OutputStream out) throws IOException - { - super(sc, out); - } - - public CanonicalJsonEncoder( - final Schema sc, - final OutputStream out, - final boolean pretty) throws IOException - { - super(sc, out, pretty); - } - - public CanonicalJsonEncoder( - final Schema sc, - final JsonGenerator out) throws IOException - { - super(sc, out); - } - - public Parser getParser() - { - return parser; - } - - public static boolean isNullableSingle( - final Symbol.Alternative top) - { - return top.size() == 2 && ("null".equals(top.getLabel(0)) || "null".equals(top.getLabel(1))); - } - - public static String getNullableSingle( - final Symbol.Alternative top) - { - final String label = top.getLabel(0); - return "null".equals(label) ? top.getLabel(1) : label; - } - - /** - * Overwrite this function to optime json decoding of union {null, type}. - * - * @param unionIndex - * @throws IOException - */ - - @Override - public void writeIndex( - final int unionIndex) throws IOException - { - parser.advance(Symbol.UNION); - Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); - Symbol symbol = top.getSymbol(unionIndex); - if (symbol != Symbol.NULL && !isNullableSingle(top)) - { - out.writeStartObject(); - out.writeFieldName(top.getLabel(unionIndex)); - parser.pushSymbol(Symbol.UNION_END); - } - parser.pushSymbol(symbol); - } - -} diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java deleted file mode 100644 index 9de5203574..0000000000 --- a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonDecoder.java +++ /dev/null @@ -1,942 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc - * - * Licensed under the Aklivity Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://www.aklivity.io/aklivity-community-license/ - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.avro.io; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Stack; - -import org.apache.avro.AvroTypeException; -import org.apache.avro.Schema; -import org.apache.avro.io.parsing.JsonGrammarGenerator; -import org.apache.avro.io.parsing.Parser; -import org.apache.avro.io.parsing.Symbol; -import org.apache.avro.util.Utf8; - -import com.fasterxml.jackson.core.Base64Variant; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonLocation; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonStreamContext; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.core.ObjectCodec; -import com.fasterxml.jackson.core.Version; - -/** A {@link Decoder} for Avro's JSON data encoding. - *

- * Construct using {@link DecoderFactory}. - *

- * JsonDecoder is not thread-safe. - * */ -public class JsonDecoder extends ParsingDecoder - implements Parser.ActionHandler -{ - private JsonParser in; - private static JsonFactory jsonFactory = new JsonFactory(); - Stack reorderBuffers = new Stack(); - ReorderBuffer currentReorderBuffer; - - static class ReorderBuffer - { - public Map> savedFields = new HashMap>(); - public JsonParser origParser = null; - } - - static final String CHARSET = "ISO-8859-1"; - - private JsonDecoder(Symbol root, InputStream in) throws IOException - { - super(root); - configure(in); - } - - private JsonDecoder(Symbol root, String in) throws IOException - { - super(root); - configure(in); - } - - JsonDecoder(Schema schema, InputStream in) throws IOException - { - this(getSymbol(schema), in); - } - - JsonDecoder(Schema schema, String in) throws IOException - { - this(getSymbol(schema), in); - } - - private static Symbol getSymbol(Schema schema) - { - if (null == schema) - { - throw new NullPointerException("Schema cannot be null!"); - } - return new JsonGrammarGenerator().generate(schema); - } - - /** - * Reconfigures this JsonDecoder to use the InputStream provided. - *

- * If the InputStream provided is null, a NullPointerException is thrown. - *

- * Otherwise, this JsonDecoder will reset its state and then - * reconfigure its input. - * @param in - * The IntputStream to read from. Cannot be null. - * @throws IOException - * @return this JsonDecoder - */ - public JsonDecoder configure(InputStream in) throws IOException - { - if (null == in) - { - throw new NullPointerException("InputStream to read from cannot be null!"); - } - parser.reset(); - this.in = jsonFactory.createJsonParser(in); - this.in.nextToken(); - return this; - } - - /** - * Reconfigures this JsonDecoder to use the String provided for input. - *

- * If the String provided is null, a NullPointerException is thrown. - *

- * Otherwise, this JsonDecoder will reset its state and then - * reconfigure its input. - * @param in - * The String to read from. Cannot be null. - * @throws IOException - * @return this JsonDecoder - */ - public JsonDecoder configure(String in) throws IOException - { - if (null == in) - { - throw new NullPointerException("String to read from cannot be null!"); - } - parser.reset(); - this.in = new JsonFactory().createJsonParser(in); - this.in.nextToken(); - return this; - } - - private void advance(Symbol symbol) throws IOException - { - this.parser.processTrailingImplicitActions(); - if (in.getCurrentToken() == null && this.parser.depth() == 1) - throw new EOFException(); - parser.advance(symbol); - } - - @Override - public void readNull() throws IOException - { - advance(Symbol.NULL); - if (in.getCurrentToken() == JsonToken.VALUE_NULL) - { - in.nextToken(); - } - else - { - throw error("null"); - } - } - - @Override - public boolean readBoolean() throws IOException - { - advance(Symbol.BOOLEAN); - JsonToken t = in.getCurrentToken(); - if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) - { - in.nextToken(); - return t == JsonToken.VALUE_TRUE; - } - else - { - throw error("boolean"); - } - } - - @Override - public int readInt() throws IOException - { - advance(Symbol.INT); - if (in.getCurrentToken().isNumeric()) - { - int result = in.getIntValue(); - in.nextToken(); - return result; - } - else - { - throw error("int"); - } - } - - @Override - public long readLong() throws IOException - { - advance(Symbol.LONG); - if (in.getCurrentToken().isNumeric()) - { - long result = in.getLongValue(); - in.nextToken(); - return result; - } - else - { - throw error("long"); - } - } - - @Override - public float readFloat() throws IOException - { - advance(Symbol.FLOAT); - if (in.getCurrentToken().isNumeric()) - { - float result = in.getFloatValue(); - in.nextToken(); - return result; - } - else - { - throw error("float"); - } - } - - @Override - public double readDouble() throws IOException - { - advance(Symbol.DOUBLE); - if (in.getCurrentToken().isNumeric()) - { - double result = in.getDoubleValue(); - in.nextToken(); - return result; - } - else - { - throw error("double"); - } - } - - @Override - public Utf8 readString(Utf8 old) throws IOException - { - return new Utf8(readString()); - } - - @Override - public String readString() throws IOException - { - advance(Symbol.STRING); - if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) - { - parser.advance(Symbol.MAP_KEY_MARKER); - if (in.getCurrentToken() != JsonToken.FIELD_NAME) - { - throw error("map-key"); - } - } - else - { - if (in.getCurrentToken() != JsonToken.VALUE_STRING) - { - throw error("string"); - } - } - String result = in.getText(); - in.nextToken(); - return result; - } - - @Override - public void skipString() throws IOException - { - advance(Symbol.STRING); - if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) - { - parser.advance(Symbol.MAP_KEY_MARKER); - if (in.getCurrentToken() != JsonToken.FIELD_NAME) - { - throw error("map-key"); - } - } - else - { - if (in.getCurrentToken() != JsonToken.VALUE_STRING) - { - throw error("string"); - } - } - in.nextToken(); - } - - @Override - public ByteBuffer readBytes(ByteBuffer old) throws IOException - { - advance(Symbol.BYTES); - if (in.getCurrentToken() == JsonToken.VALUE_STRING) - { - byte[] result = readByteArray(); - in.nextToken(); - return ByteBuffer.wrap(result); - } - else - { - throw error("bytes"); - } - } - - private byte[] readByteArray() throws IOException - { - byte[] result = in.getText().getBytes(CHARSET); - return result; - } - - @Override - public void skipBytes() throws IOException - { - advance(Symbol.BYTES); - if (in.getCurrentToken() == JsonToken.VALUE_STRING) - { - in.nextToken(); - } - else - { - throw error("bytes"); - } - } - - private void checkFixed(int size) throws IOException - { - advance(Symbol.FIXED); - Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); - if (size != top.size) - { - throw new AvroTypeException( - "Incorrect length for fixed binary: expected " + - top.size + " but received " + size + " bytes."); - } - } - - @Override - public void readFixed(byte[] bytes, int start, int len) throws IOException - { - checkFixed(len); - if (in.getCurrentToken() == JsonToken.VALUE_STRING) - { - byte[] result = readByteArray(); - in.nextToken(); - if (result.length != len) - { - throw new AvroTypeException("Expected fixed length " + len - + ", but got" + result.length); - } - System.arraycopy(result, 0, bytes, start, len); - } - else - { - throw error("fixed"); - } - } - - @Override - public void skipFixed(int length) throws IOException - { - checkFixed(length); - doSkipFixed(length); - } - - private void doSkipFixed(int length) throws IOException - { - if (in.getCurrentToken() == JsonToken.VALUE_STRING) - { - byte[] result = readByteArray(); - in.nextToken(); - if (result.length != length) - { - throw new AvroTypeException("Expected fixed length " + length - + ", but got" + result.length); - } - } - else - { - throw error("fixed"); - } - } - - @Override - protected void skipFixed() throws IOException - { - advance(Symbol.FIXED); - Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); - doSkipFixed(top.size); - } - - @Override - public int readEnum() throws IOException - { - advance(Symbol.ENUM); - Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol(); - if (in.getCurrentToken() == JsonToken.VALUE_STRING) - { - in.getText(); - int n = top.findLabel(in.getText()); - if (n >= 0) - { - in.nextToken(); - return n; - } - throw new AvroTypeException("Unknown symbol in enum " + in.getText()); - } - else - { - throw error("fixed"); - } - } - - @Override - public long readArrayStart() throws IOException - { - advance(Symbol.ARRAY_START); - if (in.getCurrentToken() == JsonToken.START_ARRAY) - { - in.nextToken(); - return doArrayNext(); - } - else - { - throw error("array-start"); - } - } - - @Override - public long arrayNext() throws IOException - { - advance(Symbol.ITEM_END); - return doArrayNext(); - } - - private long doArrayNext() throws IOException - { - if (in.getCurrentToken() == JsonToken.END_ARRAY) - { - parser.advance(Symbol.ARRAY_END); - in.nextToken(); - return 0; - } - else - { - return 1; - } - } - - @Override - public long skipArray() throws IOException - { - advance(Symbol.ARRAY_START); - if (in.getCurrentToken() == JsonToken.START_ARRAY) - { - in.skipChildren(); - in.nextToken(); - advance(Symbol.ARRAY_END); - } - else - { - throw error("array-start"); - } - return 0; - } - - @Override - public long readMapStart() throws IOException - { - advance(Symbol.MAP_START); - if (in.getCurrentToken() == JsonToken.START_OBJECT) - { - in.nextToken(); - return doMapNext(); - } - else - { - throw error("map-start"); - } - } - - @Override - public long mapNext() throws IOException - { - advance(Symbol.ITEM_END); - return doMapNext(); - } - - private long doMapNext() throws IOException - { - if (in.getCurrentToken() == JsonToken.END_OBJECT) - { - in.nextToken(); - advance(Symbol.MAP_END); - return 0; - } - else - { - return 1; - } - } - - @Override - public long skipMap() throws IOException - { - advance(Symbol.MAP_START); - if (in.getCurrentToken() == JsonToken.START_OBJECT) - { - in.skipChildren(); - in.nextToken(); - advance(Symbol.MAP_END); - } - else - { - throw error("map-start"); - } - return 0; - } - - @Override - public int readIndex() throws IOException - { - advance(Symbol.UNION); - Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); - - String label; - if (in.getCurrentToken() == JsonToken.VALUE_NULL) - { - label = "null"; - } - else if (in.getCurrentToken() == JsonToken.START_OBJECT && - in.nextToken() == JsonToken.FIELD_NAME) - { - label = in.getText(); - in.nextToken(); - parser.pushSymbol(Symbol.UNION_END); - } - else - { - throw error("start-union"); - } - int n = a.findLabel(label); - if (n < 0) - throw new AvroTypeException("Unknown union branch " + label); - parser.pushSymbol(a.getSymbol(n)); - return n; - } - - @Override - public Symbol doAction(Symbol input, Symbol top) throws IOException - { - if (top instanceof Symbol.FieldAdjustAction) - { - Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; - String name = fa.fname; - if (currentReorderBuffer != null) - { - List node = currentReorderBuffer.savedFields.get(name); - if (node != null) - { - currentReorderBuffer.savedFields.remove(name); - currentReorderBuffer.origParser = in; - in = makeParser(node); - return null; - } - } - if (in.getCurrentToken() == JsonToken.FIELD_NAME) - { - do - { - String fn = in.getText(); - in.nextToken(); - if (name.equals(fn)) - { - return null; - } - else - { - if (currentReorderBuffer == null) - { - currentReorderBuffer = new ReorderBuffer(); - } - currentReorderBuffer.savedFields.put(fn, getVaueAsTree(in)); - } - } - while (in.getCurrentToken() == JsonToken.FIELD_NAME); - throw new AvroTypeException("Expected field name not found: " + fa.fname); - } - } - else if (top == Symbol.FIELD_END) - { - if (currentReorderBuffer != null && currentReorderBuffer.origParser != null) - { - in = currentReorderBuffer.origParser; - currentReorderBuffer.origParser = null; - } - } - else if (top == Symbol.RECORD_START) - { - if (in.getCurrentToken() == JsonToken.START_OBJECT) - { - in.nextToken(); - reorderBuffers.push(currentReorderBuffer); - currentReorderBuffer = null; - } - else - { - throw error("record-start"); - } - } - else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) - { - if (in.getCurrentToken() == JsonToken.END_OBJECT) - { - in.nextToken(); - if (top == Symbol.RECORD_END) - { - if (currentReorderBuffer != null && !currentReorderBuffer.savedFields.isEmpty()) - { - throw error("Unknown fields: " + currentReorderBuffer.savedFields.keySet()); - } - currentReorderBuffer = reorderBuffers.pop(); - } - } - else - { - throw error(top == Symbol.RECORD_END ? "record-end" : "union-end"); - } - } - else - { - throw new AvroTypeException("Unknown action symbol " + top); - } - return null; - } - - static class JsonElement - { - public final JsonToken token; - public final String value; - - public JsonElement(JsonToken t, String value) - { - this.token = t; - this.value = value; - } - - public JsonElement(JsonToken t) - { - this(t, null); - } - } - - static List getVaueAsTree(JsonParser in) throws IOException - { - int level = 0; - List result = new ArrayList(); - do - { - JsonToken t = in.getCurrentToken(); - switch (t) - { - case START_OBJECT: - case START_ARRAY: - level++; - result.add(new JsonElement(t)); - break; - case END_OBJECT: - case END_ARRAY: - level--; - result.add(new JsonElement(t)); - break; - case FIELD_NAME: - case VALUE_STRING: - case VALUE_NUMBER_INT: - case VALUE_NUMBER_FLOAT: - case VALUE_TRUE: - case VALUE_FALSE: - case VALUE_NULL: - result.add(new JsonElement(t, in.getText())); - break; - } - in.nextToken(); - } - while (level != 0); - result.add(new JsonElement(null)); - return result; - } - - JsonParser makeParser(final List elements) throws IOException - { - return new JsonParser() - { - int pos = 0; - - @Override - public JsonToken nextValue() throws IOException - { - throw new UnsupportedOperationException(); - }; - - @Override - public ObjectCodec getCodec() - { - throw new UnsupportedOperationException(); - } - - @Override - public void setCodec(ObjectCodec c) - { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public JsonToken nextToken() throws IOException - { - pos++; - return elements.get(pos).token; - } - - @Override - public JsonParser skipChildren() throws IOException - { - int level = 0; - do - { - switch (elements.get(pos++).token) - { - case START_ARRAY: - case START_OBJECT: - level++; - break; - case END_ARRAY: - case END_OBJECT: - level--; - break; - } - } - while (level > 0); - return this; - } - - @Override - public boolean isClosed() - { - throw new UnsupportedOperationException(); - } - - @Override - public String getCurrentName() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public JsonStreamContext getParsingContext() - { - throw new UnsupportedOperationException(); - } - - @Override - public JsonLocation getTokenLocation() - { - throw new UnsupportedOperationException(); - } - - @Override - public JsonLocation getCurrentLocation() - { - throw new UnsupportedOperationException(); - } - - @Override - public String getText() throws IOException - { - return elements.get(pos).value; - } - - @Override - public char[] getTextCharacters() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public int getTextLength() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public int getTextOffset() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean hasTextCharacters() - { - return false; - } - - @Override - public Number getNumberValue() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public NumberType getNumberType() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public int getIntValue() throws IOException - { - return Integer.parseInt(getText()); - } - - @Override - public long getLongValue() throws IOException - { - return Long.parseLong(getText()); - } - - @Override - public BigInteger getBigIntegerValue() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public float getFloatValue() throws IOException - { - return Float.parseFloat(getText()); - } - - @Override - public double getDoubleValue() throws IOException - { - return Double.parseDouble(getText()); - } - - @Override - public BigDecimal getDecimalValue() throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getBinaryValue(Base64Variant b64variant) - throws IOException - { - throw new UnsupportedOperationException(); - } - - @Override - public String getValueAsString(String s) throws IOException - { - return ""; - } - - @Override - public JsonToken getCurrentToken() - { - return elements.get(pos).token; - } - - @Override - public int getCurrentTokenId() - { - return 0; - } - - @Override - public boolean hasCurrentToken() - { - return false; - } - - @Override - public boolean hasTokenId(int i) - { - return false; - } - - @Override - public boolean hasToken(JsonToken jsonToken) - { - return false; - } - - @Override - public void clearCurrentToken() - { - - } - - @Override - public JsonToken getLastClearedToken() - { - return null; - } - - @Override - public void overrideCurrentName(String s) - { - - } - - @Override - public Version version() - { - throw new UnsupportedOperationException(); - } - }; - } - - AvroTypeException error(String type) - { - return new AvroTypeException("Expected " + type + - ". Got " + in.getCurrentToken()); - } - -} diff --git a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java b/runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java deleted file mode 100644 index 62a4e799e6..0000000000 --- a/runtime/model-avro/src/main/java/org/apache/avro/io/JsonEncoder.java +++ /dev/null @@ -1,401 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc - * - * Licensed under the Aklivity Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://www.aklivity.io/aklivity-community-license/ - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.avro.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.BitSet; -import java.util.Objects; - -import org.apache.avro.AvroTypeException; -import org.apache.avro.Schema; -import org.apache.avro.io.parsing.JsonGrammarGenerator; -import org.apache.avro.io.parsing.Parser; -import org.apache.avro.io.parsing.Symbol; -import org.apache.avro.util.Utf8; - -import com.fasterxml.jackson.core.JsonEncoding; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; -import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; - -/** - * An {@link Encoder} for Avro's JSON data encoding. - *

- * Construct using {@link EncoderFactory}. - *

- * JsonEncoder buffers output, and data may not appear on the output until - * {@link Encoder#flush()} is called. - *

- * JsonEncoder is not thread-safe. - */ -public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler -{ - private static final String LINE_SEPARATOR = System.getProperty("line.separator"); - final Parser parser; - private boolean includeNamespace = true; - JsonGenerator out; - - /** - * Has anything been written into the collections? - */ - protected BitSet isEmpty = new BitSet(); - - JsonEncoder( - Schema sc, - OutputStream out) throws IOException - { - this(sc, getJsonGenerator(out, false)); - } - - JsonEncoder( - Schema sc, - OutputStream out, - boolean pretty) throws IOException - { - this(sc, getJsonGenerator(out, pretty)); - } - - JsonEncoder(Schema sc, JsonGenerator out) throws IOException - { - configure(out); - this.parser = new Parser(new JsonGrammarGenerator().generate(sc), this); - } - - @Override - public void flush() throws IOException - { - parser.processImplicitActions(); - if (out != null) - { - out.flush(); - } - } - - // by default, one object per line. - // with pretty option use default pretty printer with root line separator. - private static JsonGenerator getJsonGenerator( - OutputStream out, - boolean pretty) throws IOException - { - Objects.requireNonNull(out, "OutputStream cannot be null"); - JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8); - if (pretty) - { - DefaultPrettyPrinter pp = new DefaultPrettyPrinter() - { - @Override - public void writeRootValueSeparator(JsonGenerator jg) throws IOException - { - jg.writeRaw(LINE_SEPARATOR); - } - }; - g.setPrettyPrinter(pp); - } - else - { - MinimalPrettyPrinter pp = new MinimalPrettyPrinter(); - pp.setRootValueSeparator(LINE_SEPARATOR); - g.setPrettyPrinter(pp); - } - return g; - } - - public boolean isIncludeNamespace() - { - return includeNamespace; - } - - public void setIncludeNamespace( - final boolean includeNamespace) - { - this.includeNamespace = includeNamespace; - } - - /** - * Reconfigures this JsonEncoder to use the output stream provided. - *

- * If the OutputStream provided is null, a NullPointerException is thrown. - *

- * Otherwise, this JsonEncoder will flush its current output and then - * reconfigure its output to use a default UTF8 JsonGenerator that writes to the - * provided OutputStream. - * - * @param out The OutputStream to direct output to. Cannot be null. - * @return this JsonEncoder - * @throws IOException - * @throws NullPointerException if {@code out} is {@code null} - */ - public JsonEncoder configure( - OutputStream out) throws IOException - { - this.configure(getJsonGenerator(out, false)); - return this; - } - - /** - * Reconfigures this JsonEncoder to output to the JsonGenerator provided. - *

- * If the JsonGenerator provided is null, a NullPointerException is thrown. - *

- * Otherwise, this JsonEncoder will flush its current output and then - * reconfigure its output to use the provided JsonGenerator. - * - * @param generator The JsonGenerator to direct output to. Cannot be null. - * @return this JsonEncoder - * @throws IOException - * @throws NullPointerException if {@code generator} is {@code null} - */ - private JsonEncoder configure( - JsonGenerator generator) throws IOException - { - Objects.requireNonNull(generator, "JsonGenerator cannot be null"); - if (null != parser) - { - flush(); - } - this.out = generator; - return this; - } - - @Override - public void writeNull() throws IOException - { - parser.advance(Symbol.NULL); - out.writeNull(); - } - - @Override - public void writeBoolean( - boolean b) throws IOException - { - parser.advance(Symbol.BOOLEAN); - out.writeBoolean(b); - } - - @Override - public void writeInt( - int n) throws IOException - { - parser.advance(Symbol.INT); - out.writeNumber(n); - } - - @Override - public void writeLong( - long n) throws IOException - { - parser.advance(Symbol.LONG); - out.writeNumber(n); - } - - @Override - public void writeFloat( - float f) throws IOException - { - parser.advance(Symbol.FLOAT); - out.writeNumber(f); - } - - @Override - public void writeDouble( - double d) throws IOException - { - parser.advance(Symbol.DOUBLE); - out.writeNumber(d); - } - - @Override - public void writeString( - Utf8 utf8) throws IOException - { - writeString(utf8.toString()); - } - - @Override - public void writeString( - String str) throws IOException - { - parser.advance(Symbol.STRING); - if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) - { - parser.advance(Symbol.MAP_KEY_MARKER); - out.writeFieldName(str); - } - else - { - out.writeString(str); - } - } - - @Override - public void writeBytes(ByteBuffer bytes) throws IOException - { - if (bytes.hasArray()) - { - writeBytes(bytes.array(), bytes.position(), bytes.remaining()); - } - else - { - byte[] b = new byte[bytes.remaining()]; - bytes.duplicate().get(b); - writeBytes(b); - } - } - - @Override - public void writeBytes( - byte[] bytes, - int start, - int len) throws IOException - { - parser.advance(Symbol.BYTES); - writeByteArray(bytes, start, len); - } - - private void writeByteArray( - byte[] bytes, - int start, - int len) throws IOException - { - out.writeString(new String(bytes, start, len, StandardCharsets.ISO_8859_1)); - } - - @Override - public void writeFixed( - byte[] bytes, - int start, - int len) throws IOException - { - parser.advance(Symbol.FIXED); - Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); - if (len != top.size) - { - throw new AvroTypeException( - "Incorrect length for fixed binary: expected " + top.size + " but received " + len + " bytes."); - } - writeByteArray(bytes, start, len); - } - - @Override - public void writeEnum( - int e) throws IOException - { - parser.advance(Symbol.ENUM); - Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol(); - if (e < 0 || e >= top.size) - { - throw new AvroTypeException("Enumeration out of range: max is " + top.size + " but received " + e); - } - out.writeString(top.getLabel(e)); - } - - @Override - public void writeArrayStart() throws IOException - { - parser.advance(Symbol.ARRAY_START); - out.writeStartArray(); - push(); - isEmpty.set(depth()); - } - - @Override - public void writeArrayEnd() throws IOException - { - if (!isEmpty.get(pos)) - { - parser.advance(Symbol.ITEM_END); - } - pop(); - parser.advance(Symbol.ARRAY_END); - out.writeEndArray(); - } - - @Override - public void writeMapStart() throws IOException - { - push(); - isEmpty.set(depth()); - - parser.advance(Symbol.MAP_START); - out.writeStartObject(); - } - - @Override - public void writeMapEnd() throws IOException - { - if (!isEmpty.get(pos)) - { - parser.advance(Symbol.ITEM_END); - } - pop(); - - parser.advance(Symbol.MAP_END); - out.writeEndObject(); - } - - @Override - public void startItem() throws IOException - { - if (!isEmpty.get(pos)) - { - parser.advance(Symbol.ITEM_END); - } - super.startItem(); - isEmpty.clear(depth()); - } - - @Override - public void writeIndex( - int unionIndex) throws IOException - { - parser.advance(Symbol.UNION); - Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); - Symbol symbol = top.getSymbol(unionIndex); - if (symbol != Symbol.NULL && includeNamespace) - { - out.writeStartObject(); - out.writeFieldName(top.getLabel(unionIndex)); - parser.pushSymbol(Symbol.UNION_END); - } - parser.pushSymbol(symbol); - } - - @Override - public Symbol doAction(Symbol input, Symbol top) throws IOException - { - if (top instanceof Symbol.FieldAdjustAction) - { - Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; - out.writeFieldName(fa.fname); - } - else if (top == Symbol.RECORD_START) - { - out.writeStartObject(); - } - else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) - { - out.writeEndObject(); - } - else if (top != Symbol.FIELD_END) - { - throw new AvroTypeException("Unknown action symbol " + top); - } - return null; - } -} diff --git a/runtime/model-avro/src/main/moditect/module-info.java b/runtime/model-avro/src/main/moditect/module-info.java index 48c85cbf4a..d132b1200a 100644 --- a/runtime/model-avro/src/main/moditect/module-info.java +++ b/runtime/model-avro/src/main/moditect/module-info.java @@ -14,16 +14,12 @@ */ module io.aklivity.zilla.runtime.model.avro { - requires com.fasterxml.jackson.core; - requires com.fasterxml.jackson.databind; requires io.aklivity.zilla.runtime.engine; - requires org.slf4j; + requires io.aklivity.zilla.runtime.common.avro; + requires io.aklivity.zilla.runtime.common.json; exports io.aklivity.zilla.runtime.model.avro.config; - uses io.aklivity.zilla.runtime.model.avro.internal.avro.Conversion; - uses io.aklivity.zilla.runtime.model.avro.internal.avro.LogicalTypes$LogicalTypeFactory; - provides io.aklivity.zilla.runtime.engine.config.ModelConfigAdapterSpi with io.aklivity.zilla.runtime.model.avro.internal.config.AvroModelConfigAdapter; diff --git a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java index e7cfb52f07..10df7331f1 100644 --- a/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java +++ b/runtime/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelTest.java @@ -15,12 +15,14 @@ package io.aklivity.zilla.runtime.model.avro.internal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.time.Clock; import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.UnsafeBuffer; import org.junit.Before; import org.junit.Test; @@ -424,4 +426,261 @@ public void shouldExtract() }; converter.extracted(floatPath, floatVisitor); } + + @Test + public void shouldRoundTripComplexJson() + { + String schema = "{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"io.aklivity.example\"," + + "\"fields\":[" + + "{\"name\":\"id\",\"type\":\"long\"}," + + "{\"name\":\"name\",\"type\":\"string\"}," + + "{\"name\":\"active\",\"type\":\"boolean\"}," + + "{\"name\":\"score\",\"type\":\"double\"}," + + "{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}," + + "{\"name\":\"props\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}," + + "{\"name\":\"status\",\"type\":[\"null\",\"string\"]}," + + "{\"name\":\"meta\",\"type\":{\"type\":\"record\",\"name\":\"Meta\"," + + "\"fields\":[{\"name\":\"m\",\"type\":\"int\"}]}}]}"; + + TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new) + .namespace("test") + .name("test0") + .type("test") + .options(TestCatalogOptionsConfig::builder) + .id(9) + .schema(schema) + .build() + .build(); + when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options)); + + AvroModelConfig writeModel = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + AvroModelConfig readModel = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + + AvroWriteConverterHandler writer = new AvroWriteConverterHandler(config, writeModel, context); + AvroReadConverterHandler reader = new AvroReadConverterHandler(config, readModel, context); + + // a large value forces the bounded output to suspend mid-datum, so the streamed output must equal + // the whole-buffer output + String name = "x".repeat(1000); + String json = "{\"id\":7,\"name\":\"" + name + "\",\"active\":true,\"score\":0.5," + + "\"tags\":[\"a\",\"b\"],\"props\":{\"k\":\"v\"},\"status\":\"ok\",\"meta\":{\"m\":3}}"; + + byte[] avro = convertToBytes(writer, json.getBytes()); + assertNotNull(avro); + + byte[] roundTrip = convertToBytes(reader, avro); + assertNotNull(roundTrip); + assertEquals(json, new String(roundTrip)); + } + + @Test + public void shouldExtractBinaryTypes() + { + String schema = "{\"type\":\"record\",\"name\":\"Obj\",\"fields\":[" + + "{\"name\":\"b\",\"type\":\"bytes\"}," + + "{\"name\":\"e\",\"type\":{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"A\",\"B\",\"C\"]}}," + + "{\"name\":\"f\",\"type\":\"boolean\"}," + + "{\"name\":\"x\",\"type\":{\"type\":\"fixed\",\"name\":\"F\",\"size\":2}}," + + "{\"name\":\"s\",\"type\":\"string\"}]}"; + + TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new) + .namespace("test") + .name("test0") + .type("test") + .options(TestCatalogOptionsConfig::builder) + .id(9) + .schema(schema) + .build() + .build(); + when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options)); + + AvroModelConfig writeModel = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + AvroModelConfig readModel = AvroModelConfig.builder() + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + + AvroWriteConverterHandler writer = new AvroWriteConverterHandler(config, writeModel, context); + String json = "{\"b\":\"/wA=\",\"e\":\"C\",\"f\":true,\"x\":\"AQI=\",\"s\":\"hello\"}"; + byte[] avro = convertToBytes(writer, json.getBytes()); + assertNotNull(avro); + + AvroReadConverterHandler reader = new AvroReadConverterHandler(config, readModel, context); + reader.extract("$.b"); + reader.extract("$.e"); + reader.extract("$.f"); + reader.extract("$.x"); + reader.extract("$.s"); + + DirectBuffer data = new UnsafeBuffer(avro); + assertEquals(avro.length, reader.convert(0L, 0L, data, 0, avro.length, ValueConsumer.NOP)); + + reader.extracted("$.s", (buffer, index, length) -> + assertEquals("hello", buffer.getStringWithoutLengthUtf8(index, length))); + reader.extracted("$.e", (buffer, index, length) -> + assertEquals("2", buffer.getStringWithoutLengthUtf8(index, length))); + reader.extracted("$.f", (buffer, index, length) -> + assertEquals("true", buffer.getStringWithoutLengthUtf8(index, length))); + reader.extracted("$.b", (buffer, index, length) -> + { + assertEquals(2, length); + assertEquals((byte) 0xff, buffer.getByte(index)); + assertEquals((byte) 0x00, buffer.getByte(index + 1)); + }); + reader.extracted("$.x", (buffer, index, length) -> + { + assertEquals(2, length); + assertEquals((byte) 0x01, buffer.getByte(index)); + assertEquals((byte) 0x02, buffer.getByte(index + 1)); + }); + } + + @Test + public void shouldRoundTripLargeArrayJson() + { + String schema = "{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"R\"," + + "\"fields\":[{\"name\":\"valueWithAQuiteLongFieldName\",\"type\":\"int\"}]}}"; + + TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new) + .namespace("test") + .name("test0") + .type("test") + .options(TestCatalogOptionsConfig::builder) + .id(9) + .schema(schema) + .build() + .build(); + when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options)); + + AvroModelConfig writeModel = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + AvroModelConfig readModel = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + + // the repeated field name makes the JSON far larger than the Avro, exceeding the bounded output + // window so the decode drains across several feeds and must reassemble identically + StringBuilder builder = new StringBuilder("["); + for (int i = 0; i < 60; i++) + { + builder.append(i > 0 ? "," : "").append("{\"valueWithAQuiteLongFieldName\":").append(i).append("}"); + } + builder.append("]"); + String json = builder.toString(); + + AvroWriteConverterHandler writer = new AvroWriteConverterHandler(config, writeModel, context); + AvroReadConverterHandler reader = new AvroReadConverterHandler(config, readModel, context); + + byte[] avro = convertToBytes(writer, json.getBytes()); + assertNotNull(avro); + + byte[] roundTrip = convertToBytes(reader, avro); + assertNotNull(roundTrip); + assertEquals(json, new String(roundTrip)); + } + + @Test + public void shouldReadInvalidAvroExpectJson() + { + TestCatalogConfig catalog = CatalogConfig.builder(TestCatalogConfig::new) + .namespace("test") + .name("test0") + .type("test") + .options(TestCatalogOptionsConfig::builder) + .id(9) + .schema(SCHEMA) + .build() + .build(); + AvroModelConfig model = AvroModelConfig.builder() + .view("json") + .catalog() + .name("test0") + .schema() + .strategy("topic") + .version("latest") + .subject("test-value") + .build() + .build() + .build(); + + when(context.supplyCatalog(catalog.id)).thenReturn(new TestCatalogHandler(catalog.options)); + when(context.clock()).thenReturn(Clock.systemUTC()); + when(context.supplyEventWriter()).thenReturn(mock(MessageConsumer.class)); + AvroReadConverterHandler converter = new AvroReadConverterHandler(config, model, context); + + DirectBuffer data = new UnsafeBuffer(); + byte[] bytes = {0x06, 0x69, 0x64, 0x30, 0x10}; + data.wrap(bytes, 0, bytes.length); + assertEquals(-1, converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP)); + } + + private static byte[] convertToBytes( + ConverterHandler converter, + byte[] input) + { + DirectBuffer data = new UnsafeBuffer(input); + MutableDirectBuffer destination = new UnsafeBuffer(new byte[64 * 1024]); + int progress = converter.convert(0L, 0L, data, 0, input.length, + (buffer, index, length) -> destination.putBytes(0, buffer, index, length)); + byte[] result = null; + if (progress > 0) + { + result = new byte[progress]; + destination.getBytes(0, result); + } + return result; + } } diff --git a/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml b/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml index 1178154944..4e0ce6ecd4 100644 --- a/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml +++ b/specs/model-avro.spec/src/main/scripts/io/aklivity/zilla/specs/model/avro/config/event.yaml @@ -28,7 +28,7 @@ telemetry: - qname: test:net0 id: model.avro.validation.failed name: MODEL_AVRO_VALIDATION_FAILED - message: A message payload failed validation. + message: A message payload failed validation. truncated datum catalogs: test0: type: test