Closed
127 changes: 89 additions & 38 deletions arrow-avro/src/reader/async_reader/builder.rs
@@ -109,8 +109,18 @@ impl<R> ReaderBuilder<R> {
}
}

impl<R: AsyncFileReader> ReaderBuilder<R> {
async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
impl<R> ReaderBuilder<R>
Contributor

I wonder, do we also want to allow a with_header function that accepts a user-supplied header directly?

Contributor

Not the typical use case, but it makes the builder more flexible.

Contributor Author

What would be the behavior with this method? Skip reading the header from the file, and start decoding from...?

Contributor

Presumably the range?
The behaviour would be the exact same, since the header ends with the magic I believe? And we start the actual decoding from the first magic we encounter.

Contributor Author

So the use case would be, the application parses the header once (or just supplies their own), and then passes it to read ranges in the file on the object store, assuming the header stays the same?

Contributor

> So the use case would be, the application parses the header once (or just supplies their own), and then passes it to read ranges in the file on the object store, assuming the header stays the same?

In the worst case (the range is 0-something), we scan the header bytes very fast until we find the magic, with no decoding needed, and then we start scanning normally.
In the best case, the range is middleOfFile-something and we don't need to make the first call to read the header at all, since the user provided it. We just scan until the first magic and party on.
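
The scan described here can be sketched in a few lines. This assumes "the magic" in this thread means the 16-byte sync marker that ends an OCF header and follows every block. The function name `first_block_offset` is hypothetical and is not part of arrow-avro.

```rust
/// Length of an Avro OCF sync marker in bytes.
const SYNC_LEN: usize = 16;

/// Returns the offset just past the first occurrence of `sync` in `buf`,
/// which is where the next block would start, or `None` if the marker
/// does not appear in `buf`.
///
/// Hypothetical helper for illustration only; no decoding is performed.
fn first_block_offset(buf: &[u8], sync: &[u8; SYNC_LEN]) -> Option<usize> {
    buf.windows(SYNC_LEN)
        // Compare each 16-byte window against the marker.
        .position(|window| window == &sync[..])
        .map(|pos| pos + SYNC_LEN)
}
```

Calling `first_block_offset(&bytes, &sync)` on a fetched range would give the position at which block decoding could begin. Because the marker is random bytes, block data could in principle contain the same 16 bytes, so a plain byte scan like this is a sketch rather than a guarantee.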

Contributor

> Header is not currently public, but this could be just an oversight. Its interface looks public-ready.

I also think so, but maybe it's better to do this in a separate PR; making this public has a tendency to bite back 😅

Contributor Author

> Presumably the range?

What if the range is not given?

> The behaviour would be the exact same, since the header ends with the magic I believe?

The current behavior uses the discovered length of the header as it was parsed from the file.
If the application supplies its own, the with_header method should also give the length, i.e. the offset past the header to start parsing the data from. Alternatively, we could just scan for the magic from the start of the file (unless the range option directs otherwise), but I'm not sure this is bulletproof.

Contributor

If the range is not given, it is 0..EOF.
In that case, as I said, we scan the bytes quickly for the magic (which was provided in the header by the user), no decoding happens, and then we start decoding normally.

Contributor

@jecsand838, Mar 3, 2026

> I wonder, do we also want to allow a with_header function that accepts a user-supplied header directly?

I’d be a bit hesitant about with_header. For OCF, the file header is the source of truth as it carries the required avro.schema metadata, and this reader also relies on the parsed header length to know where block decoding should begin. If callers can inject a header, we’d be operating on out-of-band metadata rather than the file’s actual header bytes, and we’d also need to define how header length / start offset are supplied and validated. That feels like a separate optimization for cached/ranged reads, rather than part of this PR’s goal of exposing the discovered writer schema imo.

> Header is not currently public, but this could be just an oversight. Its interface looks public-ready.

We had tightened the crate's public API prior to the initial release. That way, shipping potential fixes with minor releases would be simpler. However, Header, HeaderDecoder, and read_header being publicly exposed makes complete sense now that there's a good use case for it imo.
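
The two-stage flow this PR introduces is: read the header, inspect the writer schema, optionally set a reader schema, then finish building. It can be sketched with simplified stand-ins. Everything below is an assumption made for illustration: `SketchBuilder`, `SketchBuilderWithHeader`, and `SketchHeader` are hypothetical, synchronous, and parse a toy newline-terminated header instead of real Avro. They are not arrow-avro's API.

```rust
/// Hypothetical stand-in for a parsed header (not arrow-avro's `Header`).
#[derive(Debug, Clone, PartialEq)]
struct SketchHeader {
    writer_schema_json: String,
}

/// Stage one: options only; nothing has been read yet.
struct SketchBuilder {
    bytes: Vec<u8>,
    reader_schema_json: Option<String>,
}

/// Stage two: the header is parsed and its length is known.
struct SketchBuilderWithHeader {
    inner: SketchBuilder,
    header: SketchHeader,
    header_len: u64,
}

impl SketchBuilder {
    fn new(bytes: Vec<u8>) -> Self {
        Self { bytes, reader_schema_json: None }
    }

    /// Consumes the builder and parses the toy header: every byte up to the
    /// first `\n` is treated as the writer schema JSON.
    fn read_header(self) -> Result<SketchBuilderWithHeader, String> {
        let end = self
            .bytes
            .iter()
            .position(|b| *b == b'\n')
            .ok_or_else(|| "unexpected EOF while reading header".to_string())?;
        let writer_schema_json = String::from_utf8(self.bytes[..end].to_vec())
            .map_err(|e| format!("invalid UTF-8 in header: {e}"))?;
        Ok(SketchBuilderWithHeader {
            header: SketchHeader { writer_schema_json },
            header_len: (end + 1) as u64, // include the terminating newline
            inner: self,
        })
    }
}

impl SketchBuilderWithHeader {
    /// Exposes the discovered writer schema before the reader is built.
    fn writer_schema(&self) -> &str {
        &self.header.writer_schema_json
    }

    fn with_reader_schema(mut self, schema: String) -> Self {
        self.inner.reader_schema_json = Some(schema);
        self
    }

    /// Returns the effective schema and the offset at which data begins.
    fn try_build(self) -> (String, u64) {
        let schema = self
            .inner
            .reader_schema_json
            .unwrap_or(self.header.writer_schema_json);
        (schema, self.header_len)
    }
}
```

The point of the intermediate type is that `writer_schema` is only callable once the header has been read, so a caller can derive a reader schema from the writer schema before committing to `try_build`.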

where
R: AsyncFileReader,
{
/// Reads the Avro file header asynchronously to extract metadata.
///
/// The returned builder contains the parsed header information.
pub async fn read_header(mut self) -> Result<ReaderBuilderWithHeaderInfo<R>, AvroError> {
if self.file_size == 0 {
return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
}

let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
@@ -147,42 +157,83 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
position += read as u64;
}

decoder
.flush()
.map(|header| (header, position))
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
let header = decoder.flush().ok_or_else(|| {
AvroError::ParseError("Unexpected EOF while reading Avro header".into())
})?;
Ok(ReaderBuilderWithHeaderInfo::new(self, header, position))
}

/// Build the asynchronous Avro reader with the provided parameters.
/// This reads the header first to initialize the reader state.
pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
if self.file_size == 0 {
return Err(AvroError::InvalidArgument("File size cannot be 0".into()));
}

pub async fn try_build(self) -> Result<AsyncAvroFileReader<R>, AvroError> {
// Start by reading the header from the beginning of the avro file
// take the writer schema from the header
let (header, header_len) = self.read_header().await?;
let writer_schema = {
let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
AvroError::ParseError("No Avro schema present in file header".to_string())
})?;
let json_string = std::str::from_utf8(raw)
.map_err(|e| {
AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
})?
.to_string();
AvroSchema::new(json_string)
};
let builder_with_header = self.read_header().await?;
builder_with_header.try_build().await
}
}

/// Intermediate builder struct that holds the writer schema and header length
/// parsed from the file.
pub struct ReaderBuilderWithHeaderInfo<R> {
inner: ReaderBuilder<R>,
header: Header,
header_len: u64,
}

impl<R> ReaderBuilderWithHeaderInfo<R> {
fn new(inner: ReaderBuilder<R>, header: Header, header_len: u64) -> Self {
Self {
inner,
header,
header_len,
}
}

/// Returns the writer schema parsed from the Avro file header.
pub fn writer_schema(&self) -> Result<AvroSchema, AvroError> {
let raw = self.header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
AvroError::ParseError("No Avro schema present in file header".to_string())
})?;
let json_string = std::str::from_utf8(raw)
.map_err(|e| {
AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
})?
.to_string();
Ok(AvroSchema::new(json_string))
}

/// Sets the reader schema used during decoding.
///
/// If not provided, the writer schema from the OCF header is used directly.
///
/// A reader schema can be used for schema evolution or projection.
pub fn with_reader_schema(mut self, reader_schema: AvroSchema) -> Self {
self.inner = self.inner.with_reader_schema(reader_schema);
self
}

/// Specify a projection of column indices to read from the Avro file.
/// This can help optimize reading by only fetching the necessary columns.
pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
self.inner = self.inner.with_projection(projection);
self
}

/// Build the asynchronous Avro reader with the provided parameters.
pub async fn try_build(self) -> Result<AsyncAvroFileReader<R>, AvroError> {
// Take the writer schema from the header
let writer_schema = self.writer_schema()?;

// If projection exists, project the reader schema,
// if no reader schema is provided, parse it from the header(get the raw writer schema), and project that
// if no reader schema is provided, project the writer schema.
// this projected schema will be the schema used for reading.
let projected_reader_schema = self
.inner
.projection
.as_deref()
.map(|projection| {
let base_schema = if let Some(reader_schema) = &self.reader_schema {
let base_schema = if let Some(reader_schema) = &self.inner.reader_schema {
reader_schema
} else {
&writer_schema
@@ -195,7 +246,7 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
// (both optional, at worst no reader schema is provided, in which case we read with the writer schema)
let effective_reader_schema = projected_reader_schema
.as_ref()
.or(self.reader_schema.as_ref())
.or(self.inner.reader_schema.as_ref())
.map(|s| s.schema())
.transpose()?;

@@ -206,47 +257,47 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
builder = builder.with_reader_schema(reader_schema);
}
builder
.with_utf8view(self.utf8_view)
.with_strict_mode(self.strict_mode)
.with_utf8view(self.inner.utf8_view)
.with_strict_mode(self.inner.strict_mode)
.build()
}?;

let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?;
let decoder = Decoder::from_parts(
self.batch_size,
self.inner.batch_size,
record_decoder,
None,
IndexMap::new(),
FingerprintAlgorithm::Rabin,
);
let range = match self.range {
let range = match self.inner.range {
Some(r) => {
// If this PartitionedFile's range starts at 0, we need to skip the header bytes.
// But then we need to seek back 16 bytes to include the sync marker for the first block,
// as the logic in this reader searches the data for the first sync marker(after which a block starts),
// then reads blocks from the count, size etc.
let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty
let start = r.start.max(self.header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?);
let end = r.end.max(start).min(self.inner.file_size); // Ensure end is not less than start, worst case range is empty
start..end
}
None => 0..self.file_size,
None => 0..self.inner.file_size,
};

// Determine if there is actually data to fetch, note that we subtract the header len from range.start,
// so we need to check if range.end == header_len to see if there's no data after the header
let reader_state = if range.start == range.end || header_len == range.end {
let reader_state = if range.start == range.end || self.header_len == range.end {
ReaderState::Finished
} else {
ReaderState::Idle {
reader: self.reader,
reader: self.inner.reader,
}
};
let codec = header.compression()?;
let sync_marker = header.sync();
let codec = self.header.compression()?;
let sync_marker = self.header.sync();

Ok(AsyncAvroFileReader::new(
range,
self.file_size,
self.inner.file_size,
decoder,
codec,
sync_marker,
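
The range handling in `try_build` above can be isolated as a small pure function, which makes the edge cases easier to check. This is a minimal sketch that mirrors the arithmetic in the diff. The names `effective_range` and `has_data` are hypothetical, and `String` stands in for `AvroError`.

```rust
use std::ops::Range;

/// Length of the sync marker that terminates the OCF header.
const SYNC_LEN: u64 = 16;

/// Clamps a requested byte range so that it never starts inside the header
/// proper, but still includes the header's trailing sync marker, and never
/// ends past the file. With no requested range the whole file is used.
fn effective_range(
    requested: Option<Range<u64>>,
    header_len: u64,
    file_size: u64,
) -> Result<Range<u64>, String> {
    match requested {
        Some(r) => {
            // Step back 16 bytes from the end of the header so the scan for
            // the first sync marker can find the one that ends the header.
            let first_sync = header_len
                .checked_sub(SYNC_LEN)
                .ok_or_else(|| "header shorter than a sync marker".to_string())?;
            let start = r.start.max(first_sync);
            // Never let `end` fall below `start`; worst case the range is empty.
            let end = r.end.max(start).min(file_size);
            Ok(start..end)
        }
        None => Ok(0..file_size),
    }
}

/// Mirrors the `ReaderState::Finished` check: there is nothing to fetch when
/// the range is empty or when it ends exactly where the header ends.
fn has_data(range: &Range<u64>, header_len: u64) -> bool {
    !(range.start == range.end || header_len == range.end)
}
```

For example, with a 100-byte header and a 500-byte file, a requested range of `0..1000` becomes `84..500`, while `0..50` collapses to the empty range `84..84` and the reader would finish immediately.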