diff --git a/arrow-avro/src/reader/async_reader/builder.rs b/arrow-avro/src/reader/async_reader/builder.rs index 0f9a7abf1cd4..663d78efc3b5 100644 --- a/arrow-avro/src/reader/async_reader/builder.rs +++ b/arrow-avro/src/reader/async_reader/builder.rs @@ -109,8 +109,18 @@ impl ReaderBuilder { } } -impl ReaderBuilder { - async fn read_header(&mut self) -> Result<(Header, u64), AvroError> { +impl ReaderBuilder +where + R: AsyncFileReader, +{ + /// Reads the Avro file header asynchronously to extract metadata. + /// + /// The returned builder contains the parsed header information. + pub async fn read_header(mut self) -> Result, AvroError> { + if self.file_size == 0 { + return Err(AvroError::InvalidArgument("File size cannot be 0".into())); + } + let mut decoder = HeaderDecoder::default(); let mut position = 0; loop { @@ -147,42 +157,83 @@ impl ReaderBuilder { position += read as u64; } - decoder - .flush() - .map(|header| (header, position)) - .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into())) + let header = decoder.flush().ok_or_else(|| { + AvroError::ParseError("Unexpected EOF while reading Avro header".into()) + })?; + Ok(ReaderBuilderWithHeaderInfo::new(self, header, position)) } /// Build the asynchronous Avro reader with the provided parameters. /// This reads the header first to initialize the reader state. - pub async fn try_build(mut self) -> Result, AvroError> { - if self.file_size == 0 { - return Err(AvroError::InvalidArgument("File size cannot be 0".into())); - } - + pub async fn try_build(self) -> Result, AvroError> { // Start by reading the header from the beginning of the avro file // take the writer schema from the header - let (header, header_len) = self.read_header().await?; - let writer_schema = { - let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { - AvroError::ParseError("No Avro schema present in file header".to_string()) - })?; - let json_string = std::str::from_utf8(raw) - .map_err(|e| { - AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}")) - })? - .to_string(); - AvroSchema::new(json_string) - }; + let builder_with_header = self.read_header().await?; + builder_with_header.try_build().await + } +} + +/// Intermediate builder struct that holds the writer schema and header length +/// parsed from the file. +pub struct ReaderBuilderWithHeaderInfo { + inner: ReaderBuilder, + header: Header, + header_len: u64, +} + +impl ReaderBuilderWithHeaderInfo { + fn new(inner: ReaderBuilder, header: Header, header_len: u64) -> Self { + Self { + inner, + header, + header_len, + } + } + + /// Returns the writer schema parsed from the Avro file header. + pub fn writer_schema(&self) -> Result { + let raw = self.header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { + AvroError::ParseError("No Avro schema present in file header".to_string()) + })?; + let json_string = std::str::from_utf8(raw) + .map_err(|e| { + AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}")) + })? + .to_string(); + Ok(AvroSchema::new(json_string)) + } + + /// Sets the reader schema used during decoding. + /// + /// If not provided, the writer schema from the OCF header is used directly. + /// + /// A reader schema can be used for schema evolution or projection. + pub fn with_reader_schema(mut self, reader_schema: AvroSchema) -> Self { + self.inner = self.inner.with_reader_schema(reader_schema); + self + } + + /// Specify a projection of column indices to read from the Avro file. + /// This can help optimize reading by only fetching the necessary columns. + pub fn with_projection(mut self, projection: Vec) -> Self { + self.inner = self.inner.with_projection(projection); + self + } + + /// Build the asynchronous Avro reader with the provided parameters. + pub async fn try_build(self) -> Result, AvroError> { + // Take the writer schema from the header + let writer_schema = self.writer_schema()?; // If projection exists, project the reader schema, - // if no reader schema is provided, parse it from the header(get the raw writer schema), and project that + // if no reader schema is provided, project the writer schema. // this projected schema will be the schema used for reading. let projected_reader_schema = self + .inner .projection .as_deref() .map(|projection| { - let base_schema = if let Some(reader_schema) = &self.reader_schema { + let base_schema = if let Some(reader_schema) = &self.inner.reader_schema { reader_schema } else { &writer_schema @@ -195,7 +246,7 @@ impl ReaderBuilder { // (both optional, at worst no reader schema is provided, in which case we read with the writer schema) let effective_reader_schema = projected_reader_schema .as_ref() - .or(self.reader_schema.as_ref()) + .or(self.inner.reader_schema.as_ref()) .map(|s| s.schema()) .transpose()?; @@ -206,47 +257,47 @@ impl ReaderBuilder { builder = builder.with_reader_schema(reader_schema); } builder - .with_utf8view(self.utf8_view) - .with_strict_mode(self.strict_mode) + .with_utf8view(self.inner.utf8_view) + .with_strict_mode(self.inner.strict_mode) .build() }?; let record_decoder = RecordDecoder::try_new_with_options(root.data_type())?; let decoder = Decoder::from_parts( - self.batch_size, + self.inner.batch_size, record_decoder, None, IndexMap::new(), FingerprintAlgorithm::Rabin, ); - let range = match self.range { + let range = match self.inner.range { Some(r) => { // If this PartitionedFile's range starts at 0, we need to skip the header bytes. // But then we need to seek back 16 bytes to include the sync marker for the first block, // as the logic in this reader searches the data for the first sync marker(after which a block starts), // then reads blocks from the count, size etc. - let start = r.start.max(header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?); - let end = r.end.max(start).min(self.file_size); // Ensure end is not less than start, worst case range is empty + let start = r.start.max(self.header_len.checked_sub(16).ok_or(AvroError::ParseError("Avro header length overflow, header was not long enough to contain avro bytes".to_string()))?); + let end = r.end.max(start).min(self.inner.file_size); // Ensure end is not less than start, worst case range is empty start..end } - None => 0..self.file_size, + None => 0..self.inner.file_size, }; // Determine if there is actually data to fetch, note that we subtract the header len from range.start, // so we need to check if range.end == header_len to see if there's no data after the header - let reader_state = if range.start == range.end || header_len == range.end { + let reader_state = if range.start == range.end || self.header_len == range.end { ReaderState::Finished } else { ReaderState::Idle { - reader: self.reader, + reader: self.inner.reader, } }; - let codec = header.compression()?; - let sync_marker = header.sync(); + let codec = self.header.compression()?; + let sync_marker = self.header.sync(); Ok(AsyncAvroFileReader::new( range, - self.file_size, + self.inner.file_size, decoder, codec, sync_marker,