Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Process:
description: str = dataclasses.field(compare=False)
level: LogLevel
input_digest: Digest
stdin: bytes
immutable_input_digests: FrozenDict[str, Digest]
use_nailgun: tuple[str, ...]
working_directory: str | None
Expand All @@ -130,6 +131,7 @@ def __init__(
description: str,
level: LogLevel = LogLevel.INFO,
input_digest: Digest = EMPTY_DIGEST,
stdin: bytes = b"",
immutable_input_digests: Mapping[str, Digest] | None = None,
use_nailgun: Iterable[str] = (),
working_directory: str | None = None,
Expand Down Expand Up @@ -158,6 +160,10 @@ def __init__(
512KB will be read-only unless they are globbed as part of either `output_files` or
`output_directories`.

To provide data to the process's standard input, use the `stdin` parameter with a bytes
object. The bytes will be piped to the process's stdin. Processes with stdin will
automatically be forced to local execution, since remote execution does not support stdin.

Often, you will want to capture the files/directories created in the process. To do this,
you can either set `output_files` or `output_directories`. The specified paths should be
specified relative to the `working_directory`, if any, and will then be used to populate
Expand All @@ -181,6 +187,7 @@ def __init__(
object.__setattr__(self, "description", description)
object.__setattr__(self, "level", level)
object.__setattr__(self, "input_digest", input_digest)
object.__setattr__(self, "stdin", stdin)
object.__setattr__(
self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
)
Expand Down
48 changes: 48 additions & 0 deletions src/python/pants/engine/process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,54 @@ def test_concurrency_templating(rule_runner: RuleRunner) -> None:
assert result.stderr == b""


def test_stdin(rule_runner: RuleRunner) -> None:
"""Test that stdin parameter properly pipes data to the process."""
# Provide content via stdin parameter
stdin_content = b"Hello from stdin!\nLine 2\nLine 3\n"

# Use /bin/cat to read from stdin and output to stdout
process = Process(
argv=("/bin/cat",),
stdin=stdin_content,
description="test stdin piping",
)
result = rule_runner.request(ProcessResult, [process])

# The output should match the input we provided via stdin
assert result.stdout == stdin_content
assert result.stderr == b""


def test_stdin_with_grep(rule_runner: RuleRunner) -> None:
"""Test that stdin works with filtering commands like grep."""
# Provide stdin content with multiple lines
stdin_content = b"apple\nbanana\ncherry\napricot\nblueberry\n"

# Use grep to filter lines starting with 'a'
process = Process(
argv=("/bin/grep", "^a"),
stdin=stdin_content,
description="test stdin with grep",
)
result = rule_runner.request(ProcessResult, [process])

# Should only output lines starting with 'a'
assert result.stdout == b"apple\napricot\n"
assert result.stderr == b""


def test_stdin_empty(rule_runner: RuleRunner) -> None:
"""Test that processes work normally when stdin is not provided."""
# Process without stdin should work normally
process = Process(
argv=("/bin/echo", "hello"),
description="test no stdin",
)
result = rule_runner.request(ProcessResult, [process])

assert result.stdout == b"hello\n"


def test_concurrency_enum():
exactly_one = ProcessConcurrency.exactly(1)
min_one = ProcessConcurrency.range(1, min=1)
Expand Down
43 changes: 42 additions & 1 deletion src/rust/engine/src/nodes/execute_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use process_execution::{
use pyo3::Bound;
use pyo3::prelude::{PyAny, Python};
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyAnyMethods;
use store::{self, Store, StoreError};
use workunit_store::{
Metric, ObservationMetric, RunningWorkunit, UserMetadataItem, WorkunitMetadata,
Expand Down Expand Up @@ -82,6 +83,18 @@ impl ExecuteProcess {
.map(RelativePath::new)
.transpose()?;

let stdin = {
let stdin_py: Bound<'_, PyAny> = externs::getattr(value, "stdin")?;
let stdin_bytes: Vec<u8> = stdin_py
.extract()
.map_err(|e| format!("Failed to extract `stdin` bytes: {e}"))?;
if stdin_bytes.is_empty() {
None
} else {
Some(stdin_bytes)
}
};

let output_files = externs::getattr::<Vec<String>>(value, "output_files")?
.into_iter()
.map(RelativePath::new)
Expand Down Expand Up @@ -160,11 +173,33 @@ impl ExecuteProcess {

let attempt = externs::getattr::<usize>(value, "attempt").unwrap_or(0);

// Force local execution if stdin is provided, since remote execution doesn't support stdin.
// The Bazel Remote Execution API's Command proto has no stdin field, so we transparently
// fall back to local execution rather than failing.
let execution_environment = if stdin.is_some()
&& !matches!(
process_config.environment.strategy,
ProcessExecutionStrategy::Local
)
{
log::info!(
"Process requires stdin; forcing local execution for: {}",
description
);
process_execution::ProcessExecutionEnvironment {
strategy: ProcessExecutionStrategy::Local,
..process_config.environment
}
} else {
process_config.environment
};

Ok(Process {
argv: externs::getattr(value, "argv")?,
env,
working_directory,
input_digests,
stdin,
output_files,
output_directories,
timeout,
Expand All @@ -176,7 +211,7 @@ impl ExecuteProcess {
concurrency_available,
concurrency,
cache_scope,
execution_environment: process_config.environment,
execution_environment,
remote_cache_speculation_delay,
attempt,
})
Expand All @@ -187,10 +222,15 @@ impl ExecuteProcess {
value: Value,
process_config: externs::process::PyProcessExecutionEnvironment,
) -> Result<Self, StoreError> {
log::debug!("ExecuteProcess::lift starting");
let input_digests = Self::lift_process_input_digests(store, &value).await?;
log::debug!("ExecuteProcess::lift input_digests lifted");
let process = Python::attach(|py| {
log::debug!("ExecuteProcess::lift calling lift_process_fields");
Self::lift_process_fields(value.bind(py), input_digests, process_config)
})?;
log::debug!("ExecuteProcess::lift process fields lifted, stdin_bytes={}",
process.stdin.as_ref().map(|b| b.len()).unwrap_or(0));
Ok(Self { process })
}

Expand All @@ -200,6 +240,7 @@ impl ExecuteProcess {
workunit: &mut RunningWorkunit,
backtrack_level: usize,
) -> NodeResult<ProcessResult> {
log::info!("ExecuteProcess::run_node starting for: {}", self.process.description);
let request = self.process;

let command_runner = context
Expand Down
10 changes: 10 additions & 0 deletions src/rust/process_execution/remote/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,16 @@ impl process_execution::CommandRunner for CommandRunner {
_workunit: &mut RunningWorkunit,
request: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
// Processes with stdin should have been forced to local execution in lift_process_fields.
// This assertion ensures they never reach the remote execution code path.
if request.stdin.is_some() {
return Err(ProcessError::Unclassified(
"Internal error: Process with stdin reached remote execution. \
Stdin should have been handled by forcing local execution."
.to_string(),
));
}

// Retrieve capabilities for this server.
let capabilities = self.get_capabilities().await?;
trace!("RE capabilities: {:?}", &capabilities);
Expand Down
29 changes: 29 additions & 0 deletions src/rust/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ pub const CACHE_KEY_SALT_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_SALT";
// CommandRunner.
pub const CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_TARGET_PLATFORM";

// Environment variable which includes stdin content in the cache key to ensure processes with
// different stdin have different cache keys, even though the Remote Execution API doesn't support stdin.
pub const CACHE_KEY_STDIN_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_STDIN";

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProcessError {
/// A Digest was not present in either of the local or remote Stores.
Expand Down Expand Up @@ -605,6 +609,8 @@ pub struct Process {
///
pub input_digests: InputDigests,

///

pub output_files: BTreeSet<RelativePath>,

pub output_directories: BTreeSet<RelativePath>,
Expand Down Expand Up @@ -673,6 +679,13 @@ pub struct Process {
/// This is included in hash/eq so it creates a unique node in the runtime graph.
///
pub attempt: usize,

///
/// Optional bytes to pipe to the process's standard input.
///
/// If set, these bytes will be piped to the process's stdin.
///
pub stdin: Option<Vec<u8>>,
}

impl Process {
Expand Down Expand Up @@ -712,6 +725,7 @@ impl Process {
},
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
attempt: 0,
stdin: None,
}
}

Expand Down Expand Up @@ -1336,6 +1350,7 @@ pub async fn make_execute_request(
if name == CACHE_KEY_GEN_VERSION_ENV_VAR_NAME
|| name == CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME
|| name == CACHE_KEY_SALT_ENV_VAR_NAME
|| name == CACHE_KEY_STDIN_ENV_VAR_NAME
{
return Err(format!(
"Cannot set env var with name {name} as that is reserved for internal use by pants"
Expand Down Expand Up @@ -1473,6 +1488,20 @@ pub async fn make_execute_request(
// Store the separate copy back into the Command proto.
command.platform = Some(command_platform);

// If stdin is provided, add a hash of its content as a synthetic environment variable to
// ensure it affects the cache key. This ensures processes with different stdin content get
// different cache keys, even though the remote execution API doesn't support stdin directly.
if let Some(ref stdin_bytes) = req.stdin {
// Compute the digest of the stdin content for the cache key
let stdin_digest = Digest::of_bytes(stdin_bytes);
command
.environment_variables
.push(remexec::command::EnvironmentVariable {
name: CACHE_KEY_STDIN_ENV_VAR_NAME.to_string(),
value: format!("{:?}", stdin_digest), // Format as "Digest { hash: ..., size_bytes: ... }"
});
}

// Sort the environment variables. REv2 spec requires sorting by name for same reasons that
// platform properties are sorted, i.e. consistent hashing.
command
Expand Down
33 changes: 32 additions & 1 deletion src/rust/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ impl CapturedWorkdir for CommandRunner {
req: Process,
exclusive_spawn: bool,
) -> Result<BoxStream<'r, Result<ChildOutput, String>>, CapturedWorkdirError> {
// Get stdin bytes from the process if provided
let stdin_bytes = req.stdin.as_ref().map(|bytes| bytes.clone());

let cwd = if let Some(ref working_directory) = req.working_directory {
workdir_path.join(working_directory)
} else {
Expand All @@ -307,7 +310,11 @@ impl CapturedWorkdir for CommandRunner {
.args(&req.argv[1..])
.current_dir(cwd)
.envs(&req.env)
.stdin(Stdio::null())
.stdin(if stdin_bytes.is_some() {
Stdio::piped()
} else {
Stdio::null()
})
.stdout(Stdio::piped())
.stderr(Stdio::piped());

Expand All @@ -316,6 +323,30 @@ impl CapturedWorkdir for CommandRunner {
})
.await?;

// If we have stdin bytes, spawn a task to write them to the child's stdin
if let Some(bytes) = stdin_bytes {
debug!("Preparing to write {} bytes to stdin", bytes.len());
if let Some(mut stdin) = child.stdin.take() {
tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
debug!("Writing {} bytes to stdin", bytes.len());
match stdin.write_all(&bytes).await {
Ok(_) => {
debug!("Successfully wrote {} bytes to stdin", bytes.len());
}
Err(e) => {
debug!("Failed to write stdin: {:?}", e);
}
}
// Close stdin by dropping it
drop(stdin);
debug!("Closed stdin");
});
} else {
debug!("Warning: child.stdin was None");
}
}

debug!("spawned local process as {:?} for {:?}", child.id(), req);
let stdout_stream = FramedRead::new(child.stdout.take().unwrap(), BytesCodec::new())
.map_ok(|bytes| ChildOutput::Stdout(bytes.into()))
Expand Down
57 changes: 57 additions & 0 deletions src/rust/process_execution/src/local_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,53 @@ async fn stdout_and_stderr_and_exit_code() {
assert_eq!(result.original.output_directory, *EMPTY_DIRECTORY_DIGEST);
}

#[tokio::test]
#[cfg(unix)]
async fn stdin_input() {
// Test that stdin properly pipes data to the process
let store_dir = TempDir::new().unwrap();
let store = Store::local_only(task_executor::Executor::new(), store_dir.path())
.unwrap();

// Provide test data via stdin bytes
let stdin_content = "test input from stdin\n";

// Use stdin bytes directly
let mut process = Process::new(owned_string_vec(&["/bin/cat"]));
process.stdin = Some(stdin_content.as_bytes().to_vec());

let result = run_command_locally_in_dir_with_store(process, store.clone())
.await
.unwrap();

assert_eq!(result.stdout_bytes, stdin_content.as_bytes());
assert_eq!(result.stderr_bytes, "".as_bytes());
assert_eq!(result.original.exit_code, 0);
}

#[tokio::test]
#[cfg(unix)]
async fn stdin_with_grep() {
// Test stdin with a more complex command (grep)
let store_dir = TempDir::new().unwrap();
let store = Store::local_only(task_executor::Executor::new(), store_dir.path())
.unwrap();

// Provide multi-line input via stdin bytes
let stdin_content = "line one\nline two\nline three\n";

// Use grep to filter stdin
let mut process = Process::new(owned_string_vec(&["/bin/grep", "two"]));
process.stdin = Some(stdin_content.as_bytes().to_vec());

let result = run_command_locally_in_dir_with_store(process, store.clone())
.await
.unwrap();

assert_eq!(result.stdout_bytes, "line two\n".as_bytes());
assert_eq!(result.original.exit_code, 0);
}

#[tokio::test]
#[cfg(unix)]
async fn capture_exit_code_signal() {
Expand Down Expand Up @@ -765,6 +812,16 @@ async fn run_command_locally(req: Process) -> Result<LocalTestResult, ProcessErr
run_command_locally_in_dir(req, work_dir_path, &mut workunit, None, None).await
}

async fn run_command_locally_in_dir_with_store(
req: Process,
store: Store,
) -> Result<LocalTestResult, ProcessError> {
let (_, mut workunit) = WorkunitStore::setup_for_tests();
let work_dir = TempDir::new().unwrap();
let work_dir_path = work_dir.path().to_owned();
run_command_locally_in_dir(req, work_dir_path, &mut workunit, Some(store), None).await
}

async fn run_command_locally_in_dir(
req: Process,
dir: PathBuf,
Expand Down
Loading