Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 80 additions & 39 deletions arrow-avro/src/reader/async_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::codec::AvroFieldBuilder;
use crate::errors::AvroError;
use crate::reader::async_reader::ReaderState;
use crate::reader::header::{Header, HeaderDecoder};
use crate::reader::header::{Header, HeaderDecoder, HeaderInfo};
use crate::reader::record::RecordDecoder;
use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
Expand Down Expand Up @@ -109,50 +109,71 @@ impl<R> ReaderBuilder<R> {
}
}

impl<R: AsyncFileReader> ReaderBuilder<R> {
async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
let range_to_fetch = position
..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
.min(self.file_size);
/// Reads the Avro file header (magic, metadata, sync marker) asynchronously from `reader`.
///
/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes.
pub async fn read_header_info<R>(
reader: &mut R,
file_size: u64,
header_size_hint: Option<u64>,
) -> Result<HeaderInfo, AvroError>
where
R: AsyncFileReader,
{
read_header(reader, file_size, header_size_hint)
.await
.map(|(header, header_len)| HeaderInfo::new(header, header_len))
}

// Maybe EOF after the header, no actual data
if range_to_fetch.is_empty() {
break;
}
async fn read_header<R>(
reader: &mut R,
file_size: u64,
header_size_hint: Option<u64>,
) -> Result<(Header, u64), AvroError>
where
R: AsyncFileReader,
{
let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
let range_to_fetch = position
..(position + header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)).min(file_size);

let current_data = self
.reader
.get_bytes(range_to_fetch.clone())
.await
.map_err(|err| {
AvroError::General(format!(
"Error fetching Avro header from file reader: {err}"
))
})?;
if current_data.is_empty() {
return Err(AvroError::EOF(
"Unexpected EOF while fetching header data".into(),
));
}
// Maybe EOF after the header, no actual data
if range_to_fetch.is_empty() {
break;
}

let read = current_data.len();
let decoded = decoder.decode(&current_data)?;
if decoded != read {
position += decoded as u64;
break;
}
position += read as u64;
let current_data = reader
.get_bytes(range_to_fetch.clone())
.await
.map_err(|err| {
AvroError::General(format!(
"Error fetching Avro header from file reader: {err}"
))
})?;
if current_data.is_empty() {
return Err(AvroError::EOF(
"Unexpected EOF while fetching header data".into(),
));
}

decoder
.flush()
.map(|header| (header, position))
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
let read = current_data.len();
let decoded = decoder.decode(&current_data)?;
if decoded != read {
position += decoded as u64;
break;
}
position += read as u64;
}

decoder
.flush()
.map(|header| (header, position))
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
}

impl<R: AsyncFileReader> ReaderBuilder<R> {
/// Build the asynchronous Avro reader with the provided parameters.
/// This reads the header first to initialize the reader state.
pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
Expand All @@ -162,7 +183,27 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {

// Start by reading the header from the beginning of the avro file
// take the writer schema from the header
let (header, header_len) = self.read_header().await?;
let (header, header_len) =
read_header(&mut self.reader, self.file_size, self.header_size_hint).await?;
self.build_internal(&header, header_len)
}

/// Build the asynchronous Avro reader with the provided header.
///
/// This allows initializing the reader with pre-parsed header information.
/// Note that this method is not async because it does not need to perform any I/O operations.
pub fn build_with_header(
self,
header_info: HeaderInfo,
) -> Result<AsyncAvroFileReader<R>, AvroError> {
self.build_internal(header_info.header(), header_info.header_len())
}

fn build_internal(
self,
header: &Header,
header_len: u64,
) -> Result<AsyncAvroFileReader<R>, AvroError> {
let writer_schema = {
let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
AvroError::ParseError("No Avro schema present in file header".to_string())
Expand Down
45 changes: 44 additions & 1 deletion arrow-avro/src/reader/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

//! Asyncronous implementation of Avro file reader.
//!
//! This module provides [`AsyncAvroFileReader`], which supports reading and decoding
//! the Avro OCF format from any source that implements [`AsyncFileReader`].

use crate::compression::CompressionCodec;
use crate::reader::Decoder;
use crate::reader::block::{BlockDecoder, BlockDecoderState};
Expand All @@ -32,7 +37,7 @@ mod async_file_reader;
mod builder;

pub use async_file_reader::AsyncFileReader;
pub use builder::ReaderBuilder;
pub use builder::{ReaderBuilder, read_header_info};

#[cfg(feature = "object_store")]
mod store;
Expand Down Expand Up @@ -1281,6 +1286,44 @@ mod tests {
assert_eq!(batch.num_rows(), 8);
}

#[tokio::test]
async fn test_builder_with_header_info() {
let file = arrow_test_data("avro/alltypes_plain.avro");
let store = Arc::new(LocalFileSystem::new());
let location = Path::from_filesystem_path(&file).unwrap();

let file_size = store.head(&location).await.unwrap().size;

let mut file_reader = AvroObjectReader::new(store, location);

let header_info = read_header_info(&mut file_reader, file_size, None)
.await
.unwrap();

assert_eq!(header_info.header_len(), 675);

let writer_schema = header_info.writer_schema().unwrap();
let expected_avro_json: serde_json::Value = serde_json::from_str(
get_alltypes_schema()
.metadata()
.get(SCHEMA_METADATA_KEY)
.unwrap(),
)
.unwrap();
let actual_avro_json: serde_json::Value =
serde_json::from_str(&writer_schema.json_string).unwrap();
assert_eq!(actual_avro_json, expected_avro_json);

let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
.build_with_header(header_info)
.unwrap();

let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

let batch = &batches[0];
assert_eq!(batch.num_rows(), 8)
}

#[tokio::test]
async fn test_roundtrip_write_then_async_read() {
use crate::writer::AvroWriter;
Expand Down
71 changes: 68 additions & 3 deletions arrow-avro/src/reader/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
use crate::errors::AvroError;
use crate::reader::vlq::VLQDecoder;
use crate::schema::{SCHEMA_METADATA_KEY, Schema};
use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY, Schema};
use std::io::BufRead;
use std::str;
use std::sync::Arc;

/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, AvroError> {
///
/// On success, returns the parsed [`Header`] and the number of bytes read from `reader`.
pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<(Header, u64), AvroError> {
let mut decoder = HeaderDecoder::default();
let mut position = 0;
loop {
let buf = reader.fill_buf()?;
if buf.is_empty() {
Expand All @@ -34,12 +39,14 @@ pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, AvroError
let read = buf.len();
let decoded = decoder.decode(buf)?;
reader.consume(decoded);
position += decoded as u64;
if decoded != read {
break;
}
}
decoder
.flush()
.map(|header| (header, position))
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".to_string()))
}

Expand Down Expand Up @@ -124,6 +131,64 @@ impl Header {
}
}

/// Header information for an Avro OCF file.
///
/// The header can be parsed once and shared used to construct multiple readers
/// for the same file, and so this struct is designed to be cheaply clonable.
#[derive(Clone)]
pub struct HeaderInfo(Arc<HeaderInfoInner>);

struct HeaderInfoInner {
header: Header,
header_len: u64,
}

/// Reads the Avro file header (magic, metadata, sync marker) from `reader`.
///
/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes.
pub fn read_header_info<R: BufRead>(reader: R) -> Result<HeaderInfo, AvroError> {
let (header, header_len) = read_header(reader)?;
Ok(HeaderInfo::new(header, header_len))
}

impl HeaderInfo {
pub(crate) fn new(header: Header, header_len: u64) -> Self {
Self(Arc::new(HeaderInfoInner { header, header_len }))
}

pub(crate) fn header(&self) -> &Header {
&self.0.header
}

/// Returns the writer schema for this file.
pub fn writer_schema(&self) -> Result<AvroSchema, AvroError> {
let raw = self.0.header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
AvroError::ParseError("No Avro schema present in file header".to_string())
})?;
let json_string = str::from_utf8(raw)
.map_err(|e| {
AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
})?
.to_string();
Ok(AvroSchema::new(json_string))
}

/// Returns the [`CompressionCodec`] if any
pub fn compression(&self) -> Result<Option<CompressionCodec>, AvroError> {
self.0.header.compression()
}

/// Returns the length of the header in bytes.
pub fn header_len(&self) -> u64 {
self.0.header_len
}

/// Returns the sync token for this file.
pub fn sync(&self) -> [u8; 16] {
self.0.header.sync()
}
}

/// A decoder for [`Header`]
///
/// The avro file format does not encode the length of the header, and so it
Expand Down Expand Up @@ -315,7 +380,7 @@ mod test {

fn decode_file(file: &str) -> Header {
let file = File::open(file).unwrap();
read_header(BufReader::with_capacity(1000, file)).unwrap()
read_header(BufReader::with_capacity(1000, file)).unwrap().0
}

#[test]
Expand Down
8 changes: 5 additions & 3 deletions arrow-avro/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,9 @@ mod record;
mod vlq;

#[cfg(feature = "async")]
mod async_reader;
pub mod async_reader;

pub use header::{HeaderInfo, read_header_info};

#[cfg(feature = "object_store")]
pub use async_reader::AvroObjectReader;
Expand Down Expand Up @@ -1273,7 +1275,7 @@ impl ReaderBuilder {
/// the discovered writer (and optional reader) schema, and prepares to iterate blocks,
/// decompressing if necessary.
pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
let header = read_header(&mut reader)?;
let (header, _) = read_header(&mut reader)?;
let decoder = self.make_decoder(Some(&header), self.reader_schema.as_ref())?;
Ok(Reader {
reader,
Expand Down Expand Up @@ -1632,7 +1634,7 @@ mod test {

fn load_writer_schema_json(path: &str) -> Value {
let file = File::open(path).unwrap();
let header = super::read_header(BufReader::new(file)).unwrap();
let (header, _) = super::read_header(BufReader::new(file)).unwrap();
let schema = header.schema().unwrap().unwrap();
serde_json::to_value(&schema).unwrap()
}
Expand Down
Loading