diff --git a/src/clusterfuzz/_internal/swarming/__init__.py b/src/clusterfuzz/_internal/swarming/__init__.py index 4e03ab3bf6..b69115ee31 100644 --- a/src/clusterfuzz/_internal/swarming/__init__.py +++ b/src/clusterfuzz/_internal/swarming/__init__.py @@ -25,6 +25,7 @@ from clusterfuzz._internal.config import local_config from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.google_cloud_utils import credentials +from clusterfuzz._internal.metrics import logs from clusterfuzz._internal.protos import swarming_pb2 from clusterfuzz._internal.system import environment @@ -67,7 +68,7 @@ def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list """ Gets all swarming dimensions for a task. Job dimensions have more precedence than static dimensions""" unique_dimensions = {} - unique_dimensions['os'] = job.platform + unique_dimensions['os'] = str(job.platform).capitalize() unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool') for dimension in platform_specific_dimensions: @@ -133,6 +134,7 @@ def _get_new_task_spec(command: str, job_name: str, swarming_pb2.StringPair(key='UWORKER', value='True'), # pylint: disable=no-member swarming_pb2.StringPair(key='SWARMING_BOT', value='True'), # pylint: disable=no-member swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), # pylint: disable=no-member + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), # pylint: disable=no-member swarming_pb2.StringPair( # pylint: disable=no-member key='LOGGING_CLOUD_PROJECT_ID', value=logs_project_id), @@ -181,7 +183,11 @@ def push_swarming_task(command, download_url, job_type): raise ValueError('invalid job_name') task_spec = _get_new_task_spec(command, job_type, download_url) - creds, _ = credentials.get_default(_SWARMING_SCOPES) + creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES) + if not creds: + logs.error( + '[Swarming] Failed to push task into swarming. Reason: No credentials.') + return if not creds.token: creds.refresh(requests.Request()) @@ -193,5 +199,8 @@ def push_swarming_task(command, download_url, job_type): } swarming_server = _get_swarming_config().get('swarming_server') url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask' - utils.post_url( - url=url, data=json_format.MessageToJson(task_spec), headers=headers) + message_body = json_format.MessageToJson(task_spec) + logs.info(f"""[Swarming] Pushing task to {url} + as {creds.service_account_email} with {message_body}""") + response = utils.post_url(url=url, data=message_body, headers=headers) + logs.info(f'[Swarming] Response: {response}') diff --git a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py index 016a41f13b..aec2b0532d 100644 --- a/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py +++ b/src/clusterfuzz/_internal/tests/core/swarming/swarming_test.py @@ -35,6 +35,7 @@ def setUp(self): 'clusterfuzz._internal.base.utils.post_url', 'clusterfuzz._internal.swarming._get_task_name', 'clusterfuzz._internal.google_cloud_utils.credentials.get_default', + 'clusterfuzz._internal.google_cloud_utils.credentials.get_scoped_service_account_credentials', 'google.auth.transport.requests.Request', 'clusterfuzz._internal.swarming.FeatureFlags', ]) @@ -62,7 +63,8 @@ def test_get_spec_from_config_with_docker_image(self): 'luci-auth', 'context', '--', './linux_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name') ], cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member @@ -82,12 +84,13 @@ def test_get_spec_from_config_with_docker_image(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -126,7 +129,8 @@ def test_get_spec_from_config_without_docker_image(self): 'luci-auth', 'context', '--', './mac_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name'), swarming_pb2.StringPair(key='key1', value='value1'), swarming_pb2.StringPair(key='key2', value='value2'), @@ -155,12 +159,13 @@ def test_get_spec_from_config_without_docker_image(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -196,7 +201,8 @@ def test_get_spec_from_config_for_fuzz_task(self): 'luci-auth', 'context', '--', './linux_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name') ], cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member @@ -216,12 +222,13 @@ def test_get_spec_from_config_for_fuzz_task(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -234,7 +241,7 @@ def test_push_swarming_task(self): """Tests that push_swarming_task works as expected.""" mock_creds = mock.MagicMock() mock_creds.token = 'fake_token' - self.mock.get_default.return_value = (mock_creds, None) + self.mock.get_scoped_service_account_credentials.return_value = mock_creds job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX') job.put() @@ -253,7 +260,8 @@ def test_push_swarming_task(self): 'luci-auth', 'context', '--', './linux_entry_point.sh' ], dimensions=[ - swarming_pb2.StringPair(key='os', value=job.platform), + swarming_pb2.StringPair( + key='os', value=str(job.platform).capitalize()), swarming_pb2.StringPair(key='pool', value='pool-name') ], cipd_input=swarming_pb2.CipdInput(), # pylint: disable=no-member @@ -273,12 +281,13 @@ def test_push_swarming_task(self): swarming_pb2.StringPair( key='DOCKER_ENV_VARS', value= - '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' + '{"UWORKER": "True", "SWARMING_BOT": "True", "LOG_TO_GCP": "True", "IS_K8S_ENV": "True", "LOGGING_CLOUD_PROJECT_ID": "project_id"}' ), swarming_pb2.StringPair(key='UWORKER', value='True'), swarming_pb2.StringPair( key='SWARMING_BOT', value='True'), swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), + swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), swarming_pb2.StringPair( key='LOGGING_CLOUD_PROJECT_ID', value='project_id'), ], @@ -286,7 +295,8 @@ def test_push_swarming_task(self): 'https://download_url'.encode('utf-8')))) ]) - self.mock.get_default.assert_called_with(swarming._SWARMING_SCOPES) # pylint: disable=protected-access + self.mock.get_scoped_service_account_credentials.assert_called_with( + swarming._SWARMING_SCOPES) # pylint: disable=protected-access expected_headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', @@ -302,7 +312,7 @@ def test_push_swarming_task_with_refresh(self): """Tests that push_swarming_task refreshes credentials if token is missing.""" mock_creds = mock.MagicMock() mock_creds.token = None - self.mock.get_default.return_value = (mock_creds, None) + self.mock.get_scoped_service_account_credentials.return_value = mock_creds def refresh_side_effect(_): mock_creds.token = 'refreshed_token' @@ -370,7 +380,7 @@ def test_get_task_dimensions_job_precedence(self): dimensions = spec.task_slices[0].properties.dimensions expected_dimensions = [ - swarming_pb2.StringPair(key='os', value='MAC'), + swarming_pb2.StringPair(key='os', value='Mac'), swarming_pb2.StringPair(key='pool', value='pool-name'), swarming_pb2.StringPair(key='key1', value='job_value1'), swarming_pb2.StringPair(key='key2', value='value2'),