diff --git a/src/clusterfuzz/_internal/bot/tasks/task_types.py b/src/clusterfuzz/_internal/bot/tasks/task_types.py index 1d0e38a8ae7..7f568516871 100644 --- a/src/clusterfuzz/_internal/bot/tasks/task_types.py +++ b/src/clusterfuzz/_internal/bot/tasks/task_types.py @@ -115,8 +115,8 @@ def is_remote_utask(command, job): # Return True even if we can't query the db. return True - return batch_service.is_remote_task( - command, job) or swarming.is_swarming_task(command, job) + return batch_service.is_remote_task(command, + job) or swarming.is_swarming_task(job) def task_main_runs_on_uworker(): @@ -178,11 +178,8 @@ def execute(self, task_argument, job_type, uworker_env): return logs.info('Queueing utask for remote execution.', download_url=download_url) - if batch_service.is_remote_task(command, job_type): - tasks.add_utask_main(command, download_url, job_type) - else: - assert swarming.is_swarming_task(command, job_type) - swarming.push_swarming_task(command, download_url, job_type) + assert batch_service.is_remote_task(command, job_type) + tasks.add_utask_main(command, download_url, job_type) @logs.task_stage_context(logs.Stage.PREPROCESS) def preprocess(self, task_argument, job_type, uworker_env): diff --git a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py index 63924785a37..107ea5d907a 100644 --- a/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py +++ b/src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py @@ -63,10 +63,9 @@ def _timestamp_now() -> Timestamp: return ts -def _get_execution_mode(utask_module, job_type): +def _get_execution_mode(job_type): """Determines whether this task in executed on swarming or batch.""" - command = task_utils.get_command_from_module(utask_module.__name__) - if swarming.is_swarming_task(command, job_type): + if swarming.is_swarming_task(job_type): return Mode.SWARMING return Mode.BATCH @@ -410,7 +409,7 @@ def tworker_preprocess(utask_module, task_argument, 
job_type, uworker_env): signed download URL for the uworker's input and the (unsigned) download URL for its output.""" with _MetricRecorder(_Subtask.PREPROCESS) as recorder: - execution_mode = _get_execution_mode(utask_module, job_type) + execution_mode = _get_execution_mode(job_type) uworker_input = _preprocess(utask_module, task_argument, job_type, uworker_env, recorder, execution_mode) if not uworker_input: @@ -501,8 +500,7 @@ def tworker_postprocess(output_download_url) -> None: task_utils.reset_task_stage_env() utask_module = get_utask_module(uworker_output.uworker_input.module_name) - execution_mode = _get_execution_mode(utask_module, - uworker_output.uworker_input.job_type) + execution_mode = _get_execution_mode(uworker_output.uworker_input.job_type) recorder.set_task_details( utask_module, uworker_output.uworker_input.job_type, diff --git a/src/clusterfuzz/_internal/metrics/logs.py b/src/clusterfuzz/_internal/metrics/logs.py index bc25979e6c1..8fa6c026795 100644 --- a/src/clusterfuzz/_internal/metrics/logs.py +++ b/src/clusterfuzz/_internal/metrics/logs.py @@ -1013,6 +1013,7 @@ class Stage(enum.Enum): MAIN = 'main' POSTPROCESS = 'postprocess' UNKNOWN = 'unknown' + SCHEDULER = 'scheduler' NA = 'n/a' diff --git a/src/clusterfuzz/_internal/remote_task/remote_task_gate.py b/src/clusterfuzz/_internal/remote_task/remote_task_gate.py index ab563533a93..1ed4e1ee89c 100644 --- a/src/clusterfuzz/_internal/remote_task/remote_task_gate.py +++ b/src/clusterfuzz/_internal/remote_task/remote_task_gate.py @@ -24,7 +24,6 @@ from clusterfuzz._internal import swarming from clusterfuzz._internal.base import feature_flags -from clusterfuzz._internal.base.tasks import task_utils from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.remote_task import remote_task_adapters from clusterfuzz._internal.remote_task import remote_task_types @@ -61,9 +60,8 @@ def _get_adapter(self) -> str: def _is_swarming_applicable(self): return 
feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled - def _is_swarming_task(self, module, job_type): - return swarming.is_swarming_task( - task_utils.get_command_from_module(module), job_type) + def _is_swarming_task(self, job_type): + return swarming.is_swarming_task(job_type) def _handle_swarming_job(self, module, job_type, input_download_url): return self._service_map['swarming'].create_utask_main_job( @@ -124,8 +122,7 @@ def get_job_frequency(self): def create_utask_main_job(self, module, job_type, input_download_url): """Creates a single remote task, selecting a backend dynamically.""" - if self._is_swarming_applicable() and self._is_swarming_task( - module, job_type): + if self._is_swarming_applicable() and self._is_swarming_task(job_type): return self._handle_swarming_job(module, job_type, input_download_url) adapter_id = self._get_adapter() @@ -156,6 +153,7 @@ def create_utask_main_jobs(self, unscheduled_tasks = [] if self._is_swarming_applicable(): + logs.info(f'[Swarming] enabled, pushing {len(remote_tasks)} tasks.') remote_tasks = self._handle_swarming_jobs(remote_tasks) if not remote_tasks: diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 4e03ab3bf6c..9ec7ac4b28c 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -21,10 +21,12 @@ from google.protobuf import json_format from clusterfuzz._internal.base import utils +from clusterfuzz._internal.base.errors import BadConfigError from clusterfuzz._internal.base.feature_flags import FeatureFlags from clusterfuzz._internal.config import local_config from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.google_cloud_utils import credentials +from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.protos import swarming_pb2 from clusterfuzz._internal.system import environment @@ -34,40 +36,62 @@ ] -def is_swarming_task(command: 
str, job_name: str): +def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool: """Returns True if the task is supposed to run on swarming.""" if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + logs.info('[DEBUG] Flag is disabled', job_name=job_name) return False - job = data_types.Job.query(data_types.Job.name == job_name).get() - if not job: - return False + if job is None: + job = data_types.Job.query(data_types.Job.name == job_name).get() + if not job: + logs.info('[DEBUG] Job not found', job_name=job_name) + return False job_environment = job.get_environment() - if not utils.string_is_true(job_environment.get('IS_SWARMING_JOB')): + if not utils.string_is_true(job_environment.get( + 'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'): + logs.info('[DEBUG] No swarming env var', job_name=job_name) return False - try: - _get_new_task_spec(command, job_name, '') - return True - except ValueError: + swarming_config = _get_swarming_config() + if swarming_config is None: + logs.warning('[Swarming] current task is not suitable for swarming. 
' \ + 'Reason: failed to retrieve config.') return False + return _get_instance_spec(swarming_config, job) is not None + -def _get_task_name(): - return 't-' + str(uuid.uuid4()).lower() +def _get_instance_spec(swarming_config: local_config.SwarmingConfig, + job: data_types.Job) -> dict | None: + return swarming_config.get('mapping').get(job.platform, None) -def _get_swarming_config(): +def _get_task_name(job_name: str): + return 't-' + str(uuid.uuid4()).lower() + ' ' + job_name + + +def _get_swarming_config() -> local_config.SwarmingConfig | None: """Returns the swarming config.""" - return local_config.SwarmingConfig() + try: + return local_config.SwarmingConfig() + except (BadConfigError, ValueError) as e: + logs.error(f'[Swarming] Failed to retrieve config: {e}') + return None def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list ) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member """ Gets all swarming dimensions for a task. Job dimensions have more precedence than static dimensions""" + swarming_config = _get_swarming_config() + if not swarming_config: + logs.warning( + '[Swarming] No dimensions set. Reason: failed to retrieve config') + return [] + unique_dimensions = {} - unique_dimensions['os'] = job.platform + unique_dimensions['os'] = str(job.platform).capitalize() unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool') for dimension in platform_specific_dimensions: @@ -99,16 +123,27 @@ def _env_vars_to_json( value=json.dumps(env_vars_dict)) -def _get_new_task_spec(command: str, job_name: str, - download_url: str) -> swarming_pb2.NewTaskRequest: # pylint: disable=no-member - """Gets the configured specifications for a swarming task.""" +def create_new_task_request(command: str, job_name: str, download_url: str + ) -> swarming_pb2.NewTaskRequest | None: # pylint: disable=no-member + """Gets the configured specifications for a swarming task. 
+ Returns None if the task shouldn't be executed on swarming + or if the SWARMING_REMOTE_EXECUTION flag is disabled.""" + if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled: + return None + job = data_types.Job.query(data_types.Job.name == job_name).get() - config_name = job.platform + if job is None: + return None + swarming_config = _get_swarming_config() - instance_spec = swarming_config.get('mapping').get(config_name, None) + if not swarming_config: + return None + + instance_spec = _get_instance_spec(swarming_config, job) if instance_spec is None: - raise ValueError(f'No mapping for {config_name}') - swarming_realm = swarming_config.get('swarming_realm') + return None + + swarming_realm = swarming_config.get('swarming_realm',) logs_project_id = swarming_config.get('logs_project_id') priority = instance_spec['priority'] startup_command = instance_spec['command'] @@ -133,6 +168,7 @@ def _get_new_task_spec(command: str, job_name: str, swarming_pb2.StringPair(key='UWORKER', value='True'), # pylint: disable=no-member swarming_pb2.StringPair(key='SWARMING_BOT', value='True'), # pylint: disable=no-member swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), # pylint: disable=no-member + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), # pylint: disable=no-member swarming_pb2.StringPair( # pylint: disable=no-member key='LOGGING_CLOUD_PROJECT_ID', value=logs_project_id), @@ -153,7 +189,7 @@ def _get_new_task_spec(command: str, job_name: str, cas_input_root = instance_spec.get('cas_input_root', {}) new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member - name=_get_task_name(), + name=_get_task_name(job_name), priority=priority, realm=swarming_realm, service_account=service_account, @@ -174,14 +210,18 @@ def _get_new_task_spec(command: str, job_name: str, return new_task_request -def push_swarming_task(command, download_url, job_type): +def push_swarming_task(task_request: swarming_pb2.NewTaskRequest): # pylint: disable=no-member 
"""Schedules a task on swarming.""" - job = data_types.Job.query(data_types.Job.name == job_type).get() - if not job: - raise ValueError('invalid job_name') - - task_spec = _get_new_task_spec(command, job_type, download_url) - creds, _ = credentials.get_default(_SWARMING_SCOPES) + swarming_config = _get_swarming_config() + if not swarming_config: + logs.error( + '[Swarming] Failed to push task into swarming. Reason: No config.') + return + creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES) + if not creds: + logs.error( + '[Swarming] Failed to push task into swarming. Reason: No credentials.') + return if not creds.token: creds.refresh(requests.Request()) @@ -193,5 +233,8 @@ def push_swarming_task(command, download_url, job_type): } swarming_server = _get_swarming_config().get('swarming_server') url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask' - utils.post_url( - url=url, data=json_format.MessageToJson(task_spec), headers=headers) + message_body = json_format.MessageToJson(task_request) + logs.info(f"""[Swarming] Pushing task to {url} + as {creds.service_account_email} with {message_body}""") + response = utils.post_url(url=url, data=message_body, headers=headers) + logs.info(f'[Swarming] Response: {response}') diff --git a/src/clusterfuzz/_internal/swarming/service.py b/src/clusterfuzz/_internal/swarming/service.py index 88c1169d285..30c1bad6770 100644 --- a/src/clusterfuzz/_internal/swarming/service.py +++ b/src/clusterfuzz/_internal/swarming/service.py @@ -35,6 +35,7 @@ def create_utask_main_job(self, module: str, job_type: str, return result[0] + @logs.task_stage_context(logs.Stage.SCHEDULER) def create_utask_main_jobs(self, remote_tasks: list[remote_task_types.RemoteTask] ) -> list[remote_task_types.RemoteTask]: @@ -42,14 +43,15 @@ def create_utask_main_jobs(self, Returns the tasks that couldn't be created. 
""" unscheduled_tasks = [] + logs.info(f'[Swarming] Pushing {len(remote_tasks)} tasks trough service.') for task in remote_tasks: try: - if not swarming.is_swarming_task(task.command, task.job_type): + if not swarming.is_swarming_task(task.job_type): unscheduled_tasks.append(task) continue - - swarming.push_swarming_task(task.command, task.input_download_url, - task.job_type) + if request := swarming.create_new_task_request( + task.command, task.job_type, task.argument): + swarming.push_swarming_task(request) except Exception: # pylint: disable=broad-except logs.error( f'Failed to push task to Swarming: {task.command}, {task.job_type}.' diff --git a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py index e9c21331268..d2991f85583 100644 --- a/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py +++ b/src/clusterfuzz/_internal/tests/core/remote_task/remote_task_gate_test.py @@ -518,15 +518,13 @@ def test_is_swarming_applicable(self, mock_swarming_flag): mock_swarming_flag.return_value = False self.assertFalse(self.gate._is_swarming_applicable()) - @mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.task_utils') @mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.swarming') - def test_is_swarming_task(self, mock_swarming, mock_task_utils): + def test_is_swarming_task(self, mock_swarming): """Tests _is_swarming_task.""" - mock_task_utils.get_command_from_module.return_value = 'fuzz' mock_swarming.is_swarming_task.return_value = True - self.assertTrue(self.gate._is_swarming_task('module', 'job')) - mock_swarming.is_swarming_task.assert_called_once_with('fuzz', 'job') + self.assertTrue(self.gate._is_swarming_task('job')) + mock_swarming.is_swarming_task.assert_called_once_with('job') def test_handle_swarming_job(self): """Tests _handle_swarming_job.""" diff --git a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py 
b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py index 09c485c4267..e80e51b3237 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/service_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/service_test.py @@ -28,10 +28,12 @@ def setUp(self): helpers.patch(self, [ 'clusterfuzz._internal.swarming.is_swarming_task', 'clusterfuzz._internal.swarming.push_swarming_task', + 'clusterfuzz._internal.swarming.create_new_task_request', 'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module', 'clusterfuzz._internal.metrics.logs.error', ]) self.service = service.SwarmingService() + self.mock.create_new_task_request.return_value = 'fake_request' def test_create_utask_main_job_success(self): """Test creating a single task successfully.""" @@ -44,8 +46,7 @@ def test_create_utask_main_job_success(self): # Success returns None in this interface (consistent with GcpBatchService) self.assertIsNone(result) - self.mock.push_swarming_task.assert_called_once_with( - 'fuzz', 'http://url', 'job_type') + self.mock.push_swarming_task.assert_called_once_with('fake_request') def test_create_utask_main_job_failure(self): """Test creating a single task that is not a swarming task.""" @@ -78,8 +79,8 @@ def test_create_utask_main_jobs_mixed_results(self): self.assertEqual(self.mock.push_swarming_task.call_count, 2) self.mock.push_swarming_task.assert_has_calls([ - mock.call('fuzz', 'url1', 'job1'), - mock.call('fuzz', 'url3', 'job3'), + mock.call('fake_request'), + mock.call('fake_request'), ]) def test_create_utask_main_jobs_all_success(self): diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_config_error_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_config_error_test.py new file mode 100644 index 00000000000..8958d73b081 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_config_error_test.py @@ -0,0 +1,57 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 
(the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Swarming config error tests.""" +import unittest +from unittest import mock + +from clusterfuzz._internal import swarming +from clusterfuzz._internal.base.errors import BadConfigError +from clusterfuzz._internal.datastore import data_types +from clusterfuzz._internal.tests.test_libs import helpers +from clusterfuzz._internal.tests.test_libs import test_utils + + +@test_utils.with_cloud_emulators('datastore') +class SwarmingConfigErrorTest(unittest.TestCase): + """Tests for swarming utils when config is missing.""" + + def setUp(self): + helpers.patch(self, [ + 'clusterfuzz._internal.swarming.FeatureFlags', + ]) + helpers.patch_environ(self) + self.mock.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled = True + + def test_is_swarming_task_bad_config(self): + """Tests that is_swarming_task returns False when there's a BadConfigError.""" + with mock.patch('clusterfuzz._internal.config.local_config.SwarmingConfig' + ) as mock_config: + mock_config.side_effect = BadConfigError('test') + job = data_types.Job( + name='libfuzzer_chrome_asan', + platform='LINUX', + environment_string='IS_SWARMING_JOB = True') + job.put() + self.assertFalse(swarming.is_swarming_task(job.name)) + + def test_create_new_task_request_bad_config(self): + """Tests that create_new_task_request returns None when there's a BadConfigError.""" + with mock.patch('clusterfuzz._internal.config.local_config.SwarmingConfig' + ) as mock_config: + mock_config.side_effect = BadConfigError('test') + job = 
data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') + job.put() + spec = swarming.create_new_task_request('fuzz', job.name, + 'https://download_url') + self.assertIsNone(spec) diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index 016a41f13bb..5028851be93 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -34,7 +34,7 @@ def setUp(self): helpers.patch(self, [ 'clusterfuzz._internal.base.utils.post_url', 'clusterfuzz._internal.swarming._get_task_name', - 'clusterfuzz._internal.google_cloud_utils.credentials.get_default', + 'clusterfuzz._internal.google_cloud_utils.credentials.get_scoped_service_account_credentials', 'google.auth.transport.requests.Request', 'clusterfuzz._internal.swarming.FeatureFlags', ]) @@ -44,11 +44,11 @@ def setUp(self): self.maxDiff = None def test_get_spec_from_config_with_docker_image(self): - """Tests that _get_new_task_spec works as expected.""" + """Tests that create_new_task_request works as expected.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') + spec = swarming.create_new_task_request('corpus_pruning', job.name, + 'https://download_url') expected_spec = swarming_pb2.NewTaskRequest( name='task_name', priority=1, @@ -62,7 +62,8 @@ def test_get_spec_from_config_with_docker_image(self): 'luci-auth', 'context', '--', './linux_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name') ], cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member @@ -82,37 +83,35 @@ def test_get_spec_from_config_with_docker_image(self): 
swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], secret_bytes=base64.b64encode( 'https://download_url'.encode('utf-8')))) ]) - self.assertEqual(spec, expected_spec) - def test_get_spec_from_config_raises_error_on_unknown_config(self): - """Tests that _get_new_task_spec raises error when there's no mapping for - the config.""" + def test_get_spec_from_config_returns_none_on_unknown_config(self): + """Tests that create_new_task_request returns None when there's no mapping for the config.""" job = data_types.Job(name='some_job_name', platform='UNKNOWN-PLATFORM') job.put() - with self.assertRaises(ValueError): - swarming._get_new_task_spec( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') + spec = swarming.create_new_task_request('corpus_pruning', job.name, + 'https://download_url') + self.assertIsNone(spec) def test_get_spec_from_config_without_docker_image(self): - """Tests that _get_new_task_spec works as expected (without a docker - image).""" + """Tests that create_new_task_request works as expected (without a docker image).""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='MAC') job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'corpus_pruning', job.name, 'https://download_url') + spec = swarming.create_new_task_request('corpus_pruning', job.name, + 'https://download_url') expected_spec = swarming_pb2.NewTaskRequest( 
name='task_name', priority=1, @@ -126,7 +125,8 @@ def test_get_spec_from_config_without_docker_image(self): 'luci-auth', 'context', '--', './mac_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name'), swarming_pb2.StringPair(key='key1', value='value1'), swarming_pb2.StringPair(key='key2', value='value2'), @@ -155,12 +155,13 @@ def test_get_spec_from_config_without_docker_image(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -178,11 +179,11 @@ def test_get_spec_from_config_without_docker_image(self): self.assertEqual(spec, expected_spec) def test_get_spec_from_config_for_fuzz_task(self): - """Tests that _get_new_task_spec works as expected for fuzz commands.""" + """Tests that create_new_task_request works as expected for fuzz commands.""" job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'fuzz', job.name, 'https://download_url') + spec = swarming.create_new_task_request('fuzz', job.name, + 'https://download_url') expected_spec = swarming_pb2.NewTaskRequest( name='task_name', priority=1, @@ -196,7 +197,8 @@ def test_get_spec_from_config_for_fuzz_task(self): 'luci-auth', 'context', '--', './linux_entry_point.sh' ], dimensions=[ - 
swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name') ], cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member @@ -216,12 +218,13 @@ def test_get_spec_from_config_for_fuzz_task(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -234,11 +237,13 @@ def test_push_swarming_task(self): """Tests that push_swarming_task works as expected.""" mock_creds = mock.MagicMock() mock_creds.token = 'fake_token' - self.mock.get_default.return_value = (mock_creds, None) + self.mock.get_scoped_service_account_credentials.return_value = mock_creds job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - swarming.push_swarming_task('fuzz', 'https://download_url', job.name) + task_request = swarming.create_new_task_request('fuzz', job.name, + 'https://download_url') + swarming.push_swarming_task(task_request) expected_new_task_request = swarming_pb2.NewTaskRequest( name='task_name', @@ -253,7 +258,8 @@ def test_push_swarming_task(self): 'luci-auth', 'context', '--', './linux_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name') ], cipd_input=swarming_pb2.CipdInput(), # pylint: 
disable=no-member @@ -273,12 +279,13 @@ def test_push_swarming_task(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -286,7 +293,8 @@ def test_push_swarming_task(self): 'https://download_url'.encode('utf-8')))) ]) - self.mock.get_default.assert_called_with(swarming._SWARMING_SCOPES) # pylint: disable=protected-access + self.mock.get_scoped_service_account_credentials.assert_called_with( + swarming._SWARMING_SCOPES) # pylint: disable=protected-access expected_headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', @@ -302,7 +310,7 @@ def test_push_swarming_task_with_refresh(self): """Tests that push_swarming_task refreshes credentials if token is missing.""" mock_creds = mock.MagicMock() mock_creds.token = None - self.mock.get_default.return_value = (mock_creds, None) + self.mock.get_scoped_service_account_credentials.return_value = mock_creds def refresh_side_effect(_): mock_creds.token = 'refreshed_token' @@ -311,7 +319,9 @@ def refresh_side_effect(_): job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() - swarming.push_swarming_task('fuzz', 'https://download_url', job.name) + request = swarming.create_new_task_request('fuzz', job.name, + 'https://download_url') + swarming.push_swarming_task(request) mock_creds.refresh.assert_called_with(self.mock.Request.return_value) expected_headers = { @@ -329,15 +339,58 @@ def test_is_swarming_task(self): 
platform='LINUX', environment_string='IS_SWARMING_JOB = True') job.put() - self.assertTrue(swarming.is_swarming_task('fuzz', job.name)) + self.assertTrue(swarming.is_swarming_task(job.name)) job.environment_string = 'IS_SWARMING_JOB = False' job.put() - self.assertFalse(swarming.is_swarming_task('fuzz', job.name)) + self.assertFalse(swarming.is_swarming_task(job.name)) job.environment_string = '' job.put() - self.assertFalse(swarming.is_swarming_task('fuzz', job.name)) + self.assertFalse(swarming.is_swarming_task(job.name)) + + def test_is_swarming_task_with_job_instance(self): + """Tests that is_swarming_task avoids DB query when job is provided.""" + # Mock query to prove that passing a job instance bypasses the Datastore query. + helpers.patch(self, + ['clusterfuzz._internal.datastore.data_types.Job.query']) + job = data_types.Job( + name='libfuzzer_chrome_asan', + platform='LINUX', + environment_string='IS_SWARMING_JOB = True') + job.put() # Ensure it's valid, though it won't be queried + + # Call with job instance + self.assertTrue(swarming.is_swarming_task(job.name, job=job)) + self.mock.query.assert_not_called() + + def test_is_swarming_task_without_job_instance(self): + """Tests that is_swarming_task queries the DB when job is not provided.""" + # Mock query to verify that the Datastore query runs when no job instance is provided. 
+ helpers.patch(self, + ['clusterfuzz._internal.datastore.data_types.Job.query']) + job = data_types.Job( + name='libfuzzer_chrome_asan', + platform='LINUX', + environment_string='IS_SWARMING_JOB = True') + job.put() + + mock_query_obj = mock.Mock() + mock_query_obj.get.return_value = job + self.mock.query.return_value = mock_query_obj + + self.assertTrue(swarming.is_swarming_task(job.name)) + self.mock.query.assert_called_once() + + def test_is_swarming_task_with_feature_flag_disabled(self): + """Tests that is_swarming_task returns False when the feature flag is disabled.""" + self.mock.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled = False + job = data_types.Job( + name='libfuzzer_chrome_asan', + platform='LINUX', + environment_string='IS_SWARMING_JOB = True') + job.put() + self.assertFalse(swarming.is_swarming_task(job.name)) def test_get_task_dimensions_with_env_var(self): """Tests that _get_task_dimensions handles SWARMING_DIMENSIONS env var.""" @@ -365,12 +418,12 @@ def test_get_task_dimensions_job_precedence(self): # We set SWARMING_DIMENSIONS in the environment to override key1. environment.set_value('SWARMING_DIMENSIONS', {'key1': 'job_value1'}) - spec = swarming._get_new_task_spec( # pylint: disable=protected-access - 'fuzz', job.name, 'https://download_url') + spec = swarming.create_new_task_request('fuzz', job.name, + 'https://download_url') dimensions = spec.task_slices[0].properties.dimensions expected_dimensions = [ - swarming_pb2.StringPair(key='os', value='MAC'), + swarming_pb2.StringPair(key='os', value='Mac'), swarming_pb2.StringPair(key='pool', value='pool-name'), swarming_pb2.StringPair(key='key1', value='job_value1'), swarming_pb2.StringPair(key='key2', value='value2'),