Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/clusterfuzz/_internal/bot/tasks/task_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ def is_remote_utask(command, job):
# Return True even if we can't query the db.
return True

return batch_service.is_remote_task(
command, job) or swarming.is_swarming_task(command, job)
return batch_service.is_remote_task(command,
job) or swarming.is_swarming_task(job)


def task_main_runs_on_uworker():
Expand Down Expand Up @@ -178,11 +178,8 @@ def execute(self, task_argument, job_type, uworker_env):
return

logs.info('Queueing utask for remote execution.', download_url=download_url)
if batch_service.is_remote_task(command, job_type):
tasks.add_utask_main(command, download_url, job_type)
else:
assert swarming.is_swarming_task(command, job_type)
swarming.push_swarming_task(command, download_url, job_type)
assert batch_service.is_remote_task(command, job_type)
tasks.add_utask_main(command, download_url, job_type)

@logs.task_stage_context(logs.Stage.PREPROCESS)
def preprocess(self, task_argument, job_type, uworker_env):
Expand Down
10 changes: 4 additions & 6 deletions src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ def _timestamp_now() -> Timestamp:
return ts


def _get_execution_mode(utask_module, job_type):
def _get_execution_mode(job_type):
"""Determines whether this task in executed on swarming or batch."""
command = task_utils.get_command_from_module(utask_module.__name__)
if swarming.is_swarming_task(command, job_type):
if swarming.is_swarming_task(job_type):
return Mode.SWARMING
return Mode.BATCH

Expand Down Expand Up @@ -410,7 +409,7 @@ def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
signed download URL for the uworker's input and the (unsigned) download URL
for its output."""
with _MetricRecorder(_Subtask.PREPROCESS) as recorder:
execution_mode = _get_execution_mode(utask_module, job_type)
execution_mode = _get_execution_mode(job_type)
uworker_input = _preprocess(utask_module, task_argument, job_type,
uworker_env, recorder, execution_mode)
if not uworker_input:
Expand Down Expand Up @@ -501,8 +500,7 @@ def tworker_postprocess(output_download_url) -> None:
task_utils.reset_task_stage_env()

utask_module = get_utask_module(uworker_output.uworker_input.module_name)
execution_mode = _get_execution_mode(utask_module,
uworker_output.uworker_input.job_type)
execution_mode = _get_execution_mode(uworker_output.uworker_input.job_type)
recorder.set_task_details(
utask_module,
uworker_output.uworker_input.job_type,
Expand Down
1 change: 1 addition & 0 deletions src/clusterfuzz/_internal/metrics/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ class Stage(enum.Enum):
MAIN = 'main'
POSTPROCESS = 'postprocess'
UNKNOWN = 'unknown'
SCHEDULER = 'scheduler'
NA = 'n/a'


Expand Down
10 changes: 4 additions & 6 deletions src/clusterfuzz/_internal/remote_task/remote_task_gate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

from clusterfuzz._internal import swarming
from clusterfuzz._internal.base import feature_flags
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.remote_task import remote_task_adapters
from clusterfuzz._internal.remote_task import remote_task_types
Expand Down Expand Up @@ -61,9 +60,8 @@ def _get_adapter(self) -> str:
def _is_swarming_applicable(self):
return feature_flags.FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled

def _is_swarming_task(self, module, job_type):
return swarming.is_swarming_task(
task_utils.get_command_from_module(module), job_type)
def _is_swarming_task(self, job_type):
return swarming.is_swarming_task(job_type)

def _handle_swarming_job(self, module, job_type, input_download_url):
return self._service_map['swarming'].create_utask_main_job(
Expand Down Expand Up @@ -124,8 +122,7 @@ def get_job_frequency(self):

def create_utask_main_job(self, module, job_type, input_download_url):
"""Creates a single remote task, selecting a backend dynamically."""
if self._is_swarming_applicable() and self._is_swarming_task(
module, job_type):
if self._is_swarming_applicable() and self._is_swarming_task(job_type):
return self._handle_swarming_job(module, job_type, input_download_url)

adapter_id = self._get_adapter()
Expand Down Expand Up @@ -156,6 +153,7 @@ def create_utask_main_jobs(self,
unscheduled_tasks = []

if self._is_swarming_applicable():
logs.info(f'[Swarming] enabled, pushing {len(remote_tasks)} tasks.')
remote_tasks = self._handle_swarming_jobs(remote_tasks)

if not remote_tasks:
Expand Down
105 changes: 74 additions & 31 deletions src/clusterfuzz/_internal/swarming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
from google.protobuf import json_format

from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.errors import BadConfigError
from clusterfuzz._internal.base.feature_flags import FeatureFlags
from clusterfuzz._internal.config import local_config
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.google_cloud_utils import credentials
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.protos import swarming_pb2
from clusterfuzz._internal.system import environment

Expand All @@ -34,40 +36,62 @@
]


def is_swarming_task(command: str, job_name: str):
def is_swarming_task(job_name: str, job: data_types.Job | None = None) -> bool:
"""Returns True if the task is supposed to run on swarming."""
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
logs.info('[DEBUG] Flag is disabled', job_name=job_name)
return False
job = data_types.Job.query(data_types.Job.name == job_name).get()
if not job:
return False
if job is None:
job = data_types.Job.query(data_types.Job.name == job_name).get()
if not job:
logs.info('[DEBUG] Job not found', job_name=job_name)
return False

job_environment = job.get_environment()
if not utils.string_is_true(job_environment.get('IS_SWARMING_JOB')):
if not utils.string_is_true(job_environment.get(
'IS_SWARMING_JOB')) and not job_environment.get('SWARMING_DIMENSIONS'):
logs.info('[DEBUG] No swarming env var', job_name=job_name)
return False

try:
_get_new_task_spec(command, job_name, '')
return True
except ValueError:
swarming_config = _get_swarming_config()
if swarming_config is None:
logs.warning('[Swarming] current task is not suitable for swarming. ' \
'Reason: failed to retrieve config.')
return False

return _get_instance_spec(swarming_config, job) is not None


def _get_task_name():
return 't-' + str(uuid.uuid4()).lower()
def _get_instance_spec(swarming_config: local_config.SwarmingConfig,
job: data_types.Job) -> dict | None:
return swarming_config.get('mapping').get(job.platform, None)


def _get_swarming_config():
def _get_task_name(job_name: str):
return 't-' + str(uuid.uuid4()).lower() + ' ' + job_name


def _get_swarming_config() -> local_config.SwarmingConfig | None:
"""Returns the swarming config."""
return local_config.SwarmingConfig()
try:
return local_config.SwarmingConfig()
except (BadConfigError, ValueError) as e:
logs.error(f'[Swarming] Failed to retrieve config: {e}')
return None


def _get_task_dimensions(job: data_types.Job, platform_specific_dimensions: list
) -> list[swarming_pb2.StringPair]: # pylint: disable=no-member
""" Gets all swarming dimensions for a task.
  Job dimensions take precedence over static dimensions."""
swarming_config = _get_swarming_config()
if not swarming_config:
logs.warning(
'[Swarming] No dimensions set. Reason: failed to retrieve config')
return []

unique_dimensions = {}
unique_dimensions['os'] = job.platform
unique_dimensions['os'] = str(job.platform).capitalize()
unique_dimensions['pool'] = _get_swarming_config().get('swarming_pool')

for dimension in platform_specific_dimensions:
Expand Down Expand Up @@ -99,16 +123,27 @@ def _env_vars_to_json(
value=json.dumps(env_vars_dict))


def _get_new_task_spec(command: str, job_name: str,
download_url: str) -> swarming_pb2.NewTaskRequest: # pylint: disable=no-member
"""Gets the configured specifications for a swarming task."""
def create_new_task_request(command: str, job_name: str, download_url: str
) -> swarming_pb2.NewTaskRequest | None: # pylint: disable=no-member
"""Gets the configured specifications for a swarming task.
  Returns None if the task shouldn't be executed on swarming
or if the SWARMING_REMOTE_EXECUTION flag is disabled."""
if not FeatureFlags.SWARMING_REMOTE_EXECUTION.enabled:
return None

job = data_types.Job.query(data_types.Job.name == job_name).get()
config_name = job.platform
if job is None:
return None

swarming_config = _get_swarming_config()
instance_spec = swarming_config.get('mapping').get(config_name, None)
if not swarming_config:
return None

instance_spec = _get_instance_spec(swarming_config, job)
if instance_spec is None:
raise ValueError(f'No mapping for {config_name}')
swarming_realm = swarming_config.get('swarming_realm')
return None

swarming_realm = swarming_config.get('swarming_realm',)
logs_project_id = swarming_config.get('logs_project_id')
priority = instance_spec['priority']
startup_command = instance_spec['command']
Expand All @@ -133,6 +168,7 @@ def _get_new_task_spec(command: str, job_name: str,
swarming_pb2.StringPair(key='UWORKER', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair(key='SWARMING_BOT', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair(key='LOG_TO_GCP', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair(key='IS_K8S_ENV', value='True'), # pylint: disable=no-member
swarming_pb2.StringPair( # pylint: disable=no-member
key='LOGGING_CLOUD_PROJECT_ID',
value=logs_project_id),
Expand All @@ -153,7 +189,7 @@ def _get_new_task_spec(command: str, job_name: str,
cas_input_root = instance_spec.get('cas_input_root', {})

new_task_request = swarming_pb2.NewTaskRequest( # pylint: disable=no-member
name=_get_task_name(),
name=_get_task_name(job_name),
priority=priority,
realm=swarming_realm,
service_account=service_account,
Expand All @@ -174,14 +210,18 @@ def _get_new_task_spec(command: str, job_name: str,
return new_task_request


def push_swarming_task(command, download_url, job_type):
def push_swarming_task(task_request: swarming_pb2.NewTaskRequest): # pylint: disable=no-member
"""Schedules a task on swarming."""
job = data_types.Job.query(data_types.Job.name == job_type).get()
if not job:
raise ValueError('invalid job_name')

task_spec = _get_new_task_spec(command, job_type, download_url)
creds, _ = credentials.get_default(_SWARMING_SCOPES)
swarming_config = _get_swarming_config()
if not swarming_config:
logs.error(
'[Swarming] Failed to push task into swarming. Reason: No config.')
return
creds = credentials.get_scoped_service_account_credentials(_SWARMING_SCOPES)
if not creds:
logs.error(
'[Swarming] Failed to push task into swarming. Reason: No credentials.')
return

if not creds.token:
creds.refresh(requests.Request())
Expand All @@ -193,5 +233,8 @@ def push_swarming_task(command, download_url, job_type):
}
swarming_server = _get_swarming_config().get('swarming_server')
url = f'https://{swarming_server}/prpc/swarming.v2.Tasks/NewTask'
utils.post_url(
url=url, data=json_format.MessageToJson(task_spec), headers=headers)
message_body = json_format.MessageToJson(task_request)
logs.info(f"""[Swarming] Pushing task to {url}
as {creds.service_account_email} with {message_body}""")
response = utils.post_url(url=url, data=message_body, headers=headers)
logs.info(f'[Swarming] Response: {response}')
10 changes: 6 additions & 4 deletions src/clusterfuzz/_internal/swarming/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,23 @@ def create_utask_main_job(self, module: str, job_type: str,

return result[0]

@logs.task_stage_context(logs.Stage.SCHEDULER)
def create_utask_main_jobs(self,
remote_tasks: list[remote_task_types.RemoteTask]
) -> list[remote_task_types.RemoteTask]:
"""Creates many remote tasks for uworker main tasks.
Returns the tasks that couldn't be created.
"""
unscheduled_tasks = []
    logs.info(f'[Swarming] Pushing {len(remote_tasks)} tasks through service.')
for task in remote_tasks:
try:
if not swarming.is_swarming_task(task.command, task.job_type):
if not swarming.is_swarming_task(task.job_type):
unscheduled_tasks.append(task)
continue

swarming.push_swarming_task(task.command, task.input_download_url,
task.job_type)
if request := swarming.create_new_task_request(
task.command, task.job_type, task.argument):
swarming.push_swarming_task(request)
except Exception: # pylint: disable=broad-except
logs.error(
f'Failed to push task to Swarming: {task.command}, {task.job_type}.'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,15 +518,13 @@ def test_is_swarming_applicable(self, mock_swarming_flag):
mock_swarming_flag.return_value = False
self.assertFalse(self.gate._is_swarming_applicable())

@mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.task_utils')
@mock.patch('clusterfuzz._internal.remote_task.remote_task_gate.swarming')
def test_is_swarming_task(self, mock_swarming, mock_task_utils):
def test_is_swarming_task(self, mock_swarming):
"""Tests _is_swarming_task."""
mock_task_utils.get_command_from_module.return_value = 'fuzz'
mock_swarming.is_swarming_task.return_value = True

self.assertTrue(self.gate._is_swarming_task('module', 'job'))
mock_swarming.is_swarming_task.assert_called_once_with('fuzz', 'job')
self.assertTrue(self.gate._is_swarming_task('job'))
mock_swarming.is_swarming_task.assert_called_once_with('job')

def test_handle_swarming_job(self):
"""Tests _handle_swarming_job."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ def setUp(self):
helpers.patch(self, [
'clusterfuzz._internal.swarming.is_swarming_task',
'clusterfuzz._internal.swarming.push_swarming_task',
'clusterfuzz._internal.swarming.create_new_task_request',
'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module',
'clusterfuzz._internal.metrics.logs.error',
])
self.service = service.SwarmingService()
self.mock.create_new_task_request.return_value = 'fake_request'

def test_create_utask_main_job_success(self):
"""Test creating a single task successfully."""
Expand All @@ -44,8 +46,7 @@ def test_create_utask_main_job_success(self):
# Success returns None in this interface (consistent with GcpBatchService)
self.assertIsNone(result)

self.mock.push_swarming_task.assert_called_once_with(
'fuzz', 'http://url', 'job_type')
self.mock.push_swarming_task.assert_called_once_with('fake_request')

def test_create_utask_main_job_failure(self):
"""Test creating a single task that is not a swarming task."""
Expand Down Expand Up @@ -78,8 +79,8 @@ def test_create_utask_main_jobs_mixed_results(self):

self.assertEqual(self.mock.push_swarming_task.call_count, 2)
self.mock.push_swarming_task.assert_has_calls([
mock.call('fuzz', 'url1', 'job1'),
mock.call('fuzz', 'url3', 'job3'),
mock.call('fake_request'),
mock.call('fake_request'),
])

def test_create_utask_main_jobs_all_success(self):
Expand Down
Loading
Loading