diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index b60a960c..10ff5074 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -19,6 +19,7 @@ package pqarrow_test
 import (
 	"bytes"
 	"context"
+	"encoding/base64"
 	"encoding/binary"
 	"fmt"
 	"math"
@@ -1532,9 +1533,9 @@ func makeListArray(values arrow.Array, size, nullcount int) arrow.Array {
 	nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))
 
 	curOffset := 0
-	for i := 0; i < size; i++ {
+	for i := range size {
 		offsetsArr[i] = int32(curOffset)
-		if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
+		if i%2 != 0 || i/2 >= nullcount {
 			// non-null list (list with index 1 is always empty)
 			bitutil.SetBit(nullBitmap, i)
 			if i != 1 {
@@ -2108,6 +2109,105 @@ func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
 	ps.roundTripTable(mem, expected, false)
 }
 
+// TestListOfStructWithEmptyListStoreSchema tests that ARROW:schema metadata stored
+// in a Parquet file uses "element" (not "item") as the list element field name, to
+// match the actual Parquet column paths. This is required for compatibility with
+// readers like Snowflake that resolve columns by matching ARROW:schema field names
+// to Parquet column path segments. See https://github.com/apache/arrow-go/issues/744.
+func TestListOfStructWithEmptyListStoreSchema(t *testing.T) {
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+	defer mem.AssertSize(t, 0)
+
+	opsStruct := arrow.StructOf(
+		arrow.Field{Name: "id", Type: arrow.BinaryTypes.String, Nullable: false},
+		arrow.Field{Name: "token", Type: arrow.BinaryTypes.String, Nullable: true},
+		arrow.Field{Name: "amount", Type: arrow.BinaryTypes.String, Nullable: true},
+	)
+	// arrow.ListOf uses "item" as the element field name, which would mismatch
+	// the Parquet column path that uses "element". The fix ensures the stored
+	// ARROW:schema uses "element" to stay consistent with the Parquet columns.
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "block_num", Type: arrow.PrimitiveTypes.Uint64, Nullable: false},
+		{Name: "tx_id", Type: arrow.BinaryTypes.String, Nullable: false},
+		{Name: "ops", Type: arrow.ListOf(opsStruct), Nullable: true},
+	}, nil)
+
+	b := array.NewRecordBuilder(mem, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.Uint64Builder).AppendValues([]uint64{100, 101, 102}, nil)
+	b.Field(1).(*array.StringBuilder).AppendValues([]string{"tx-a", "tx-b", "tx-c"}, nil)
+
+	lb := b.Field(2).(*array.ListBuilder)
+	sb := lb.ValueBuilder().(*array.StructBuilder)
+	idb := sb.FieldBuilder(0).(*array.StringBuilder)
+	tokb := sb.FieldBuilder(1).(*array.StringBuilder)
+	amtb := sb.FieldBuilder(2).(*array.StringBuilder)
+
+	lb.Append(true) // row 0: list with two ops
+	sb.Append(true)
+	idb.Append("op-1")
+	tokb.Append("USDC")
+	amtb.Append("10")
+	sb.Append(true)
+	idb.Append("op-2")
+	tokb.Append("ETH")
+	amtb.Append("1.5")
+	lb.Append(true) // row 1: empty list
+	lb.Append(true) // row 2: one op with a null token
+	sb.Append(true)
+	idb.Append("op-3")
+	tokb.AppendNull()
+	amtb.Append("42")
+
+	rec := b.NewRecordBatch()
+	defer rec.Release()
+
+	var buf bytes.Buffer
+	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), parquet.WithStats(true))
+	arrowProps := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
+
+	pw, err := pqarrow.NewFileWriter(schema, &buf, props, arrowProps)
+	require.NoError(t, err)
+	require.NoError(t, pw.Write(rec))
+	require.NoError(t, pw.Close())
+
+	// Read the file back; for the data itself only the row count is asserted.
+	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+	require.NoError(t, err)
+	defer pf.Close()
+
+	fr, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, mem)
+	require.NoError(t, err)
+
+	tbl, err := fr.ReadTable(context.Background())
+	require.NoError(t, err)
+	defer tbl.Release()
+
+	require.EqualValues(t, 3, tbl.NumRows())
+
+	// Verify the stored ARROW:schema uses "element" as the list element field name
+	// (consistent with the Parquet column path "ops.list.element.*"), not "item"
+	// (the default Arrow field name from arrow.ListOf()).
+	arrowSchemaEncoded := pf.MetaData().KeyValueMetadata().FindValue("ARROW:schema")
+	require.NotNil(t, arrowSchemaEncoded, "ARROW:schema metadata key must be present")
+	decoded, err := base64.StdEncoding.DecodeString(*arrowSchemaEncoded)
+	require.NoError(t, err)
+	// The decoded bytes are read as an IPC stream; ipc.NewReader exposes its schema.
+	ipcRdr, err := ipc.NewReader(bytes.NewReader(decoded), ipc.WithAllocator(mem))
+	require.NoError(t, err)
+	defer ipcRdr.Release()
+	storedSchema := ipcRdr.Schema()
+
+	opsField, ok := storedSchema.FieldsByName("ops")
+	require.True(t, ok)
+	opsListType, ok := opsField[0].Type.(*arrow.ListType)
+	require.True(t, ok)
+	// Must be "element" (matching Parquet column path) not "item" (Arrow default).
+	assert.Equal(t, "element", opsListType.ElemField().Name,
+		"ARROW:schema element name must match the Parquet column path segment")
+}
+
 func TestParquetArrowIO(t *testing.T) {
 	suite.Run(t, new(ParquetIOTestSuite))
 }
diff --git a/parquet/pqarrow/file_writer.go b/parquet/pqarrow/file_writer.go
index 6c305c59..ed401696 100644
--- a/parquet/pqarrow/file_writer.go
+++ b/parquet/pqarrow/file_writer.go
@@ -33,6 +33,48 @@ import (
 	"golang.org/x/xerrors"
 )
 
+// normalizeFieldForParquet recursively normalizes an Arrow field so that its
+// type matches the Parquet column structure that fieldToNode would produce.
+// Specifically, list element fields are renamed to "element", the name that
+// ListOfWithName (used by fieldToNode) always gives the Parquet element group.
+func normalizeFieldForParquet(f arrow.Field) arrow.Field {
+	// f is a copy: only its Type is replaced, so Name, Nullable and Metadata are kept.
+	switch dt := f.Type.(type) {
+	case *arrow.ListType:
+		elem := normalizeFieldForParquet(dt.ElemField())
+		elem.Name = "element"
+		f.Type = arrow.ListOfField(elem)
+	case *arrow.FixedSizeListType:
+		elem := normalizeFieldForParquet(dt.ElemField())
+		elem.Name = "element"
+		f.Type = arrow.FixedSizeListOfField(dt.Len(), elem)
+	case *arrow.StructType:
+		fields := make([]arrow.Field, dt.NumFields())
+		for i := range fields {
+			fields[i] = normalizeFieldForParquet(dt.Field(i))
+		}
+		f.Type = arrow.StructOf(fields...)
+	case *arrow.MapType:
+		// MapOfFields takes no sortedness argument, so copy KeysSorted explicitly.
+		mt := arrow.MapOfFields(normalizeFieldForParquet(dt.KeyField()), normalizeFieldForParquet(dt.ItemField()))
+		mt.KeysSorted = dt.KeysSorted
+		f.Type = mt
+	}
+	return f
+}
+
+// normalizeSchemaForParquet returns a copy of sc whose fields have been passed
+// through normalizeFieldForParquet, so the stored ARROW:schema agrees with the
+// Parquet column paths. Schema metadata and endianness are carried over.
+func normalizeSchemaForParquet(sc *arrow.Schema) *arrow.Schema {
+	fields := make([]arrow.Field, sc.NumFields())
+	for i, f := range sc.Fields() {
+		fields[i] = normalizeFieldForParquet(f)
+	}
+	meta := sc.Metadata()
+	return arrow.NewSchemaWithEndian(fields, &meta, sc.Endianness())
+}
+
 // WriteTable is a convenience function to create and write a full array.Table to a parquet file.
The schema // and columns will be determined by the schema of the table, writing the file out to the provided writer. // The chunksize will be utilized in order to determine the size of the row groups. @@ -80,7 +122,14 @@ func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterPr } if arrprops.storeSchema { - serializedSchema := flight.SerializeSchema(arrschema, props.Allocator()) + // Normalize the Arrow schema so that list element field names match the + // Parquet column group names. fieldToNode always uses "element" as the + // Parquet group name for list element fields (via ListOfWithName), but + // arrow.ListOf() uses "item" as the Arrow element field name. This + // inconsistency causes readers (e.g. Snowflake) that map ARROW:schema field + // names to Parquet column paths to fail to locate the correct columns. + schemaToStore := normalizeSchemaForParquet(arrschema) + serializedSchema := flight.SerializeSchema(schemaToStore, props.Allocator()) meta.Append("ARROW:schema", base64.StdEncoding.EncodeToString(serializedSchema)) } diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go index aac64825..1167b5a5 100644 --- a/parquet/pqarrow/schema.go +++ b/parquet/pqarrow/schema.go @@ -722,7 +722,7 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s // If the name is array or ends in _tuple, this should be a list of struct // even for single child elements. 
listGroup := listNode.(*schema.GroupNode) - if listGroup.NumFields() == 1 && !(listGroup.Name() == "array" || listGroup.Name() == (n.Name()+"_tuple")) { + if listGroup.NumFields() == 1 && (listGroup.Name() != "array" && listGroup.Name() != n.Name()+"_tuple") { // list of primitive type if err := nodeToSchemaField(listGroup.Field(0), currentLevels, ctx, out, &out.Children[0]); err != nil { return err diff --git a/parquet/schema/node.go b/parquet/schema/node.go index 10bd9b09..e239aa5e 100644 --- a/parquet/schema/node.go +++ b/parquet/schema/node.go @@ -172,7 +172,7 @@ func NewPrimitiveNodeLogical(name string, repetition parquet.Repetition, logical n.convertedType, n.decimalMetaData = n.logicalType.ToConvertedType() } - if !(n.logicalType != nil && !n.logicalType.IsNested() && n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData)) { + if n.logicalType == nil || n.logicalType.IsNested() || !n.logicalType.IsCompatible(n.convertedType, n.decimalMetaData) { return nil, fmt.Errorf("invalid logical type %s", n.logicalType) }