Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
format.Encoding_DELTA_BINARY_PACKED,
format.Encoding_BYTE_STREAM_SPLIT:
format.Encoding_BYTE_STREAM_SPLIT,
format.Encoding_ALP:
c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
Expand Down
9 changes: 9 additions & 0 deletions parquet/file/column_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,15 @@ func (p *PrimitiveWriterTestSuite) TestRequiredByteStreamSplit() {
}
}

// TestRequiredAlp verifies that the ALP encoding is accepted for the two
// floating-point physical types and panics for every other column type.
func (p *PrimitiveWriterTestSuite) TestRequiredAlp() {
	isFloating := p.Typ == reflect.TypeOf(float32(0)) || p.Typ == reflect.TypeOf(float64(0))
	if isFloating {
		p.testRequiredWithEncoding(parquet.Encodings.ALP)
		return
	}
	p.Panics(func() { p.testRequiredWithEncoding(parquet.Encodings.ALP) })
}

func (p *PrimitiveWriterTestSuite) TestRequiredDictionary() {
p.testRequiredWithEncoding(parquet.Encodings.PlainDict)
}
Expand Down
58 changes: 58 additions & 0 deletions parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/csv"
"fmt"
"io"
"math"
"os"
"path"
"testing"
Expand Down Expand Up @@ -941,3 +942,60 @@ func TestListColumns(t *testing.T) {
}
}
}

func TestAlpEncodingFileRead(t *testing.T) {
testFile := os.Getenv("ALP_TEST_FILE")
if testFile == "" {
t.Skip("ALP_TEST_FILE not set, skipping")
}
if _, err := os.Stat(testFile); os.IsNotExist(err) {
t.Skipf("ALP_TEST_FILE not found: %s", testFile)
}
Comment on lines +947 to +953
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the ALP_TEST_FILE get added to the parquet-testing repo rather than needing a separate env var to specify it?


props := parquet.NewReaderProperties(memory.DefaultAllocator)
fileReader, err := file.OpenParquetFile(testFile, false, file.WithReadProps(props))
require.NoError(t, err)
defer fileReader.Close()

nRows := 1024
require.Equal(t, 1, fileReader.NumRowGroups())
meta := fileReader.MetaData()
require.EqualValues(t, nRows, meta.GetNumRows())
require.Equal(t, 1, meta.Schema.NumColumns())
require.Equal(t, "value_f64", meta.Schema.Column(0).Name())
require.Equal(t, parquet.Types.Double, meta.Schema.Column(0).PhysicalType())

rgr := fileReader.RowGroup(0)
require.EqualValues(t, nRows, rgr.NumRows())

rdr, err := rgr.Column(0)
require.NoError(t, err)

f64Reader, ok := rdr.(*file.Float64ColumnChunkReader)
require.True(t, ok)

values := make([]float64, nRows)
total, read, err := f64Reader.ReadBatch(int64(nRows), values, nil, nil)
require.NoError(t, err)
require.EqualValues(t, nRows, total)
require.EqualValues(t, nRows, read)

md, err := rgr.MetaData().ColumnChunk(0)
require.NoError(t, err)
encodings := md.Encodings()
found := false
for _, enc := range encodings {
if enc == parquet.Encodings.ALP {
found = true
break
}
}
require.True(t, found, "expected ALP encoding in column chunk metadata, got %v", encodings)

for i := 0; i < nRows; i++ {
expected := 0.125 + float64(i)*0.25
assert.Equal(t, math.Float64bits(expected), math.Float64bits(values[i]),
"value[%d]: got %v (bits %016x), want %v (bits %016x)",
i, values[i], math.Float64bits(values[i]), expected, math.Float64bits(expected))
}
}
149 changes: 149 additions & 0 deletions parquet/file/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,155 @@ func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) {
require.NoError(t, rdr.Close())
}

// TestAlpFileRoundtrip writes float64 columns with the ALP encoding and reads
// them back, asserting that every value survives the roundtrip bit-for-bit
// (including NaN, signed zero, and infinities).
func TestAlpFileRoundtrip(t *testing.T) {
	// Sequence 0.125 + 0.25*i over 2048 entries exercises multi-vector input.
	largeVec := make([]float64, 2048)
	for i := range largeVec {
		largeVec[i] = 0.125 + float64(i)*0.25
	}

	cases := []struct {
		name   string
		values []float64
	}{
		{name: "monetary", values: []float64{1.23, 4.56, 7.89, 10.11, 12.13, 14.15, 16.17, 18.19}},
		{name: "large_vector", values: largeVec},
		{name: "integers", values: []float64{1, 2, 3, 100, 200, 300, 1000, 2000}},
		{name: "special_values", values: []float64{0, -0.0, 1.5, -1.5, math.Inf(1), math.Inf(-1), math.NaN()}},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			wProps := parquet.NewWriterProperties(
				parquet.WithEncoding(parquet.Encodings.ALP),
				parquet.WithDictionaryDefault(false),
			)

			col, err := schema.NewPrimitiveNode("value", parquet.Repetitions.Required, parquet.Types.Double, -1, -1)
			require.NoError(t, err)

			root, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{col}, 0)
			require.NoError(t, err)

			buf := encoding.NewBufferWriter(0, memory.DefaultAllocator)
			w := file.NewParquetWriter(buf, root, file.WithWriterProps(wProps))

			rowGroup := w.AppendRowGroup()
			colWriter, err := rowGroup.NextColumn()
			require.NoError(t, err)

			dblWriter, ok := colWriter.(*file.Float64ColumnChunkWriter)
			require.True(t, ok)

			written, err := dblWriter.WriteBatch(tt.values, nil, nil)
			require.NoError(t, err)
			require.EqualValues(t, len(tt.values), written)

			require.NoError(t, colWriter.Close())
			require.NoError(t, rowGroup.Close())
			require.NoError(t, w.Close())

			rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
			require.NoError(t, err)

			require.Equal(t, 1, rdr.NumRowGroups())
			require.EqualValues(t, len(tt.values), rdr.NumRows())

			rowGroupRdr := rdr.RowGroup(0)
			colReader, err := rowGroupRdr.Column(0)
			require.NoError(t, err)

			dblReader, ok := colReader.(*file.Float64ColumnChunkReader)
			require.True(t, ok)

			got := make([]float64, len(tt.values))
			total, read, err := dblReader.ReadBatch(int64(len(tt.values)), got, nil, nil)
			require.NoError(t, err)
			require.EqualValues(t, len(tt.values), total)
			require.EqualValues(t, len(tt.values), read)

			// Bit-exact comparison (handles NaN, -0.0 correctly)
			for i := range tt.values {
				assert.Equal(t, math.Float64bits(tt.values[i]), math.Float64bits(got[i]),
					"index %d: got %v (bits %016x), want %v (bits %016x)",
					i, got[i], math.Float64bits(got[i]), tt.values[i], math.Float64bits(tt.values[i]))
			}

			require.NoError(t, rdr.Close())
		})
	}
}

// TestAlpFloat32FileRoundtrip performs a write/read roundtrip of a float32
// column using the ALP encoding and asserts the values come back bit-exact.
func TestAlpFloat32FileRoundtrip(t *testing.T) {
	input := []float32{1.23, 4.56, 7.89, 0, -1.5, 100.0, 0.001}

	wProps := parquet.NewWriterProperties(
		parquet.WithEncoding(parquet.Encodings.ALP),
		parquet.WithDictionaryDefault(false),
	)

	col, err := schema.NewPrimitiveNode("value", parquet.Repetitions.Required, parquet.Types.Float, -1, -1)
	require.NoError(t, err)

	root, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{col}, 0)
	require.NoError(t, err)

	buf := encoding.NewBufferWriter(0, memory.DefaultAllocator)
	w := file.NewParquetWriter(buf, root, file.WithWriterProps(wProps))

	rowGroup := w.AppendRowGroup()
	colWriter, err := rowGroup.NextColumn()
	require.NoError(t, err)

	fltWriter, ok := colWriter.(*file.Float32ColumnChunkWriter)
	require.True(t, ok)

	written, err := fltWriter.WriteBatch(input, nil, nil)
	require.NoError(t, err)
	require.EqualValues(t, len(input), written)

	require.NoError(t, colWriter.Close())
	require.NoError(t, rowGroup.Close())
	require.NoError(t, w.Close())

	rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)

	require.Equal(t, 1, rdr.NumRowGroups())
	require.EqualValues(t, len(input), rdr.NumRows())

	rowGroupRdr := rdr.RowGroup(0)
	colReader, err := rowGroupRdr.Column(0)
	require.NoError(t, err)

	fltReader, ok := colReader.(*file.Float32ColumnChunkReader)
	require.True(t, ok)

	got := make([]float32, len(input))
	total, read, err := fltReader.ReadBatch(int64(len(input)), got, nil, nil)
	require.NoError(t, err)
	require.EqualValues(t, len(input), total)
	require.EqualValues(t, len(input), read)

	// Compare bit patterns so the check is exact rather than approximate.
	for i := range input {
		assert.Equal(t, math.Float32bits(input[i]), math.Float32bits(got[i]),
			"index %d: got %v (bits %08x), want %v (bits %08x)",
			i, got[i], math.Float32bits(got[i]), input[i], math.Float32bits(input[i]))
	}

	require.NoError(t, rdr.Close())
}

func TestLZ4RawFileRoundtrip(t *testing.T) {
input := []int64{
-1, 0, 1, 2, 3, 4, 5, 123456789, -123456789,
Expand Down
Loading
Loading