diff --git a/Cargo.lock b/Cargo.lock index 40f811e3e..d236bf283 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6625,6 +6625,15 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "memmap2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "744133e4a0e0a658e1374cf3bf8e415c4052a15a111acd372764c55b4177d490" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.9.1" @@ -9792,6 +9801,7 @@ dependencies = [ "fs-err", "futures", "governor 0.10.4", + "memmap2", "monitoring", "prost 0.13.5", "reqwest", diff --git a/crates/extractors/solana/Cargo.toml b/crates/extractors/solana/Cargo.toml index 64a0e82ff..9f1b79666 100644 --- a/crates/extractors/solana/Cargo.toml +++ b/crates/extractors/solana/Cargo.toml @@ -19,6 +19,7 @@ futures.workspace = true fs-err.workspace = true governor.workspace = true monitoring = { path = "../../core/monitoring" } +memmap2 = "0.9.9" reqwest.workspace = true schemars = { workspace = true, optional = true } serde.workspace = true diff --git a/crates/extractors/solana/src/of1_client.rs b/crates/extractors/solana/src/of1_client.rs index e4eb2cbe0..e5a15e4f6 100644 --- a/crates/extractors/solana/src/of1_client.rs +++ b/crates/extractors/solana/src/of1_client.rs @@ -280,9 +280,8 @@ pub(crate) fn stream( } let dest = car_directory.join(local_car_filename(epoch)); - - let buf_reader = match tokio::fs::File::open(&dest).await.map(tokio::io::BufReader::new) { - Ok(reader) => reader, + let file = match fs_err::File::open(dest) { + Ok(f) => f, Err(e) => { car_manager_tx .send(CarManagerMessage::FileProcessed(epoch)) @@ -292,7 +291,20 @@ pub(crate) fn stream( return; } }; - let mut node_reader = car_parser::node::NodeReader::new(buf_reader); + // SAFETY: The file is not modified/deleted while the mmap is in use. + let mmap = match unsafe { memmap2::Mmap::map(&file) } { + Ok(mmap) => mmap, + Err(e) => { + car_manager_tx + .send(CarManagerMessage::FileProcessed(epoch)) + .await + .expect("receiver not dropped"); + yield Err(e.into()); + return; + } + }; + + let mut node_reader = car_parser::node::NodeReader::new(&mmap[..]); while let Some(block) = read_entire_block(&mut node_reader, prev_blockhash).await.transpose() { let block = match block {