diff --git a/arrow-avro/src/reader/async_reader/builder.rs b/arrow-avro/src/reader/async_reader/builder.rs index 0f9a7abf1cd..1856410d671 100644 --- a/arrow-avro/src/reader/async_reader/builder.rs +++ b/arrow-avro/src/reader/async_reader/builder.rs @@ -18,7 +18,7 @@ use crate::codec::AvroFieldBuilder; use crate::errors::AvroError; use crate::reader::async_reader::ReaderState; -use crate::reader::header::{Header, HeaderDecoder}; +use crate::reader::header::{Header, HeaderDecoder, HeaderInfo}; use crate::reader::record::RecordDecoder; use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder}; use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY}; @@ -109,50 +109,71 @@ impl ReaderBuilder { } } -impl ReaderBuilder { - async fn read_header(&mut self) -> Result<(Header, u64), AvroError> { - let mut decoder = HeaderDecoder::default(); - let mut position = 0; - loop { - let range_to_fetch = position - ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)) - .min(self.file_size); +/// Reads the Avro file header (magic, metadata, sync marker) asynchronously from `reader`. +/// +/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes. 
+pub async fn read_header_info( + reader: &mut R, + file_size: u64, + header_size_hint: Option, +) -> Result +where + R: AsyncFileReader, +{ + read_header(reader, file_size, header_size_hint) + .await + .map(|(header, header_len)| HeaderInfo::new(header, header_len)) +} - // Maybe EOF after the header, no actual data - if range_to_fetch.is_empty() { - break; - } +async fn read_header( + reader: &mut R, + file_size: u64, + header_size_hint: Option, +) -> Result<(Header, u64), AvroError> +where + R: AsyncFileReader, +{ + let mut decoder = HeaderDecoder::default(); + let mut position = 0; + loop { + let range_to_fetch = position + ..(position + header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)).min(file_size); - let current_data = self - .reader - .get_bytes(range_to_fetch.clone()) - .await - .map_err(|err| { - AvroError::General(format!( - "Error fetching Avro header from file reader: {err}" - )) - })?; - if current_data.is_empty() { - return Err(AvroError::EOF( - "Unexpected EOF while fetching header data".into(), - )); - } + // Maybe EOF after the header, no actual data + if range_to_fetch.is_empty() { + break; + } - let read = current_data.len(); - let decoded = decoder.decode(¤t_data)?; - if decoded != read { - position += decoded as u64; - break; - } - position += read as u64; + let current_data = reader + .get_bytes(range_to_fetch.clone()) + .await + .map_err(|err| { + AvroError::General(format!( + "Error fetching Avro header from file reader: {err}" + )) + })?; + if current_data.is_empty() { + return Err(AvroError::EOF( + "Unexpected EOF while fetching header data".into(), + )); } - decoder - .flush() - .map(|header| (header, position)) - .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into())) + let read = current_data.len(); + let decoded = decoder.decode(¤t_data)?; + if decoded != read { + position += decoded as u64; + break; + } + position += read as u64; } + decoder + .flush() + .map(|header| (header, position)) + 
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into())) +} + +impl ReaderBuilder { /// Build the asynchronous Avro reader with the provided parameters. /// This reads the header first to initialize the reader state. pub async fn try_build(mut self) -> Result, AvroError> { @@ -162,7 +183,27 @@ impl ReaderBuilder { // Start by reading the header from the beginning of the avro file // take the writer schema from the header - let (header, header_len) = self.read_header().await?; + let (header, header_len) = + read_header(&mut self.reader, self.file_size, self.header_size_hint).await?; + self.build_internal(&header, header_len) + } + + /// Build the asynchronous Avro reader with the provided header. + /// + /// This allows initializing the reader with pre-parsed header information. + /// Note that this method is not async because it does not need to perform any I/O operations. + pub fn build_with_header( + self, + header_info: HeaderInfo, + ) -> Result, AvroError> { + self.build_internal(header_info.header(), header_info.header_len()) + } + + fn build_internal( + self, + header: &Header, + header_len: u64, + ) -> Result, AvroError> { let writer_schema = { let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { AvroError::ParseError("No Avro schema present in file header".to_string()) diff --git a/arrow-avro/src/reader/async_reader/mod.rs b/arrow-avro/src/reader/async_reader/mod.rs index 02c00a60e0e..1f74103e52c 100644 --- a/arrow-avro/src/reader/async_reader/mod.rs +++ b/arrow-avro/src/reader/async_reader/mod.rs @@ -15,6 +15,11 @@ // specific language governing permissions and limitations // under the License. +//! Asynchronous implementation of Avro file reader. +//! +//! This module provides [`AsyncAvroFileReader`], which supports reading and decoding +//! the Avro OCF format from any source that implements [`AsyncFileReader`].
+ use crate::compression::CompressionCodec; use crate::reader::Decoder; use crate::reader::block::{BlockDecoder, BlockDecoderState}; @@ -32,7 +37,7 @@ mod async_file_reader; mod builder; pub use async_file_reader::AsyncFileReader; -pub use builder::ReaderBuilder; +pub use builder::{ReaderBuilder, read_header_info}; #[cfg(feature = "object_store")] mod store; @@ -1281,6 +1286,44 @@ mod tests { assert_eq!(batch.num_rows(), 8); } + #[tokio::test] + async fn test_builder_with_header_info() { + let file = arrow_test_data("avro/alltypes_plain.avro"); + let store = Arc::new(LocalFileSystem::new()); + let location = Path::from_filesystem_path(&file).unwrap(); + + let file_size = store.head(&location).await.unwrap().size; + + let mut file_reader = AvroObjectReader::new(store, location); + + let header_info = read_header_info(&mut file_reader, file_size, None) + .await + .unwrap(); + + assert_eq!(header_info.header_len(), 675); + + let writer_schema = header_info.writer_schema().unwrap(); + let expected_avro_json: serde_json::Value = serde_json::from_str( + get_alltypes_schema() + .metadata() + .get(SCHEMA_METADATA_KEY) + .unwrap(), + ) + .unwrap(); + let actual_avro_json: serde_json::Value = + serde_json::from_str(&writer_schema.json_string).unwrap(); + assert_eq!(actual_avro_json, expected_avro_json); + + let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024) + .build_with_header(header_info) + .unwrap(); + + let batches: Vec = reader.try_collect().await.unwrap(); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 8) + } + #[tokio::test] async fn test_roundtrip_write_then_async_read() { use crate::writer::AvroWriter; diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs index b5efd8bcdb6..58a68dfa54a 100644 --- a/arrow-avro/src/reader/header.rs +++ b/arrow-avro/src/reader/header.rs @@ -20,12 +20,17 @@ use crate::compression::{CODEC_METADATA_KEY, CompressionCodec}; use crate::errors::AvroError; use 
crate::reader::vlq::VLQDecoder; -use crate::schema::{SCHEMA_METADATA_KEY, Schema}; +use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY, Schema}; use std::io::BufRead; +use std::str; +use std::sync::Arc; /// Read the Avro file header (magic, metadata, sync marker) from `reader`. -pub(crate) fn read_header(mut reader: R) -> Result { +/// +/// On success, returns the parsed [`Header`] and the number of bytes read from `reader`. +pub(crate) fn read_header(mut reader: R) -> Result<(Header, u64), AvroError> { let mut decoder = HeaderDecoder::default(); + let mut position = 0; loop { let buf = reader.fill_buf()?; if buf.is_empty() { @@ -34,12 +39,14 @@ pub(crate) fn read_header(mut reader: R) -> Result); + +struct HeaderInfoInner { + header: Header, + header_len: u64, +} + +/// Reads the Avro file header (magic, metadata, sync marker) from `reader`. +/// +/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes. +pub fn read_header_info(reader: R) -> Result { + let (header, header_len) = read_header(reader)?; + Ok(HeaderInfo::new(header, header_len)) +} + +impl HeaderInfo { + pub(crate) fn new(header: Header, header_len: u64) -> Self { + Self(Arc::new(HeaderInfoInner { header, header_len })) + } + + pub(crate) fn header(&self) -> &Header { + &self.0.header + } + + /// Returns the writer schema for this file. + pub fn writer_schema(&self) -> Result { + let raw = self.0.header.get(SCHEMA_METADATA_KEY).ok_or_else(|| { + AvroError::ParseError("No Avro schema present in file header".to_string()) + })?; + let json_string = str::from_utf8(raw) + .map_err(|e| { + AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}")) + })? + .to_string(); + Ok(AvroSchema::new(json_string)) + } + + /// Returns the [`CompressionCodec`] if any + pub fn compression(&self) -> Result, AvroError> { + self.0.header.compression() + } + + /// Returns the length of the header in bytes. 
+ pub fn header_len(&self) -> u64 { + self.0.header_len + } + + /// Returns the sync token for this file. + pub fn sync(&self) -> [u8; 16] { + self.0.header.sync() + } +} + /// A decoder for [`Header`] /// /// The avro file format does not encode the length of the header, and so it @@ -315,7 +380,7 @@ mod test { fn decode_file(file: &str) -> Header { let file = File::open(file).unwrap(); - read_header(BufReader::with_capacity(1000, file)).unwrap() + read_header(BufReader::with_capacity(1000, file)).unwrap().0 } #[test] diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 63b61b601e0..35d9dd1bb5c 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -500,7 +500,9 @@ mod record; mod vlq; #[cfg(feature = "async")] -mod async_reader; +pub mod async_reader; + +pub use header::{HeaderInfo, read_header_info}; #[cfg(feature = "object_store")] pub use async_reader::AvroObjectReader; @@ -1273,7 +1275,7 @@ impl ReaderBuilder { /// the discovered writer (and optional reader) schema, and prepares to iterate blocks, /// decompressing if necessary. pub fn build(self, mut reader: R) -> Result, ArrowError> { - let header = read_header(&mut reader)?; + let (header, _) = read_header(&mut reader)?; let decoder = self.make_decoder(Some(&header), self.reader_schema.as_ref())?; Ok(Reader { reader, @@ -1632,7 +1634,7 @@ mod test { fn load_writer_schema_json(path: &str) -> Value { let file = File::open(path).unwrap(); - let header = super::read_header(BufReader::new(file)).unwrap(); + let (header, _) = super::read_header(BufReader::new(file)).unwrap(); let schema = header.schema().unwrap().unwrap(); serde_json::to_value(&schema).unwrap() }