Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions ethexe/processor/src/handling/overlaid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
self, CommonRunContext, RunContext,
chunks_splitting::{ActorStateHashWithQueueSize, ExecutionChunks},
},
host::InstanceCreator,
host::{InstanceCreator, InstanceWrapper},
};
use core_processor::common::JournalNote;
use ethexe_common::{BlockHeader, db::CodesStorageRO, gear::MessageType};
Expand Down Expand Up @@ -178,14 +178,23 @@ impl RunContext for OverlaidRunContext {
&mut self.inner
}

fn program_code(&self, program_id: ActorId) -> Result<(InstrumentedCode, CodeMetadata)> {
fn program_code(
&self,
program_id: ActorId,
instrumentation_instance: &mut Option<InstanceWrapper>,
) -> Result<(InstrumentedCode, CodeMetadata)> {
let code_id = self
.inner
.db
.program_code_id(program_id)
.ok_or_else(|| ProcessorError::MissingCodeIdForProgram(program_id))?;

run::instrumented_code_and_metadata(&self.inner.db, code_id)
run::instrumented_code_and_metadata(
&self.inner.db,
&self.inner.instance_creator,
instrumentation_instance,
code_id,
)
}

fn states(&self, processing_queue_type: MessageType) -> Vec<ActorStateHashWithQueueSize> {
Expand Down
5 changes: 4 additions & 1 deletion ethexe/processor/src/handling/run/chunk_execution_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ pub async fn spawn_chunk_execution(
timestamp: block_header.timestamp,
};

let mut instrumentation_instance = None;

chunk
.into_iter()
.map(|(program_id, state_hash)| {
let (instrumented_code, code_metadata) = ctx.program_code(program_id)?;
let (instrumented_code, code_metadata) =
ctx.program_code(program_id, &mut instrumentation_instance)?;
let mut executor = ctx.inner().instance_creator.instantiate()?;
let db = ctx.inner().db.cas().clone_boxed();
let promise_sink = ctx.inner().promise_sink.clone();
Expand Down
64 changes: 52 additions & 12 deletions ethexe/processor/src/handling/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,18 @@ pub(super) mod chunks_splitting;

pub(crate) use chunks_splitting::ActorStateHashWithQueueSize;

use crate::{BoundPromiseSink, ProcessorError, Result, host::InstanceCreator};
use crate::{
BoundPromiseSink, ProcessorError, Result,
host::{InstanceCreator, InstanceWrapper},
};
use chunk_execution_processing::ChunkJournalsProcessingOutput;
use chunks_splitting::ExecutionChunks;
use core_processor::common::JournalNote;
use ethexe_common::{
BlockHeader, CALL_REPLY_SOFT_LIMIT, OUTGOING_MESSAGES_BYTES_SOFT_LIMIT,
OUTGOING_MESSAGES_SOFT_LIMIT, PROGRAM_MODIFICATIONS_SOFT_LIMIT, PromisePolicy,
StateHashWithQueueSize,
db::CodesStorageRO,
db::{CodesStorageRO, CodesStorageRW},
gear::{CHUNK_PROCESSING_GAS_LIMIT, MessageType},
};
use ethexe_db::{CASDatabase, Database};
Expand Down Expand Up @@ -209,7 +212,11 @@ pub(super) enum LimitsStatus {
/// between common and overlaid execution contexts. It's not meant
/// to emphasize any particular trait/feature/abstraction.
pub(super) trait RunContext {
fn program_code(&self, program_id: ActorId) -> Result<(InstrumentedCode, CodeMetadata)>;
fn program_code(
&self,
program_id: ActorId,
instrumentation_instance: &mut Option<InstanceWrapper>,
) -> Result<(InstrumentedCode, CodeMetadata)>;

/// Get reference to inner.
fn inner(&self) -> &CommonRunContext;
Expand Down Expand Up @@ -339,7 +346,7 @@ pub(super) trait RunContext {
pub(crate) struct CommonRunContext {
pub(super) db: Database,
pub(super) transitions: InBlockTransitions,
instance_creator: InstanceCreator,
pub(super) instance_creator: InstanceCreator,
gas_allowance_counter: GasAllowanceCounter,
outgoing_messages_limiter: u32,
outgoing_messages_bytes_limiter: u32,
Expand Down Expand Up @@ -395,7 +402,11 @@ impl CommonRunContext {
}

impl RunContext for CommonRunContext {
fn program_code(&self, program_id: ActorId) -> Result<(InstrumentedCode, CodeMetadata)> {
fn program_code(
&self,
program_id: ActorId,
instrumentation_instance: &mut Option<InstanceWrapper>,
) -> Result<(InstrumentedCode, CodeMetadata)> {
let code_id = self
.transitions
.registered_programs()
Expand All @@ -407,7 +418,12 @@ impl RunContext for CommonRunContext {
.ok_or_else(|| ProcessorError::MissingCodeIdForProgram(program_id))
})?;

instrumented_code_and_metadata(&self.db, code_id)
instrumented_code_and_metadata(
&self.db,
&self.instance_creator,
instrumentation_instance,
code_id,
)
}

fn states(&self, processing_queue_type: MessageType) -> Vec<ActorStateHashWithQueueSize> {
Expand All @@ -425,14 +441,38 @@ impl RunContext for CommonRunContext {

pub(super) fn instrumented_code_and_metadata(
db: &Database,
instance_creator: &InstanceCreator,
instrumentation_instance: &mut Option<InstanceWrapper>,
code_id: CodeId,
) -> Result<(InstrumentedCode, CodeMetadata)> {
db.instrumented_code(ethexe_runtime_common::VERSION, code_id)
.and_then(|instrumented_code| {
db.code_metadata(code_id)
.map(|metadata| (instrumented_code, metadata))
})
.ok_or_else(|| ProcessorError::MissingInstrumentedCodeForProgram(code_id))
if let Some(instrumented_code) = db.instrumented_code(ethexe_runtime_common::VERSION, code_id)
&& let Some(metadata) = db.code_metadata(code_id)
{
return Ok((instrumented_code, metadata));
}

let original_code = db
.original_code(code_id)
.ok_or(ProcessorError::MissingOriginalCodeForProgram(code_id))?;

if instrumentation_instance.is_none() {
*instrumentation_instance = Some(instance_creator.instantiate()?);
}
let instance = instrumentation_instance
.as_mut()
.expect("instrumentation instance was just initialized");
let (instrumented_code, code_metadata) = instance
.instrument(&original_code)?
.ok_or(ProcessorError::MissingInstrumentedCodeForProgram(code_id))?;

db.set_instrumented_code(
ethexe_runtime_common::VERSION,
code_id,
instrumented_code.clone(),
);
db.set_code_metadata(code_id, code_metadata.clone());
Comment on lines +468 to +473
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When db is an overlaid database (e.g., during RPC execute_for_reply), these set calls only update the in-memory overlay. This means every RPC call for the same uninstrumented program will re-perform instrumentation. Since instrumentation is a computationally heavy operation, this might be a performance bottleneck for RPC. Ideally, deterministic instrumentation results should be cached in the base database, but the current Database abstraction makes this difficult without bypassing the overlay. Given this is a fallback path, it's acceptable, but worth noting for future optimization.

References
  1. Code instrumentation is recognized as a computationally heavy 'main operation' in this repository, and its performance impact should be considered during implementation.


Ok((instrumented_code, code_metadata))
}

pub(super) fn states(
Expand Down
3 changes: 3 additions & 0 deletions ethexe/processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ pub enum ProcessorError {
#[error("missing instrumented code for code id {0}")]
MissingInstrumentedCodeForProgram(CodeId),

#[error("missing original code for code id {0}")]
MissingOriginalCodeForProgram(CodeId),

#[error("injected message {0:?} was sent to uninitialized program")]
InjectedToUninitializedProgram(Box<InjectedTransaction>),

Expand Down
54 changes: 54 additions & 0 deletions ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,60 @@ async fn handle_new_code_invalid() {
);
}

#[tokio::test]
async fn process_programs_instruments_valid_code_missing_current_runtime_instrumentation() {
init_logger();

let db = Database::memory();
let mut processor = Processor::new(db.clone()).expect("failed to create processor");
let chain = BlockChain::mock(2).setup(&db);
let block1 = chain.blocks[1].to_simple();

let (code_id, code) = utils::wat_to_wasm(utils::VALID_PROGRAM);
assert_eq!(db.set_original_code(&code), code_id);
db.set_code_valid(code_id, true);
assert!(db.instrumented_code(RUNTIME_ID, code_id).is_none());

let actor_id = ActorId::from(0x10000);
let mut handler = setup_handler(db.clone(), block1);
handler
.handle_router_event(RouterRequestEvent::ProgramCreated(ProgramCreatedEvent {
actor_id,
code_id,
}))
.expect("failed to create new program");
handler
.handle_mirror_event(
actor_id,
MirrorRequestEvent::ExecutableBalanceTopUpRequested(
ExecutableBalanceTopUpRequestedEvent {
value: 350_000_000_000,
},
),
)
.expect("failed to top up balance");
handler
.handle_mirror_event(
actor_id,
MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent {
id: MessageId::from(1),
source: ActorId::from(10),
payload: vec![],
value: 0,
call_reply: false,
}),
)
.expect("failed to queue message");

processor
.process_queues(handler.transitions, block1, DEFAULT_BLOCK_GAS_LIMIT, None)
.await
.expect("failed to process queues");

assert!(db.instrumented_code(RUNTIME_ID, code_id).is_some());
assert!(db.code_metadata(code_id).is_some());
}

#[tokio::test]
async fn ping_pong() {
init_logger();
Expand Down
Loading