[common] Introduce MAP type for ARROW, COMPACTED and INDEXED formats #2190
Pull request overview
This PR introduces MAP type support for the INDEXED row format, achieving feature parity with ARROW and COMPACTED formats. The implementation adds MAP serialization/deserialization capabilities across the Fluss common layer, Flink integration, and lake integrations (Paimon, Iceberg placeholder).
Key Changes:
- Added MAP type support to IndexedRow format with `IndexedRowReader.readMap()` and `IndexedRowWriter.writeMap()` methods
- Introduced `GenericMap`, `BinaryMap`, and format-specific map implementations (IndexedMap, CompactedMap, AlignedMap) with corresponding serializers
- Extended Flink, Paimon, and Arrow integrations to handle MAP type conversions and vector operations
Reviewed changes
Copilot reviewed 66 out of 66 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `IndexedRowReader.java` | Added `readMap()` method and MAP case in field reader factory |
| `IndexedRowWriter.java` | Added `writeMap()` method for MAP serialization |
| `BinaryMap.java` | New binary representation of MAP type with key/value arrays |
| `GenericMap.java` | Generic MAP implementation wrapping Java maps |
| `MapSerializer.java` | Serializer for converting InternalMap to BinaryMap |
| `FlinkAsFlussMap.java` | Adapter for Flink MapData to Fluss InternalMap |
| `PaimonMapAsFlussMap.java` | Adapter for Paimon map to Fluss InternalMap |
| `ArrowMapWriter.java` / `ArrowMapColumnVector.java` | Arrow format MAP support |
| Various test files | Comprehensive test coverage for MAP operations |
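The summary describes a map held as a key array and a value array. A minimal sketch of that general idea follows, assuming a plain in-memory pair of `Object[]` arrays. The class `ParallelArrayMapSketch` and its methods are hypothetical and are not the Fluss `BinaryMap` API or its binary layout, which this page does not show.

```java
import java.util.Objects;

/** Hypothetical illustration only; not the Fluss BinaryMap implementation. */
public class ParallelArrayMapSketch {
    private final Object[] keys;
    private final Object[] values;

    public ParallelArrayMapSketch(Object[] keys, Object[] values) {
        if (keys.length != values.length) {
            throw new IllegalArgumentException("keys and values must have the same length");
        }
        // Defensive copies so later changes to the caller's arrays do not leak in.
        this.keys = keys.clone();
        this.values = values.clone();
    }

    public int size() {
        return keys.length;
    }

    public Object[] keyArray() {
        return keys.clone();
    }

    public Object[] valueArray() {
        return values.clone();
    }

    /** Linear scan: the entry at index i pairs keys[i] with values[i]. */
    public Object get(Object key) {
        for (int i = 0; i < keys.length; i++) {
            if (Objects.equals(keys[i], key)) {
                return values[i];
            }
        }
        return null; // absent key, or a key mapped to null
    }

    public static void main(String[] args) {
        ParallelArrayMapSketch map =
                new ParallelArrayMapSketch(new Object[] {"a", "b"}, new Object[] {1, null});
        System.out.println(map.size());
        System.out.println(map.get("a"));
        System.out.println(map.get("b"));
    }
}
```

Keeping keys and values in two arrays of equal length means size, key array, value array, and null values can each be checked independently.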
      Object arr, int offset, int length, int elementSize) {
      final long headerInBytes = calculateHeaderInBytes(length);
    - final long valueRegionInBytes = elementSize * length;
    + final long valueRegionInBytes = (long) elementSize * length;
Copilot AI, Dec 26, 2025
The cast to (long) is applied only to elementSize, not to the entire multiplication result. This could still cause integer overflow when elementSize * length exceeds Integer.MAX_VALUE before the cast is applied. The cast should wrap the entire multiplication expression: (long) elementSize * length.
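The overflow concern comes down to Java operator precedence. A cast binds more tightly than `*`, so `(long) elementSize * length` widens `elementSize` to `long` before multiplying, while `(long) (elementSize * length)` multiplies in `int` first and can wrap. The sketch below contrasts the two forms. The class and method names are hypothetical, and the values are chosen only to exceed `Integer.MAX_VALUE`.

```java
public class CastPrecedenceDemo {

    /** Cast applies to elementSize first, so the multiplication is done in 64-bit. */
    static long widenThenMultiply(int elementSize, int length) {
        return (long) elementSize * length;
    }

    /** Multiplication is done in 32-bit and may wrap before the result is widened. */
    static long multiplyThenWiden(int elementSize, int length) {
        return (long) (elementSize * length);
    }

    public static void main(String[] args) {
        int elementSize = 8;
        int length = 300_000_000; // 8 * 300_000_000 = 2_400_000_000 > Integer.MAX_VALUE

        System.out.println(widenThenMultiply(elementSize, length)); // 2400000000
        System.out.println(multiplyThenWiden(elementSize, length)); // -1894967296
    }
}
```

Under this reading, the changed line in the diff already performs the multiplication in `long`.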
    java.util.Map<Object, Object> javaMap = new java.util.LinkedHashMap<>();
    for (int i = 0; i < values.length; i += 2) {
        javaMap.put(values[i], (i + 1 < values.length) ? values[i + 1] : null);
Copilot AI, Dec 26, 2025
Test is always true, because of this condition.
    - javaMap.put(values[i], (i + 1 < values.length) ? values[i + 1] : null);
    + javaMap.put(values[i], values[i + 1]);
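The suggested simplification is only safe if an earlier check guarantees an even number of elements. That check is not visible in this excerpt, so the sketch below assumes one. The helper `fromPairs` and the class name are hypothetical and not part of the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PairsToMapSketch {

    /** Builds an insertion-ordered map from alternating key/value arguments. */
    static Map<Object, Object> fromPairs(Object... values) {
        // Assumed guard: with an even length, i + 1 is always a valid index below.
        if (values.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "Expected an even number of arguments (key/value pairs), got " + values.length);
        }
        Map<Object, Object> javaMap = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i += 2) {
            javaMap.put(values[i], values[i + 1]);
        }
        return javaMap;
    }

    public static void main(String[] args) {
        System.out.println(fromPairs("a", 1, "b", null)); // {a=1, b=null}
    }
}
```

Without such a guard, an odd-length input would make `values[i + 1]` throw `ArrayIndexOutOfBoundsException` on the last iteration.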
     * @since 0.9
     */
    @PublicEvolving
    public class BinaryMap extends BinarySection implements InternalMap {
Copilot AI, Dec 26, 2025
Class BinaryMap overrides hashCode but not equals.
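For context on this kind of warning, `equals` and `hashCode` should be derived from the same state so that equal objects always hash alike. The sketch below shows a consistent pair on a hypothetical byte-backed class, `ByteSectionSketch`. It is not the Fluss `BinarySection` or `BinaryMap` code, and whether a superclass already supplies a suitable `equals` is not visible in this excerpt.

```java
public class ByteSectionSketch {
    private final byte[] bytes;
    private final int offset;
    private final int sizeInBytes;

    public ByteSectionSketch(byte[] bytes, int offset, int sizeInBytes) {
        this.bytes = bytes;
        this.offset = offset;
        this.sizeInBytes = sizeInBytes;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ByteSectionSketch)) {
            return false;
        }
        ByteSectionSketch that = (ByteSectionSketch) o;
        if (sizeInBytes != that.sizeInBytes) {
            return false;
        }
        // Compare only the referenced region, not the whole backing arrays.
        for (int i = 0; i < sizeInBytes; i++) {
            if (bytes[offset + i] != that.bytes[that.offset + i]) {
                return false;
            }
        }
        return true;
    }

    @Override
    public int hashCode() {
        // Hash exactly the bytes that equals() compares.
        int h = 1;
        for (int i = 0; i < sizeInBytes; i++) {
            h = 31 * h + bytes[offset + i];
        }
        return h;
    }
}
```

Two sections with identical content at different offsets compare equal and produce the same hash, which is what hash-based collections rely on.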
Purpose
Linked issue: close #1973
Introduce MAP type support for INDEXED format, achieving feature parity with ARROW and COMPACTED formats for MAP data type handling.
Brief change log
- Added `readMap()` method to `IndexedRowReader` for MAP type deserialization, following the same pattern as `readArray()`
- Removed `UnsupportedOperationException` for MAP type in `IndexedRowReader.createFieldReader()`
- Added `InternalMap` import to `IndexedRowReader`
- Renamed `BinaryWriterTest.testCreateValueSetterForMapThrowsException()` to `testValueSetterWithMapType()` - changed from exception test to positive validation test
- Updated `IndexedRowTest.assertAllTypeEquals()` to include MAP type assertions (validates size, key array, value array, and null handling)
- Updated `IndexedRowReaderTest.assertAllTypeEqualsForReader()` to verify MAP reading

Tests
Unit Tests:
- `IndexedRowTest` (7 tests) - Validates MAP read/write operations with IndexedRow
- `IndexedRowReaderTest` (2 tests) - Validates MAP deserialization with IndexedRowReader
- `BinaryWriterTest` (18 tests) - Validates MAP type ValueWriter creation and usage
- `ArrowReaderWriterTest` (2 tests) - Confirms ARROW format MAP support continues to work
- `CompactedRowTest` (20 tests) - Confirms COMPACTED format MAP support continues to work

Test Results: All 49 tests passed successfully (0 failures, 0 errors, 0 skipped)
Test Command:
    mvn test -pl fluss-common -Dtest=IndexedRowTest,IndexedRowReaderTest,BinaryWriterTest,ArrowReaderWriterTest,CompactedRowTest -Dcheckstyle.skip=true -Dspotless.check.skip=true

API and Format
API: No breaking changes. This is purely additive functionality that removes an `UnsupportedOperationException` and enables existing MAP type infrastructure for INDEXED format.
Storage Format: No format changes. The MAP storage format in INDEXED rows was already defined and uses the same `BinaryMap` serialization mechanism as other formats. This change only enables reading MAP data that was already writeable.
Documentation
No documentation changes required. This change brings INDEXED format to feature parity with ARROW and COMPACTED formats for MAP type support. MAP type usage is already documented in existing Fluss documentation. The change completes internal implementation of existing functionality rather than introducing new user-facing features.