diff --git a/ethexe/processor/src/handling/overlaid.rs b/ethexe/processor/src/handling/overlaid.rs index b2b7f53f147..b1750609c43 100644 --- a/ethexe/processor/src/handling/overlaid.rs +++ b/ethexe/processor/src/handling/overlaid.rs @@ -22,7 +22,7 @@ use crate::{ self, CommonRunContext, RunContext, chunks_splitting::{ActorStateHashWithQueueSize, ExecutionChunks}, }, - host::InstanceCreator, + host::{InstanceCreator, InstanceWrapper}, }; use core_processor::common::JournalNote; use ethexe_common::{BlockHeader, db::CodesStorageRO, gear::MessageType}; @@ -178,14 +178,23 @@ impl RunContext for OverlaidRunContext { &mut self.inner } - fn program_code(&self, program_id: ActorId) -> Result<(InstrumentedCode, CodeMetadata)> { + fn program_code( + &self, + program_id: ActorId, + instrumentation_instance: &mut Option, + ) -> Result<(InstrumentedCode, CodeMetadata)> { let code_id = self .inner .db .program_code_id(program_id) .ok_or_else(|| ProcessorError::MissingCodeIdForProgram(program_id))?; - run::instrumented_code_and_metadata(&self.inner.db, code_id) + run::instrumented_code_and_metadata( + &self.inner.db, + &self.inner.instance_creator, + instrumentation_instance, + code_id, + ) } fn states(&self, processing_queue_type: MessageType) -> Vec { diff --git a/ethexe/processor/src/handling/run/chunk_execution_spawn.rs b/ethexe/processor/src/handling/run/chunk_execution_spawn.rs index 3360de22cb4..052b2bd6760 100644 --- a/ethexe/processor/src/handling/run/chunk_execution_spawn.rs +++ b/ethexe/processor/src/handling/run/chunk_execution_spawn.rs @@ -53,10 +53,13 @@ pub async fn spawn_chunk_execution( timestamp: block_header.timestamp, }; + let mut instrumentation_instance = None; + chunk .into_iter() .map(|(program_id, state_hash)| { - let (instrumented_code, code_metadata) = ctx.program_code(program_id)?; + let (instrumented_code, code_metadata) = + ctx.program_code(program_id, &mut instrumentation_instance)?; let mut executor = ctx.inner().instance_creator.instantiate()?; let db = ctx.inner().db.cas().clone_boxed(); let promise_sink = ctx.inner().promise_sink.clone(); diff --git a/ethexe/processor/src/handling/run/mod.rs b/ethexe/processor/src/handling/run/mod.rs index 50564fe9079..94145caf978 100644 --- a/ethexe/processor/src/handling/run/mod.rs +++ b/ethexe/processor/src/handling/run/mod.rs @@ -110,7 +110,10 @@ pub(super) mod chunks_splitting; pub(crate) use chunks_splitting::ActorStateHashWithQueueSize; -use crate::{BoundPromiseSink, ProcessorError, Result, host::InstanceCreator}; +use crate::{ + BoundPromiseSink, ProcessorError, Result, + host::{InstanceCreator, InstanceWrapper}, +}; use chunk_execution_processing::ChunkJournalsProcessingOutput; use chunks_splitting::ExecutionChunks; use core_processor::common::JournalNote; @@ -118,7 +121,7 @@ use ethexe_common::{ BlockHeader, CALL_REPLY_SOFT_LIMIT, OUTGOING_MESSAGES_BYTES_SOFT_LIMIT, OUTGOING_MESSAGES_SOFT_LIMIT, PROGRAM_MODIFICATIONS_SOFT_LIMIT, PromisePolicy, StateHashWithQueueSize, - db::CodesStorageRO, + db::{CodesStorageRO, CodesStorageRW}, gear::{CHUNK_PROCESSING_GAS_LIMIT, MessageType}, }; use ethexe_db::{CASDatabase, Database}; @@ -209,7 +212,11 @@ pub(super) enum LimitsStatus { /// between common and overlaid execution contexts. It's not meant /// to emphasize any particular trait/feature/abstraction. pub(super) trait RunContext { - fn program_code(&self, program_id: ActorId) -> Result<(InstrumentedCode, CodeMetadata)>; + fn program_code( + &self, + program_id: ActorId, + instrumentation_instance: &mut Option, + ) -> Result<(InstrumentedCode, CodeMetadata)>; /// Get reference to inner. fn inner(&self) -> &CommonRunContext; @@ -339,7 +346,7 @@ pub(super) trait RunContext { pub(crate) struct CommonRunContext { pub(super) db: Database, pub(super) transitions: InBlockTransitions, - instance_creator: InstanceCreator, + pub(super) instance_creator: InstanceCreator, gas_allowance_counter: GasAllowanceCounter, outgoing_messages_limiter: u32, outgoing_messages_bytes_limiter: u32, @@ -395,7 +402,11 @@ impl CommonRunContext { } impl RunContext for CommonRunContext { - fn program_code(&self, program_id: ActorId) -> Result<(InstrumentedCode, CodeMetadata)> { + fn program_code( + &self, + program_id: ActorId, + instrumentation_instance: &mut Option, + ) -> Result<(InstrumentedCode, CodeMetadata)> { let code_id = self .transitions .registered_programs() @@ -407,7 +418,12 @@ impl RunContext for CommonRunContext { .ok_or_else(|| ProcessorError::MissingCodeIdForProgram(program_id)) })?; - instrumented_code_and_metadata(&self.db, code_id) + instrumented_code_and_metadata( + &self.db, + &self.instance_creator, + instrumentation_instance, + code_id, + ) } fn states(&self, processing_queue_type: MessageType) -> Vec { @@ -425,14 +441,38 @@ impl RunContext for CommonRunContext { pub(super) fn instrumented_code_and_metadata( db: &Database, + instance_creator: &InstanceCreator, + instrumentation_instance: &mut Option, code_id: CodeId, ) -> Result<(InstrumentedCode, CodeMetadata)> { - db.instrumented_code(ethexe_runtime_common::VERSION, code_id) - .and_then(|instrumented_code| { - db.code_metadata(code_id) - .map(|metadata| (instrumented_code, metadata)) - }) - .ok_or_else(|| ProcessorError::MissingInstrumentedCodeForProgram(code_id)) + if let Some(instrumented_code) = db.instrumented_code(ethexe_runtime_common::VERSION, code_id) + && let Some(metadata) = db.code_metadata(code_id) + { + return Ok((instrumented_code, metadata)); + } + + let original_code = db + .original_code(code_id) + .ok_or(ProcessorError::MissingOriginalCodeForProgram(code_id))?; + + if instrumentation_instance.is_none() { + *instrumentation_instance = Some(instance_creator.instantiate()?); + } + let instance = instrumentation_instance + .as_mut() + .expect("instrumentation instance was just initialized"); + let (instrumented_code, code_metadata) = instance + .instrument(&original_code)? + .ok_or(ProcessorError::MissingInstrumentedCodeForProgram(code_id))?; + + db.set_instrumented_code( + ethexe_runtime_common::VERSION, + code_id, + instrumented_code.clone(), + ); + db.set_code_metadata(code_id, code_metadata.clone()); + + Ok((instrumented_code, code_metadata)) } pub(super) fn states( diff --git a/ethexe/processor/src/lib.rs b/ethexe/processor/src/lib.rs index 59fa273f7bb..fdce36c8761 100644 --- a/ethexe/processor/src/lib.rs +++ b/ethexe/processor/src/lib.rs @@ -198,6 +198,9 @@ pub enum ProcessorError { #[error("missing instrumented code for code id {0}")] MissingInstrumentedCodeForProgram(CodeId), + #[error("missing original code for code id {0}")] + MissingOriginalCodeForProgram(CodeId), + #[error("injected message {0:?} was sent to uninitialized program")] InjectedToUninitializedProgram(Box), diff --git a/ethexe/processor/src/tests.rs b/ethexe/processor/src/tests.rs index d8ecf631cda..62a3ecf5ac1 100644 --- a/ethexe/processor/src/tests.rs +++ b/ethexe/processor/src/tests.rs @@ -318,6 +318,60 @@ async fn handle_new_code_invalid() { ); } +#[tokio::test] +async fn process_programs_instruments_valid_code_missing_current_runtime_instrumentation() { + init_logger(); + + let db = Database::memory(); + let mut processor = Processor::new(db.clone()).expect("failed to create processor"); + let chain = BlockChain::mock(2).setup(&db); + let block1 = chain.blocks[1].to_simple(); + + let (code_id, code) = utils::wat_to_wasm(utils::VALID_PROGRAM); + assert_eq!(db.set_original_code(&code), code_id); + db.set_code_valid(code_id, true); + assert!(db.instrumented_code(RUNTIME_ID, code_id).is_none()); + + let actor_id = ActorId::from(0x10000); + let mut handler = setup_handler(db.clone(), block1); + handler + .handle_router_event(RouterRequestEvent::ProgramCreated(ProgramCreatedEvent { + actor_id, + code_id, + })) + .expect("failed to create new program"); + handler + .handle_mirror_event( + actor_id, + MirrorRequestEvent::ExecutableBalanceTopUpRequested( + ExecutableBalanceTopUpRequestedEvent { + value: 350_000_000_000, + }, + ), + ) + .expect("failed to top up balance"); + handler + .handle_mirror_event( + actor_id, + MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent { + id: MessageId::from(1), + source: ActorId::from(10), + payload: vec![], + value: 0, + call_reply: false, + }), + ) + .expect("failed to queue message"); + + processor + .process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None) + .await + .expect("failed to process queues"); + + assert!(db.instrumented_code(RUNTIME_ID, code_id).is_some()); + assert!(db.code_metadata(code_id).is_some()); +} + #[tokio::test] async fn ping_pong() { init_logger();