Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/notes/2.31.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ Allow `InteractiveProcess` to set the working directory relative to the sandbox

Pex is no longer included *as a Python module* in the default lockfile. Pex is still used as a cli tool as intended.

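Pants now supports piping a string of bytes to the stdin of a `Process` via its new `stdin` field. This is only supported for local and workspace execution; remote execution does not support stdin.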

#### nFPM backend

Added a new rule to help in-repo plugins implement the `inject_nfpm_package_fields(InjectNfpmPackageFieldsRequest) -> InjectedNfpmPackageFields` polymorphic rule. The `get_package_field_sets_for_nfpm_content_file_deps` rule (in the `pants.backend.nfpm.util_rules.contents` module) collects selected `PackageFieldSet`s from the contents of an `nfpm_*_package` so that the packages can be analyzed to inject things like package requirements.
Expand Down
7 changes: 7 additions & 0 deletions src/python/pants/engine/intrinsics.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ async def add_prefix(add_prefix: AddPrefix) -> Digest:
async def execute_process(
    process: Process, process_execution_environment: ProcessExecutionEnvironment
) -> FallibleProcessResult:
    """Run `process` through the native engine in the given execution environment.

    Raises:
        ValueError: if `process.stdin` is set and the environment uses remote execution.
    """
    # The RBE API does not support stdin, so reject the combination
    # up front rather than handing it to the native engine.
    wants_stdin = process.stdin is not None
    if wants_stdin and process_execution_environment.remote_execution:
        message = (
            f"Process '{process.description}' cannot use stdin with remote execution. "
            "Configure the process to run locally or remove stdin."
        )
        raise ValueError(message)
    return await native_engine.execute_process(process, process_execution_environment)


Expand Down
7 changes: 7 additions & 0 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Process:
description: str = dataclasses.field(compare=False)
level: LogLevel
input_digest: Digest
stdin: bytes | None
immutable_input_digests: FrozenDict[str, Digest]
use_nailgun: tuple[str, ...]
working_directory: str | None
Expand All @@ -130,6 +131,7 @@ def __init__(
description: str,
level: LogLevel = LogLevel.INFO,
input_digest: Digest = EMPTY_DIGEST,
stdin: bytes | None = None,
immutable_input_digests: Mapping[str, Digest] | None = None,
use_nailgun: Iterable[str] = (),
working_directory: str | None = None,
Expand Down Expand Up @@ -158,6 +160,10 @@ def __init__(
512KB will be read-only unless they are globbed as part of either `output_files` or
`output_directories`.

To provide data to the process's standard input, use the `stdin` parameter with a bytes
object. The bytes will be piped to the process's stdin. Note that stdin is only supported
for local and workspace execution; remote execution does not support stdin and will fail.

Often, you will want to capture the files/directories created in the process. To do this,
you can either set `output_files` or `output_directories`. The specified paths should be
specified relative to the `working_directory`, if any, and will then be used to populate
Expand All @@ -181,6 +187,7 @@ def __init__(
object.__setattr__(self, "description", description)
object.__setattr__(self, "level", level)
object.__setattr__(self, "input_digest", input_digest)
object.__setattr__(self, "stdin", stdin)
object.__setattr__(
self, "immutable_input_digests", FrozenDict(immutable_input_digests or {})
)
Expand Down
28 changes: 28 additions & 0 deletions src/python/pants/engine/process_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,34 @@ def test_concurrency_templating(rule_runner: RuleRunner) -> None:
assert result.stderr == b""


def test_stdin(rule_runner: RuleRunner) -> None:
    """Bytes passed as `stdin` reach the process: `cat` echoes them back on stdout."""
    payload = b"Hello from stdin!\nLine 2\nLine 3\n"
    cat_stdin = Process(
        argv=("/bin/cat",),
        description="test stdin piping",
        stdin=payload,
    )

    outcome = rule_runner.request(ProcessResult, [cat_stdin])

    # `cat` with no file arguments copies stdin to stdout verbatim.
    assert outcome.stdout == payload
    assert outcome.stderr == b""


def test_stdin_empty(rule_runner: RuleRunner) -> None:
    """A process that sets no `stdin` sees an immediately-exhausted standard input.

    `cat - test.txt` reads stdin first (the `-` argument) and then the file, so stdout must
    contain only the file contents when nothing is supplied on stdin.
    """
    file_content = b"file contents\n"
    input_digest = rule_runner.request(
        Digest, [CreateDigest([FileContent("test.txt", file_content)])]
    )
    # `stdin` is deliberately left unset here.
    process = Process(
        argv=("/bin/cat", "-", "test.txt"),
        description="test empty stdin",
        input_digest=input_digest,
    )
    result = rule_runner.request(ProcessResult, [process])

    assert result.stdout == file_content
    # Mirror `test_stdin`: reading from the unset stdin must not produce any diagnostics.
    assert result.stderr == b""


def test_concurrency_enum():
exactly_one = ProcessConcurrency.exactly(1)
min_one = ProcessConcurrency.range(1, min=1)
Expand Down
14 changes: 14 additions & 0 deletions src/rust/engine/src/nodes/execute_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use process_execution::{
use pyo3::Bound;
use pyo3::prelude::{PyAny, Python};
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyAnyMethods;
use store::{self, Store, StoreError};
use workunit_store::{
Metric, ObservationMetric, RunningWorkunit, UserMetadataItem, WorkunitMetadata,
Expand Down Expand Up @@ -82,6 +83,18 @@ impl ExecuteProcess {
.map(RelativePath::new)
.transpose()?;

let stdin = {
let stdin_py: Bound<'_, PyAny> = externs::getattr(value, "stdin")?;
if stdin_py.is_none() {
None
} else {
let stdin_bytes: Vec<u8> = stdin_py
.extract()
.map_err(|e| format!("Failed to extract `stdin` bytes: {e}"))?;
Some(stdin_bytes)
}
};

let output_files = externs::getattr::<Vec<String>>(value, "output_files")?
.into_iter()
.map(RelativePath::new)
Expand Down Expand Up @@ -165,6 +178,7 @@ impl ExecuteProcess {
env,
working_directory,
input_digests,
stdin,
output_files,
output_directories,
timeout,
Expand Down
11 changes: 11 additions & 0 deletions src/rust/process_execution/remote/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,17 @@ impl process_execution::CommandRunner for CommandRunner {
_workunit: &mut RunningWorkunit,
request: Process,
) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
// Remote execution does not support stdin.
// This is also checked in the Python engine, but we check
// here as well in case another caller comes along.
if request.stdin.is_some() {
return Err(ProcessError::Unclassified(format!(
"Process '{}' cannot use stdin with remote execution. \
Configure the process to run locally or remove stdin.",
request.description
)));
}

// Retrieve capabilities for this server.
let capabilities = self.get_capabilities().await?;
trace!("RE capabilities: {:?}", &capabilities);
Expand Down
32 changes: 32 additions & 0 deletions src/rust/process_execution/remote/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ async fn make_execute_request() {
.collect(),
working_directory: None,
input_digests: InputDigests::with_input_files(input_directory.directory_digest()),
stdin: None,
// Intentionally poorly sorted:
output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(),
output_directories: relative_paths(&["directory/name"]).collect(),
Expand Down Expand Up @@ -185,6 +186,7 @@ async fn make_execute_request_with_instance_name() {
.collect(),
working_directory: None,
input_digests: InputDigests::with_input_files(input_directory.directory_digest()),
stdin: None,
// Intentionally poorly sorted:
output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(),
output_directories: relative_paths(&["directory/name"]).collect(),
Expand Down Expand Up @@ -300,6 +302,7 @@ async fn make_execute_request_with_cache_key_gen_version() {
.collect(),
working_directory: None,
input_digests: InputDigests::with_input_files(input_directory.directory_digest()),
stdin: None,
// Intentionally poorly sorted:
output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(),
output_directories: relative_paths(&["directory/name"]).collect(),
Expand Down Expand Up @@ -575,6 +578,7 @@ async fn make_execute_request_with_timeout() {
.collect(),
working_directory: None,
input_digests: InputDigests::with_input_files(input_directory.directory_digest()),
stdin: None,
// Intentionally poorly sorted:
output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(),
output_directories: relative_paths(&["directory/name"]).collect(),
Expand Down Expand Up @@ -675,6 +679,7 @@ async fn make_execute_request_with_append_only_caches() {
.collect(),
working_directory: Some(RelativePath::new(Path::new("animals")).unwrap()),
input_digests: InputDigests::with_input_files(input_directory.directory_digest()),
stdin: None,
output_files: BTreeSet::new(),
output_directories: BTreeSet::new(),
timeout: one_second(),
Expand Down Expand Up @@ -838,6 +843,7 @@ async fn make_execute_request_using_immutable_inputs() {
.collect(),
working_directory: None,
input_digests,
stdin: None,
output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(),
output_directories: relative_paths(&["directory/name"]).collect(),
timeout: None,
Expand Down Expand Up @@ -2853,3 +2859,29 @@ fn assert_cancellation_requests(
/// Returns a one-second duration wrapped in `Some`, for use as an optional timeout in tests.
fn one_second() -> Option<Duration> {
    let second = Duration::from_secs(1);
    Some(second)
}

/// A `Process` with `stdin` set must be rejected by the remote command runner with an error
/// that mentions stdin and remote execution.
#[tokio::test]
async fn stdin_with_remote_execution_fails() {
    let (_, mut workunit) = WorkunitStore::setup_for_tests();
    let cas = mock::StubCAS::builder().build().await;
    let (command_runner, _store) = create_command_runner(cas.address(), &cas).await;

    // Build a process which asks for stdin.
    let mut process = Process::new(owned_string_vec(&["/bin/cat"]));
    process.description = "test stdin rejection".to_string();
    process.execution_environment = make_environment(Platform::Linux_x86_64);
    process.stdin = Some(b"test input".to_vec());

    let outcome = command_runner
        .run(Context::default(), &mut workunit, process)
        .await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(
        message.contains("cannot use stdin with remote execution"),
        "Expected error about stdin with remote execution, got: {}",
        message
    );
}
29 changes: 29 additions & 0 deletions src/rust/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ pub const CACHE_KEY_SALT_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_SALT";
// CommandRunner.
pub const CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_TARGET_PLATFORM";

// Synthetic environment variable which carries a digest of the process's stdin content, so that
// processes with different stdin have different cache keys.
// Even though the Remote Execution API doesn't support stdin, the remote cache is used for local processes.
pub const CACHE_KEY_STDIN_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_STDIN";

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProcessError {
/// A Digest was not present in either of the local or remote Stores.
Expand Down Expand Up @@ -673,6 +678,13 @@ pub struct Process {
/// This is included in hash/eq so it creates a unique node in the runtime graph.
///
pub attempt: usize,

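///
/// Optional bytes to pipe to the process's standard input.
///
/// If set, these bytes are written to the process's stdin when it runs locally; if `None`,
/// a locally run process is given a null stdin. Remote execution rejects a process which
/// has stdin set.
///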
pub stdin: Option<Vec<u8>>,
}

impl Process {
Expand Down Expand Up @@ -712,6 +724,7 @@ impl Process {
},
remote_cache_speculation_delay: std::time::Duration::from_millis(0),
attempt: 0,
stdin: None,
}
}

Expand Down Expand Up @@ -1336,6 +1349,7 @@ pub async fn make_execute_request(
if name == CACHE_KEY_GEN_VERSION_ENV_VAR_NAME
|| name == CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME
|| name == CACHE_KEY_SALT_ENV_VAR_NAME
|| name == CACHE_KEY_STDIN_ENV_VAR_NAME
{
return Err(format!(
"Cannot set env var with name {name} as that is reserved for internal use by pants"
Expand Down Expand Up @@ -1473,6 +1487,21 @@ pub async fn make_execute_request(
// Store the separate copy back into the Command proto.
command.platform = Some(command_platform);

// If stdin is provided, add a hash of its content as a synthetic environment variable to
// ensure it affects the cache key. This ensures processes with different stdin content get
// different cache keys. Even though the RBE API doesn't support stdin, the remote cache
// is used for local processes.
if let Some(ref stdin_bytes) = req.stdin {
// Compute the digest of the stdin content for the cache key
let stdin_digest = Digest::of_bytes(stdin_bytes);
command
.environment_variables
.push(remexec::command::EnvironmentVariable {
name: CACHE_KEY_STDIN_ENV_VAR_NAME.to_string(),
value: format!("{:?}", stdin_digest), // Format as "Digest { hash: ..., size_bytes: ... }"
});
}

// Sort the environment variables. REv2 spec requires sorting by name for same reasons that
// platform properties are sorted, i.e. consistent hashing.
command
Expand Down
40 changes: 39 additions & 1 deletion src/rust/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use store::{
};
use task_executor::Executor;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
use tokio::sync::RwLock;
use tokio::time::timeout;
Expand Down Expand Up @@ -293,6 +294,9 @@ impl CapturedWorkdir for CommandRunner {
req: Process,
exclusive_spawn: bool,
) -> Result<BoxStream<'r, Result<ChildOutput, String>>, CapturedWorkdirError> {
// Get stdin bytes from the process if provided
let stdin_bytes = req.stdin.clone();

let cwd = if let Some(ref working_directory) = req.working_directory {
workdir_path.join(working_directory)
} else {
Expand All @@ -307,7 +311,11 @@ impl CapturedWorkdir for CommandRunner {
.args(&req.argv[1..])
.current_dir(cwd)
.envs(&req.env)
.stdin(Stdio::null())
.stdin(if stdin_bytes.is_some() {
Stdio::piped()
} else {
Stdio::null()
})
.stdout(Stdio::piped())
.stderr(Stdio::piped());

Expand All @@ -316,6 +324,19 @@ impl CapturedWorkdir for CommandRunner {
})
.await?;

// Write the stdin bytes (if any) from a separately spawned task. The pipe handle is moved into
// the task and dropped when the task finishes. The task yields `Err(message)` only for write
// failures which should fail the process.
let stdin_write_handle = if let Some(bytes) = stdin_bytes {
    child.stdin.take().map(|mut stdin| {
        tokio::spawn(async move {
            match stdin.write_all(&bytes).await {
                // A child which exits (or closes its stdin) before consuming all of its input
                // produces a broken pipe. That is not a failure in itself: the child's own
                // exit status is what should be reported.
                Err(e) if e.kind() != std::io::ErrorKind::BrokenPipe => {
                    // The consumer of this handle adds the "Failed to write to stdin" prefix,
                    // so forward only the underlying error text to avoid doubling it.
                    Err(e.to_string())
                }
                _ => Ok(()),
            }
        })
    })
} else {
    None
};

debug!("spawned local process as {:?} for {:?}", child.id(), req);
let stdout_stream = FramedRead::new(child.stdout.take().unwrap(), BytesCodec::new())
.map_ok(|bytes| ChildOutput::Stdout(bytes.into()))
Expand All @@ -326,6 +347,23 @@ impl CapturedWorkdir for CommandRunner {
.fuse()
.boxed();
let exit_stream = async move {
// If stdin write was spawned, wait for it to complete and propagate any error
if let Some(handle) = stdin_write_handle {
match handle.await {
Err(e) => {
return Err(std::io::Error::other(format!(
"Stdin write task panicked: {e}"
)));
}
Ok(Err(e)) => {
return Err(std::io::Error::other(format!(
"Failed to write to stdin: {e}"
)));
}
Ok(Ok(())) => {}
}
}

child
.wait()
.map_ok(|exit_status| {
Expand Down
Loading
Loading