diff --git a/bugwarrior/services/azuredevops.py b/bugwarrior/services/azuredevops.py index 3fb8b47c..f8182a74 100644 --- a/bugwarrior/services/azuredevops.py +++ b/bugwarrior/services/azuredevops.py @@ -185,8 +185,10 @@ class AzureDevopsService(Service[AzureDevopsIssue]): ISSUE_CLASS = AzureDevopsIssue CONFIG_SCHEMA = AzureDevopsConfig - def __init__(self, *args: Any, **kw: Any) -> None: - super().__init__(*args, **kw) + def __init__( + self, config: AzureDevopsConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.client = AzureDevopsClient( pat=self.get_secret('PAT'), project=self.config.project, diff --git a/bugwarrior/services/bitbucket.py b/bugwarrior/services/bitbucket.py index dfaf10c2..ed0b734d 100644 --- a/bugwarrior/services/bitbucket.py +++ b/bugwarrior/services/bitbucket.py @@ -79,7 +79,7 @@ def get_default_description(self) -> str: ) -class BitbucketService(Service[BitbucketIssue], Client): +class BitbucketService(Service[BitbucketIssue]): API_VERSION = 1.0 ISSUE_CLASS = BitbucketIssue CONFIG_SCHEMA = BitbucketConfig @@ -87,8 +87,10 @@ class BitbucketService(Service[BitbucketIssue], Client): BASE_API2 = 'https://api.bitbucket.org/2.0' BASE_URL = 'https://bitbucket.org/' - def __init__(self, *args: Any, **kw: Any) -> None: - super().__init__(*args, **kw) + def __init__( + self, config: BitbucketConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) oauth = (self.config.key, self.get_secret('secret', self.config.key)) refresh_token = self.main_config.data.get('bitbucket_refresh_token') @@ -135,7 +137,7 @@ def filter_repos(self, repo_tag: str) -> bool: def get_data(self, url: str) -> dict[str, Any]: """Perform a request to the fully qualified url and return json.""" - return self.json_response(requests.get(url, **self.requests_kwargs)) + return Client.json_response(requests.get(url, **self.requests_kwargs)) def get_collection(self, url: str) -> Iterator[Any]: """Pages through an object collection from the bitbucket API. diff --git a/bugwarrior/services/bts.py b/bugwarrior/services/bts.py index 62092a92..f23b263b 100644 --- a/bugwarrior/services/bts.py +++ b/bugwarrior/services/bts.py @@ -1,5 +1,7 @@ +from collections.abc import Iterable, Iterator import logging import typing +from typing import Any import debianbts import pydantic @@ -7,6 +9,7 @@ import requests from bugwarrior import config +from bugwarrior.config import Priority from bugwarrior.services import Client, Issue, Service log = logging.getLogger(__name__) @@ -30,13 +33,13 @@ class BTSConfig(config.ServiceConfig): also_unassigned: config.UnsupportedOption[bool] = False @model_validator(mode='after') - def require_email_or_packages(self): + def require_email_or_packages(self) -> "BTSConfig": if not self.email and not self.packages: raise ValueError('section requires one of:\n email\n packages') return self @model_validator(mode='after') - def udd_needs_email(self): + def udd_needs_email(self) -> "BTSConfig": if self.udd and not self.email: raise ValueError("no 'email' but UDD search was requested") return self @@ -62,7 +65,7 @@ class BTSIssue(Issue): } UNIQUE_KEY = (URL,) - PRIORITY_MAP = { + PRIORITY_MAP: dict[str, Priority] = { 'wishlist': 'L', 'minor': 'L', 'normal': 'M', @@ -72,7 +75,7 @@ class BTSIssue(Issue): 'critical': 'H', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'priority': self.get_priority(), 'annotations': self.extra.get('annotations', []), @@ -85,7 +88,7 @@ def to_taskwarrior(self): self.STATUS: self.record['status'], } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['subject'], url=self.record['url'], @@ -93,22 +96,22 @@ def get_default_description(self): cls='issue', ) - def get_priority(self): + def get_priority(self) -> config.Priority: return self.PRIORITY_MAP.get( self.record.get('severity', ''), self.config.default_priority ) -class BTSService(Service, Client): +class BTSService(Service[BTSIssue]): API_VERSION = 1.0 ISSUE_CLASS = BTSIssue CONFIG_SCHEMA = BTSConfig @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: BTSConfig) -> str: return 'bts://' - def _record_for_bug(self, bug): + def _record_for_bug(self, bug: debianbts.Bugreport) -> dict[str, Any]: return { 'number': bug.bug_num, 'url': 'https://bugs.debian.org/' + str(bug.bug_num), @@ -120,17 +123,17 @@ def _record_for_bug(self, bug): 'status': bug.pending, } - def _get_udd_bugs(self): + def _get_udd_bugs(self) -> Iterable[dict[str, Any]]: request_params = {'format': 'json', 'dmd': 1, 'email1': self.config.email} if self.config.udd_ignore_sponsor: request_params['nosponsor1'] = "on" resp = requests.get(UDD_BUGS_SEARCH, request_params) - return self.json_response(resp) + return Client.json_response(resp) - def annotations(self, issue): + def annotations(self, issue: dict[str, Any]) -> list[str]: return self.build_annotations([], issue['url']) - def issues(self): + def issues(self) -> Iterator[BTSIssue]: # Initialise empty list of bug numbers collected_bugs = [] diff --git a/bugwarrior/services/bz.py b/bugwarrior/services/bz.py index 0fcb32d1..d1e98a1c 100644 --- a/bugwarrior/services/bz.py +++ b/bugwarrior/services/bz.py @@ -1,8 +1,9 @@ +from collections.abc import Iterator import datetime import logging import time import typing -from typing import Annotated +from typing import Annotated, Any import urllib.parse import xmlrpc.client @@ -17,7 +18,7 @@ log = logging.getLogger(__name__) -def validate_url(value: str): +def validate_url(value: str) -> str: if not urllib.parse.urlparse(value).scheme: value = f'https://{value}' log.warning( @@ -87,7 +88,7 @@ class BugzillaIssue(Issue): 'urgent': 'H', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: task = { 'project': self.record['component'], 'priority': self.get_priority(), @@ -107,7 +108,7 @@ def to_taskwarrior(self): return task - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['summary'], url=self.extra['url'], @@ -116,7 +117,7 @@ def get_default_description(self): ) -class BugzillaService(Service): +class BugzillaService(Service[BugzillaIssue]): API_VERSION = 1.0 ISSUE_CLASS = BugzillaIssue CONFIG_SCHEMA = BugzillaConfig @@ -133,8 +134,10 @@ class BugzillaService(Service): 'assigned_to', ] - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: BugzillaConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) log.debug(" filtering on statuses: %r", self.config.open_statuses) force_rest_kwargs = {} @@ -156,13 +159,13 @@ def __init__(self, *args, **kw): self.bz.login(self.config.username, password) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: BugzillaConfig) -> str: return f"bugzilla://{config.username}@{config.base_uri}" - def get_owner(self, issue): + def get_owner(self, issue: dict[str, Any]) -> str: return issue['assigned_to'] - def include(self, issue): + def include(self, issue: dict[str, Any]) -> bool: """Return true if the issue in question should be included""" if self.config.only_if_assigned: owner = self.get_owner(issue) @@ -175,7 +178,7 @@ def include(self, issue): return True - def annotations(self, tag, issue): + def annotations(self, tag: str, issue: dict[str, Any]) -> list[str]: base_url = "%s/show_bug.cgi?id=" % self.config.base_uri long_url = base_url + str(issue['id']) url = long_url @@ -192,20 +195,21 @@ def annotations(self, tag, issue): # version of bugzilla itself. :( comments = issue.get('longdescs', []) - def _parse_author(obj): + def _parse_author(obj: dict[str, Any] | str) -> str: if isinstance(obj, dict): return obj['login_name'].split('@')[0] else: return obj - def _parse_body(obj): + def _parse_body(obj: dict[str, Any]) -> str | None: return obj.get('text', obj.get('body')) return self.build_annotations( - ((_parse_author(c['author']), _parse_body(c)) for c in comments), url + ((_parse_author(c['author']), _parse_body(c) or "") for c in comments), + url, ) - def issues(self): + def issues(self) -> Iterator[BugzillaIssue]: email = self.config.username # TODO -- doing something with blockedby would be nice. @@ -286,7 +290,7 @@ def issues(self): issue_obj.extra.update(extra) yield issue_obj - def _get_assigned_date(self, issue): + def _get_assigned_date(self, issue: dict[str, Any]) -> str | None: bug = self.bz.getbug(issue['id']) history = bug.get_history_raw()['bugs'][0]['history'] @@ -297,7 +301,7 @@ def _get_assigned_date(self, issue): return _ensure_datetime(h['when']).isoformat() -def _get_bug_attr(bug, attr): +def _get_bug_attr(bug: Any, attr: str) -> Any: """Default longdescs/flags case to [] since they may not be present.""" if attr in ("longdescs", "flags"): return getattr(bug, attr, []) diff --git a/bugwarrior/services/clickup.py b/bugwarrior/services/clickup.py index aab50720..2e23cd7f 100644 --- a/bugwarrior/services/clickup.py +++ b/bugwarrior/services/clickup.py @@ -1,7 +1,8 @@ +from collections.abc import Iterator import datetime import logging import typing -from typing import Generator, Optional +from typing import Any, Generator, Optional import requests @@ -20,11 +21,11 @@ class ClickupConfig(config.ServiceConfig): class ClickupClient(Client): """Abstraction of Clickup API v2""" - def __init__(self, token): + def __init__(self, token: str) -> None: self.token = token @staticmethod - def _get_url_for_tasks(team_id: int, page: int = 0): + def _get_url_for_tasks(team_id: int, page: int = 0) -> str: base_url = "https://api.clickup.com/api/v2/" query = f"include_closed=false&page={page}" return f"{base_url}team/{team_id}/task?{query}" @@ -78,7 +79,7 @@ class ClickupIssue(Issue): PRIORITY_MAP = {"urgent": "H", "high": "M", "normal": "L", "low": ""} - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: if not self.record["project"]["hidden"]: project = self.record["project"]["name"] else: @@ -102,7 +103,7 @@ def to_taskwarrior(self): self.NAME: self.record["name"], } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record["name"], url=self.record["url"] ) @@ -118,17 +119,19 @@ def parse_timestamp( return datetime.datetime.fromtimestamp(seconds_unix, tz=datetime.timezone.utc) -class ClickupService(Service): +class ClickupService(Service[ClickupIssue]): API_VERSION = 1.0 ISSUE_CLASS = ClickupIssue CONFIG_SCHEMA = ClickupConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: ClickupConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.client = ClickupClient(token=self.get_secret('token')) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: ClickupConfig) -> str: return "clickup://" def is_assigned(self, issue: dict) -> bool: @@ -144,7 +147,7 @@ def is_assigned(self, issue: dict) -> bool: return False - def issues(self): + def issues(self) -> Iterator[ClickupIssue]: for task in self.client.get_tasks_for_team(self.config.team_id): if self.is_assigned(task): yield self.get_issue_for_record(task) diff --git a/bugwarrior/services/deck.py b/bugwarrior/services/deck.py index 86cf4682..3b70a46a 100644 --- a/bugwarrior/services/deck.py +++ b/bugwarrior/services/deck.py @@ -1,6 +1,8 @@ +from collections.abc import Iterator import datetime import logging import typing +from typing import Any import requests @@ -31,7 +33,7 @@ class NextcloudDeckConfig(config.ServiceConfig): # * Cards will be mapped to tasks # * Labels will be mapped to tags class NextcloudDeckClient(Client): - def __init__(self, base_uri, username, password): + def __init__(self, base_uri: str, username: str, password: str) -> None: self.api_base_path = f'{base_uri}/index.php/apps/deck/api/v1.0' self.ocs_base_path = f'{base_uri}/ocs/v2.php/apps/deck/api/v1.0' @@ -42,17 +44,17 @@ def __init__(self, base_uri, username, password): ) # see https://deck.readthedocs.io/en/latest/API/#boards for API docs - def get_boards(self): + def get_boards(self) -> list[dict[str, Any]]: response = self.session.get(f'{self.api_base_path}/boards') return response.json() # see https://deck.readthedocs.io/en/latest/API/#stacks for API docs - def get_stacks(self, board_id): + def get_stacks(self, board_id: int) -> list[dict[str, Any]]: response = self.session.get(f'{self.api_base_path}/boards/{board_id}/stacks') return response.json() # see https://deck.readthedocs.io/en/latest/API/#comments for API docs - def get_comments(self, card_id): + def get_comments(self, card_id: int) -> dict[str, Any]: response = self.session.get( f'{self.ocs_base_path}/cards/{card_id}/comments?limit=100&offset=0' ) @@ -88,7 +90,7 @@ class NextcloudDeckIssue(Issue): PRIORITY_MAP = {} # FIXME - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.extra['board']['title'].lower().replace(' ', '_'), 'priority': self.get_priority(), @@ -114,22 +116,24 @@ def to_taskwarrior(self): ), } - def get_tags(self): + def get_tags(self) -> list[str]: return self.get_tags_from_labels( [label['title'] for label in self.record['labels']] ) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description(title=self.record['title']) -class NextcloudDeckService(Service): +class NextcloudDeckService(Service[NextcloudDeckIssue]): API_VERSION = 1.0 ISSUE_CLASS = NextcloudDeckIssue CONFIG_SCHEMA = NextcloudDeckConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: NextcloudDeckConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.client = NextcloudDeckClient( base_uri=self.config.base_uri, @@ -138,13 +142,16 @@ def __init__(self, *args, **kwargs): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: NextcloudDeckConfig) -> str: return f'deck://{config.username}@{config.base_uri}' - def get_owner(self, issue): - return issue[issue.ASSIGNEE] + def get_owner(self, issue: NextcloudDeckIssue) -> str | None: + rec = issue.record + if rec.get('assignedUsers'): + return rec['assignedUsers'][0]['participant']['uid'] + return None - def include(self, issue): + def include(self, issue: NextcloudDeckIssue) -> bool: """Return true if the issue in question should be included""" if self.config.only_if_assigned: owner = self.get_owner(issue) @@ -157,7 +164,7 @@ def include(self, issue): return True - def filter_boards(self, board): + def filter_boards(self, board: dict[str, Any]) -> bool: # include_board_ids takes precedence over exclude_board_ids if self.config.include_board_ids: return str(board['id']) in self.config.include_board_ids @@ -166,7 +173,7 @@ def filter_boards(self, board): # no filters defined: then it's included return True - def annotations(self, card): + def annotations(self, card: dict[str, Any]) -> list[str]: comments = ( self.client.get_comments(card['id'])['ocs']['data'] if self.main_config.annotation_comments @@ -176,7 +183,7 @@ def annotations(self, card): ((comment['actorDisplayName'], comment['message']) for comment in comments) ) - def issues(self): + def issues(self) -> Iterator[NextcloudDeckIssue]: for board in self.client.get_boards(): if self.filter_boards(board): for stack in self.client.get_stacks(board['id']): diff --git a/bugwarrior/services/gerrit.py b/bugwarrior/services/gerrit.py index a2a9b2b8..f81728f5 100644 --- a/bugwarrior/services/gerrit.py +++ b/bugwarrior/services/gerrit.py @@ -1,12 +1,14 @@ +from collections.abc import Iterator import json import logging import typing +from typing import Any import requests import requests.auth from bugwarrior import config -from bugwarrior.services import Client, Issue, Service +from bugwarrior.services import Issue, Service log = logging.getLogger(__name__) @@ -45,7 +47,7 @@ class GerritIssue(Issue): } UNIQUE_KEY = (URL,) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.record['project'], 'annotations': self.extra['annotations'], @@ -60,7 +62,7 @@ def to_taskwarrior(self): self.WORK_IN_PROGRESS: int(self.record.get('work_in_progress', 0)), } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['subject'], url=self.extra['url'], @@ -69,13 +71,15 @@ def get_default_description(self): ) -class GerritService(Service, Client): +class GerritService(Service[GerritIssue]): API_VERSION = 1.0 ISSUE_CLASS = GerritIssue CONFIG_SCHEMA = GerritConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: GerritConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.password = self.get_secret('password', self.config.username) self.session = requests.session() self.session.headers.update( @@ -99,10 +103,10 @@ def __init__(self, *args, **kw): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: GerritConfig) -> str: return f"gerrit://{config.base_uri}" - def issues(self): + def issues(self) -> Iterator[GerritIssue]: # Construct the whole url by hand here, because otherwise requests will # percent-encode the ':' characters, which gerrit doesn't like. url = self.config.base_uri + '/a/changes/?q=' + self.query_string @@ -119,10 +123,10 @@ def issues(self): } yield self.get_issue_for_record(change, extra) - def build_url(self, change): + def build_url(self, change: dict[str, Any]) -> str: return '%s/#/c/%i/' % (self.config.base_uri, change['_number']) - def annotations(self, change): + def annotations(self, change: dict[str, Any]) -> list[str]: entries = [] for item in change['messages']: for key in ['name', 'username', 'email']: diff --git a/bugwarrior/services/gitbug.py b/bugwarrior/services/gitbug.py index 513b61aa..608bc7a4 100644 --- a/bugwarrior/services/gitbug.py +++ b/bugwarrior/services/gitbug.py @@ -1,3 +1,4 @@ +from collections.abc import Iterator import logging import os import signal @@ -27,11 +28,11 @@ class GitBugConfig(config.ServiceConfig): class Webui: - def __init__(self, path, port): + def __init__(self, path: str, port: int) -> None: self.path = path self.port = port - def __enter__(self): + def __enter__(self) -> "Webui": popen_kwargs: dict[str, Any] = {} if sys.platform == "win32": popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP @@ -56,7 +57,7 @@ def __enter__(self): return self - def __exit__(self, *exc): + def __exit__(self, *exc: Any) -> Literal[False]: if self.webui.returncode is None: if sys.platform == "win32": os.kill(self.webui.pid, signal.SIGTERM) @@ -67,19 +68,19 @@ def __exit__(self, *exc): class GitBugClient(Client): - def __init__(self, path, port, annotation_comments): + def __init__(self, path: str, port: int, annotation_comments: bool) -> None: self.path = path self.port = port self.annotation_comments = annotation_comments - def _query_graphql(self, query): + def _query_graphql(self, query: str) -> dict[str, Any]: with Webui(self.path, self.port): response = requests.post( f'http://127.0.0.1:{self.port}/graphql', json={'query': query} ) return self.json_response(response)['data'] - def get_issues(self): + def get_issues(self) -> list[dict[str, Any]]: return self._query_graphql( '{ repository { allBugs { nodes { %s } } } }' % ' '.join( @@ -114,7 +115,7 @@ class GitBugIssue(Issue): UNIQUE_KEY = (ID,) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.config.target, 'priority': self.config.default_priority, @@ -127,22 +128,24 @@ def to_taskwarrior(self): self.TITLE: self.record['title'], } - def get_tags(self): + def get_tags(self) -> list[str]: return self.get_tags_from_labels( [label['name'] for label in self.record['labels']] ) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description(title=self.record['title'], cls='bug') -class GitBugService(Service): +class GitBugService(Service[GitBugIssue]): API_VERSION = 1.0 ISSUE_CLASS = GitBugIssue CONFIG_SCHEMA = GitBugConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: GitBugConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.client = GitBugClient( path=self.config.path, @@ -151,10 +154,10 @@ def __init__(self, *args, **kwargs): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: GitBugConfig) -> str: return f'gitbug://{config.path}' - def issues(self): + def issues(self) -> Iterator[GitBugIssue]: for issue in self.client.get_issues(): comments = issue.pop('comments') issue['description'] = comments['nodes'].pop(0)['message'] diff --git a/bugwarrior/services/github.py b/bugwarrior/services/github.py index fde9b8ac..fc589007 100644 --- a/bugwarrior/services/github.py +++ b/bugwarrior/services/github.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator import logging import re import sys import typing +from typing import Any import urllib.parse from pydantic import ValidationInfo, field_validator, model_validator @@ -12,6 +14,11 @@ log = logging.getLogger(__name__) +# (repo_name, issue_data) +GithubIssueEntry = tuple[str, dict[str, Any]] +# {issue_url: (repo_name, issue_data)} +GithubIssueMap = dict[str, GithubIssueEntry] + class GithubConfig(config.ServiceConfig): password: str = 'Deprecated' @@ -42,7 +49,7 @@ class GithubConfig(config.ServiceConfig): ignore_user_comments: config.ConfigList = [] @model_validator(mode='after') - def deprecate_password(self): + def deprecate_password(self) -> "GithubConfig": if self.password != 'Deprecated': log.warning( 'Basic auth is no longer supported. Please remove ' @@ -51,14 +58,16 @@ def deprecate_password(self): return self @model_validator(mode='after') - def require_username_or_query(self): + def require_username_or_query(self) -> "GithubConfig": if not self.username and not self.query: raise ValueError('section requires one of:\n username\n query') return self @field_validator('issue_urls', mode='after') @classmethod - def issue_urls_consistent_with_host(cls, value, info: ValidationInfo): + def issue_urls_consistent_with_host( + cls, value: list[str], info: ValidationInfo + ) -> list[str]: issue_url_paths = [] # host can be None if it raised a ValidationError (e.g. if it has a scheme) @@ -78,7 +87,7 @@ def issue_urls_consistent_with_host(cls, value, info: ValidationInfo): return issue_url_paths @model_validator(mode='after') - def require_username_if_include_user_repos(self): + def require_username_if_include_user_repos(self) -> "GithubConfig": if self.include_user_repos and not self.username: raise ValueError( 'username required when include_user_repos is True (default)' @@ -87,7 +96,7 @@ def require_username_if_include_user_repos(self): class GithubClient(Client): - def __init__(self, host, auth): + def __init__(self, host: str, auth: dict[str, Any]) -> None: self.host = host self.auth = auth self.session = requests.Session() @@ -99,7 +108,7 @@ def __init__(self, host, auth): if 'basic' in self.auth: self.kwargs['auth'] = self.auth['basic'] - def _api_url(self, path, **context): + def _api_url(self, path: str, **context: Any) -> str: """Build the full url to the API endpoint""" if self.host == 'github.com': baseurl = "https://api.github.com" @@ -107,25 +116,25 @@ def _api_url(self, path, **context): baseurl = f"https://{self.host}/api/v3" return baseurl + path.format(**context) - def get_repos(self, username): + def get_repos(self, username: str) -> list[Any]: user_repos = self._getter(self._api_url("/user/repos?per_page=100")) public_repos = self._getter( self._api_url("/users/{username}/repos?per_page=100", username=username) ) return user_repos + public_repos - def get_query(self, query): + def get_query(self, query: str) -> list[Any]: """Run a generic issue/PR query""" url = self._api_url("/search/issues?q={query}&per_page=100", query=query) return self._getter(url, subkey='items') - def get_issues(self, username, repo): + def get_issues(self, username: str, repo: str) -> list[Any]: url = self._api_url( "/repos/{username}/{repo}/issues?per_page=100", username=username, repo=repo ) return self._getter(url) - def get_directly_assigned_issues(self): + def get_directly_assigned_issues(self) -> list[Any]: """Returns all issues assigned to authenticated user. List issues assigned to the authenticated user across all visible @@ -135,13 +144,13 @@ def get_directly_assigned_issues(self): url = self._api_url("/issues?per_page=100") return self._getter(url) - def get_issue_for_url_path(self, url_path): + def get_issue_for_url_path(self, url_path: str) -> dict[str, Any]: # The pull request url is '/pull/' but the api path is '/pulls/'. api_path = re.sub(r'pull(?=/[0-9]*$)', 'pulls', url_path) url = self._api_url(f'/repos{api_path}') return self.json_response(self._request(url)) - def get_comments(self, username, repo, number): + def get_comments(self, username: str, repo: str, number: int) -> list[Any]: url = self._api_url( "/repos/{username}/{repo}/issues/{number}/comments?per_page=100", username=username, @@ -150,13 +159,13 @@ def get_comments(self, username, repo, number): ) return self._getter(url) - def get_pulls(self, username, repo): + def get_pulls(self, username: str, repo: str) -> list[Any]: url = self._api_url( "/repos/{username}/{repo}/pulls?per_page=100", username=username, repo=repo ) return self._getter(url) - def _getter(self, url, subkey=None): + def _getter(self, url: str, subkey: str | None = None) -> list[Any]: """Pagination utility. Obnoxious.""" results = [] link = dict(next=url) @@ -174,7 +183,7 @@ def _getter(self, url, subkey=None): return results - def _request(self, url): + def _request(self, url: str) -> requests.Response: response = self.session.get(url, **self.kwargs) # Warn about the mis-leading 404 error code. See: @@ -190,7 +199,7 @@ def _request(self, url): return response @staticmethod - def _link_field_to_dict(field): + def _link_field_to_dict(field: str | None) -> dict[str, str]: """Utility for ripping apart github's Link header field. It's kind of ugly. """ @@ -238,7 +247,7 @@ class GithubIssue(Issue): } UNIQUE_KEY = (URL, TYPE) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: milestone = self.record['milestone'] if milestone: milestone = milestone['title'] @@ -270,11 +279,11 @@ def to_taskwarrior(self): self.DRAFT: int(self.record.get('draft', 0)), } - def get_tags(self): + def get_tags(self) -> list[str]: labels = [label['name'] for label in self.record.get('labels', [])] return self.get_tags_from_labels(labels) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['title'], url=self.record['html_url'], @@ -283,29 +292,31 @@ def get_default_description(self): ) -class GithubService(Service): +class GithubService(Service[GithubIssue]): API_VERSION = 1.0 ISSUE_CLASS = GithubIssue CONFIG_SCHEMA = GithubConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: GithubConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) auth = {'token': self.get_secret('token', self.config.login)} self.client = GithubClient(self.config.host, auth) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: GithubConfig) -> str: return f"github://{config.login}@{config.host}/{config.username}" - def get_owned_repo_issues(self, tag): + def get_owned_repo_issues(self, tag: str) -> GithubIssueMap: """Grab all the issues""" issues = {} for issue in self.client.get_issues(*tag.split('/')): issues[issue['url']] = (tag, issue) return issues - def get_query(self, query): + def get_query(self, query: str) -> GithubIssueMap: """Grab all issues matching a github query""" issues = {} for issue in self.client.get_query(query): @@ -318,14 +329,14 @@ def get_query(self, query): issues[url] = (repo, issue) return issues - def get_directly_assigned_issues(self): + def get_directly_assigned_issues(self) -> GithubIssueMap: issues = {} for issue in self.client.get_directly_assigned_issues(): repo = self.get_repository_from_issue(issue) issues[issue['url']] = (repo, issue) return issues - def get_issues_by_url(self): + def get_issues_by_url(self) -> GithubIssueMap: issues = {} for url_path in self.config.issue_urls: issue = self.client.get_issue_for_url_path(url_path) @@ -336,7 +347,7 @@ def get_issues_by_url(self): return issues @classmethod - def get_repository_from_issue(cls, issue): + def get_repository_from_issue(cls, issue: dict[str, Any]) -> str: if 'repo' in issue: return issue['repo'] if 'repos_url' in issue: @@ -350,11 +361,11 @@ def get_repository_from_issue(cls, issue): raise ValueError(f"Unrecognized URL: {url}.") return tag.group(1) - def _comments(self, tag, number): + def _comments(self, tag: str, number: int) -> list[Any]: user, repo = tag.split('/') return self.client.get_comments(user, repo, number) - def annotations(self, tag, issue): + def annotations(self, tag: str, issue: dict[str, Any]) -> list[str]: url = issue['html_url'] annotations = [] if self.main_config.annotation_comments: @@ -369,7 +380,7 @@ def annotations(self, tag, issue): annotations.append((login, c['body'])) return self.build_annotations(annotations, url) - def body(self, issue): + def body(self, issue: dict[str, Any]) -> str | None: body = issue['body'] if body: @@ -379,25 +390,25 @@ def body(self, issue): return body - def _reqs(self, tag): + def _reqs(self, tag: str) -> list[tuple[str, Any]]: """Grab all the pull requests""" return [(tag, i) for i in self.client.get_pulls(*tag.split('/'))] - def get_owner(self, issue): + def get_owner(self, issue: GithubIssueEntry) -> str | None: if issue[1]['assignee']: return issue[1]['assignee']['login'] - def filter_issues(self, issue): + def filter_issues(self, issue: Any) -> bool: repo, _ = issue return self.filter_repo_name(repo.split('/')[-3]) - def filter_repos(self, repo): + def filter_repos(self, repo: dict[str, Any]) -> bool: if repo['owner']['login'] != self.config.username: return False return self.filter_repo_name(repo['name']) - def filter_repo_name(self, name): + def filter_repo_name(self, name: str) -> bool: if name in self.config.exclude_repos: return False @@ -409,7 +420,7 @@ def filter_repo_name(self, name): return True - def include(self, issue): + def include(self, issue: GithubIssueEntry) -> bool: """Return true if the issue in question should be included""" if 'pull_request' in issue[1]: if self.config.exclude_pull_requests: @@ -428,7 +439,7 @@ def include(self, issue): return True - def issues(self): + def issues(self) -> Iterator[GithubIssue]: issues = {} if self.config.query: issues.update(self.get_query(self.config.query)) diff --git a/bugwarrior/services/gitlab.py b/bugwarrior/services/gitlab.py index 51a21924..3f5fbd49 100644 --- a/bugwarrior/services/gitlab.py +++ b/bugwarrior/services/gitlab.py @@ -1,6 +1,8 @@ +from collections.abc import Callable, Iterator import logging import sys import typing +from typing import Any from urllib.parse import quote, urlencode from pydantic import ValidationInfo, field_validator, model_validator @@ -14,6 +16,13 @@ DefaultPriority = typing.Literal['', 'L', 'M', 'H', 'unassigned'] +# (project_id, issue_data) +GitlabIssueEntry = tuple[int, dict[str, Any]] +# {issue_id: (project_id, issue_data)} +GitlabIssueMap = dict[int, GitlabIssueEntry] +# (project_data | None, todo_data) +GitlabTodoEntry = tuple[dict[str, Any] | None, dict[str, Any]] + class GitlabConfig(config.ServiceConfig): _DEPRECATE_FILTER_MERGE_REQUESTS = True @@ -80,7 +89,7 @@ def default_priorities(cls, value: str, info: ValidationInfo) -> str: return value @model_validator(mode='after') - def filter_gitlab_dot_com(self): + def filter_gitlab_dot_com(self) -> "GitlabConfig": """ There must be a repository filter if the host is gitlab.com. @@ -107,7 +116,7 @@ def filter_gitlab_dot_com(self): @field_validator('owned', mode='before') @classmethod - def require_owned(cls, v): + def require_owned(cls, v: bool | None) -> bool: """ Migrate 'owned' field from default False to default True. @@ -129,8 +138,14 @@ class GitlabClient(Client): """Abstraction of Gitlab API v4""" def __init__( - self, host, token, only_if_assigned, also_unassigned, use_https, verify_ssl - ): + self, + host: str, + token: str, + only_if_assigned: str, + also_unassigned: bool, + use_https: bool, + verify_ssl: bool, + ) -> None: if use_https: self.scheme = 'https' else: @@ -162,10 +177,10 @@ def __init__( self.assignee_query = f'assignee_id={assignee_id}' if assignee_id else '' - def _base_url(self): + def _base_url(self) -> str: return f"{self.scheme}://{self.host}/api/v4/" - def _fetch(self, relative_url: str, skip_403: bool = False, **kwargs) -> dict: + def _fetch(self, relative_url: str, skip_403: bool = False, **kwargs: Any) -> Any: """Perform a fetch operation on the gitlab server :param relative_url: This part will be appended to the base api URL for the call @@ -227,7 +242,7 @@ def _fetch_paged( return full def get_repos( - self, include_repos: list, only_membership: bool, only_owned: bool + self, include_repos: list[str], only_membership: bool, only_owned: bool | None ) -> list: """Returns a list of repo objects for all repositories accessible. Respects config.include_repos. @@ -243,7 +258,7 @@ def get_repos( :rtype: list """ - all_repos = [] + all_repos: list = [] if include_repos: for repo in include_repos: if repo.startswith("id:"): @@ -266,7 +281,7 @@ def get_repos( self.repo_cache[item['id']] = item return all_repos - def _get_repo(self, repo_id: int) -> dict: + def _get_repo(self, repo_id: int) -> dict[str, Any]: """Queries information about a single repository as JSON dictionary :param repo_id: Project ID in the Gitlab instance @@ -275,7 +290,7 @@ def _get_repo(self, repo_id: int) -> dict: """ return self._fetch('projects/' + str(repo_id)) - def get_repo_cached(self, repo_id: int) -> dict: + def get_repo_cached(self, repo_id: int) -> dict[str, Any]: """Get repo information with a repo cache. Repo information will only be fetched the first time information about a certain repository is fetched. @@ -301,7 +316,7 @@ def get_notes(self, rid: int, issue_type: str, issueid: int) -> list: """ return self._fetch_paged(f'projects/{rid}/{issue_type}/{issueid}/notes') - def get_repo_issues(self, rid: int) -> dict: + def get_repo_issues(self, rid: int) -> GitlabIssueMap: """Get all issues from a repository as JSON dictionary :param rid: Project ID in the Gitlab instance @@ -312,7 +327,7 @@ def get_repo_issues(self, rid: int) -> dict: f'projects/{rid}/issues?state=opened&{self.assignee_query}' ) - def get_repo_merge_requests(self, rid: int) -> dict: + def get_repo_merge_requests(self, rid: int) -> GitlabIssueMap: """Get all merge_requests from a repository as JSON dictionary :param rid: Project ID in the Gitlab instance @@ -324,7 +339,9 @@ def get_repo_merge_requests(self, rid: int) -> dict: skip_403=True, ) - def get_issues_from_query(self, query: str, skip_403: bool = False) -> dict: + def get_issues_from_query( + self, query: str, skip_403: bool = False + ) -> GitlabIssueMap: """Get objects matching a query. Results will be returned in a dictionary where the key matches their project ID. @@ -332,20 +349,20 @@ def get_issues_from_query(self, query: str, skip_403: bool = False) -> dict: :type query: str :rtype: dict """ - issues = {} + issues: GitlabIssueMap = {} result = self._fetch_paged(query, skip_403=skip_403) for issue in result: issues[issue['id']] = (issue['project_id'], issue) return issues - def get_todos(self, query: str) -> list: + def get_todos(self, query: str) -> list[GitlabTodoEntry]: """Get all todo objects matching a query returned as list of (project_id, todo) tuples :param query: API query string that should get sent to the server :type query: str :rtype: list """ - todos = [] + todos: list[GitlabTodoEntry] = [] fetched_todos = self._fetch_paged(query) for todo in fetched_todos: todos.append((todo.get('project'), todo)) @@ -398,7 +415,7 @@ class GitlabIssue(Issue): UNIQUE_KEY = (REPO, TYPE, NUMBER) # Override the method from parent class - def get_priority(self): + def get_priority(self) -> config.Priority: default_priority_map = { 'todo': self.config.default_todo_priority, 'merge_request': self.config.default_mr_priority, @@ -410,7 +427,7 @@ def get_priority(self): return default_priority_map.get(type_str, default_priority) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: author = self.record['author'] milestone = self.record.get('milestone') created = self.record['created_at'] @@ -487,10 +504,10 @@ def to_taskwarrior(self): self.WEIGHT: weight, } - def get_tags(self): + def get_tags(self) -> list[str]: return self.get_tags_from_labels(self.record.get('labels', [])) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.title, url=self.extra['issue_url'], @@ -499,13 +516,15 @@ def get_default_description(self): ) -class GitlabService(Service): +class GitlabService(Service[GitlabIssue]): API_VERSION = 1.0 ISSUE_CLASS = GitlabIssue CONFIG_SCHEMA = GitlabConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: GitlabConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) token = self.get_secret('token', self.config.login) self.gitlab_client = GitlabClient( @@ -516,20 +535,20 @@ def __init__(self, *args, **kw): use_https=self.config.use_https, verify_ssl=self.config.verify_ssl, ) - self.repo_map = dict() + self.repo_map: dict[int, dict[str, Any]] = {} @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: GitlabConfig) -> str: return f"gitlab://{config.login}@{config.host}" - def get_owner(self, issue): + def get_owner(self, issue: GitlabIssueEntry) -> list[str]: return [assignee['username'] for assignee in issue[1]['assignees']] - def get_author(self, issue): + def get_author(self, issue: GitlabIssueEntry) -> str | None: if issue[1]['author'] is not None and issue[1]['author']['username']: return issue[1]['author']['username'] - def filter_repos(self, repo): + def filter_repos(self, repo: dict[str, Any]) -> bool: if ( repo['path_with_namespace'] in self.config.exclude_repos or "id:%d" % repo['id'] in self.config.exclude_repos @@ -560,7 +579,9 @@ def filter_repos(self, repo): return is_included - def annotations(self, repo, url, issue_type, issue): + def annotations( + self, repo: dict[str, Any], url: str, issue_type: str, issue: dict[str, Any] + ) -> list[str]: annotations = [] if self.main_config.annotation_comments: @@ -569,16 +590,20 @@ def annotations(self, repo, url, issue_type, issue): return self.build_annotations(annotations, url) - def include_todo(self, repos): + def include_todo( + self, repos: list[dict[str, Any]] + ) -> Callable[[GitlabTodoEntry], bool]: ids = list(r['id'] for r in repos) - def include_todo(todo): - project, todo = todo + def include_todo(item: GitlabTodoEntry) -> bool: + project, _todo = item return project is None or project['id'] in ids return include_todo - def _get_issue_objs(self, issues, issue_type): + def _get_issue_objs( + self, issues: list[GitlabIssueEntry], issue_type: str + ) -> Iterator[GitlabIssue]: type_plural = issue_type + 's' for rid, issue in issues: @@ -600,7 +625,7 @@ def _get_issue_objs(self, issues, issue_type): issue_obj.extra.update(extra) yield issue_obj - def _get_todo_objs(self, todos): + def _get_todo_objs(self, todos: list[GitlabTodoEntry]) -> Iterator[GitlabIssue]: for project, todo in todos: todo['repo'] = project['path'] if project is not None else 'the instance' @@ -619,7 +644,7 @@ def _get_todo_objs(self, todos): todo_obj.extra.update(extra) yield todo_obj - def include(self, issue): + def include(self, issue: GitlabIssueEntry) -> bool: """Return true if the issue in question should be included""" if not self.filter_repos(self.gitlab_client.get_repo_cached(issue[0])): return False @@ -637,15 +662,15 @@ def include(self, issue): return True - def get_issues_from_projects(self, repos): - issues = {} + def get_issues_from_projects(self, repos: list[dict[str, Any]]) -> GitlabIssueMap: + issues: GitlabIssueMap = {} for repo in repos: rid = repo['id'] self.repo_map[rid] = repo issues.update(self.gitlab_client.get_repo_issues(rid)) return issues - def get_all_repos(self): + def get_all_repos(self) -> list: include_repos = list() if not self.config.include_regex: include_repos = self.config.include_repos @@ -657,7 +682,7 @@ def get_all_repos(self): repos = list(filter(self.filter_repos, all_repos)) return repos - def description(self, issue): + def description(self, issue: dict[str, Any]) -> str | None: description = issue['description'] if description: @@ -666,9 +691,9 @@ def description(self, issue): return description - def issues(self): + def issues(self) -> Iterator[GitlabIssue]: # List of repos will only be queried if needed - repos = [] + repos: list = [] # Issues if self.config.include_issues: @@ -695,7 +720,7 @@ def issues(self): else: if not repos: repos = self.get_all_repos() - merge_requests = {} + merge_requests: GitlabIssueMap = {} for repo in repos: rid = repo['id'] merge_requests.update( diff --git a/bugwarrior/services/gmail.py b/bugwarrior/services/gmail.py index 38e2456b..e67ab305 100644 --- a/bugwarrior/services/gmail.py +++ b/bugwarrior/services/gmail.py @@ -1,3 +1,4 @@ +from collections.abc import Iterator from datetime import datetime, timezone import email import email.utils @@ -8,8 +9,10 @@ import pickle import re import typing +from typing import Any from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow import googleapiclient.discovery @@ -61,7 +64,7 @@ class GmailIssue(Issue): 'SENT', ] - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'annotations': self.get_annotations(), 'entry': self.get_entry(), @@ -81,7 +84,7 @@ def to_taskwarrior(self): self.LABELS: " ".join(sorted(self.extra['labels'])), } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.extra['subject'], url=self.extra['url'], @@ -89,16 +92,16 @@ def get_default_description(self): cls='issue', ) - def get_annotations(self): + def get_annotations(self) -> list[str]: return self.extra.get('annotations', []) - def get_entry(self): + def get_entry(self) -> datetime: # internal_date is in milliseconds, convert to seconds and create UTC datetime timestamp_seconds = int(self.extra['internal_date']) / 1000 return datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc) -class GmailService(Service): +class GmailService(Service[GmailIssue]): APPLICATION_NAME = 'Bugwarrior Gmail Service' SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] @@ -107,8 +110,10 @@ class GmailService(Service): CONFIG_SCHEMA = GmailConfig AUTHENTICATION_LOCK = multiprocessing.Lock() - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: GmailConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) credentials_name = clean_filename( self.config.login_name @@ -122,16 +127,16 @@ def __init__(self, *args, **kw): self.gmail_api = self.build_api() @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: GmailConfig) -> str: return f'gmail://{config.login_name}' - def build_api(self): + def build_api(self) -> googleapiclient.discovery.Resource: credentials = self.get_credentials() return googleapiclient.discovery.build( 'gmail', 'v1', credentials=credentials, cache_discovery=False ) - def get_credentials(self): + def get_credentials(self) -> Credentials: """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, @@ -167,17 +172,17 @@ def get_credentials(self): log.info('Storing credentials to %r', self.credentials_path) return credentials - def get_labels(self): + def get_labels(self) -> dict[str, str]: result = ( - self.gmail_api.users() + self.gmail_api.users() # ty: ignore[unresolved-attribute] .labels() .list(userId=self.config.login_name) .execute() ) return {label['id']: label['name'] for label in result['labels']} - def get_threads(self): - thread_service = self.gmail_api.users().threads() + def get_threads(self) -> list[dict[str, Any]]: + thread_service = self.gmail_api.users().threads() # ty: ignore[unresolved-attribute] threads = [] pageToken = None @@ -203,13 +208,13 @@ def get_threads(self): return threads - def annotations(self, issue): + def annotations(self, issue: GmailIssue) -> list[str]: sender = issue.extra['last_sender_name'] subj = issue.extra['subject'] issue_url = issue.extra['url'] return self.build_annotations([(sender, subj)], issue_url) - def issues(self): + def issues(self) -> Iterator[GmailIssue]: labels = self.get_labels() for thread in self.get_threads(): issue = self.get_issue_for_record(thread, thread_extras(thread, labels)) @@ -218,7 +223,7 @@ def issues(self): yield issue -def thread_extras(thread, labels): +def thread_extras(thread: dict[str, Any], labels: dict[str, str]) -> dict[str, Any]: name, address = thread_last_sender(thread) last_message_id = thread_last_message_id(thread) return { @@ -233,44 +238,44 @@ def thread_extras(thread, labels): } -def thread_labels(thread): +def thread_labels(thread: dict[str, Any]) -> set[str]: return {label for message in thread['messages'] for label in message['labelIds']} -def thread_subject(thread): +def thread_subject(thread: dict[str, Any]) -> str | None: return message_header(thread['messages'][0], 'Subject') -def thread_last_sender(thread): +def thread_last_sender(thread: dict[str, Any]) -> tuple[str, str]: from_header = message_header(thread['messages'][-1], 'From') - name, address = email.utils.parseaddr(from_header) + name, address = email.utils.parseaddr(from_header or "") return name if name else address, address -def thread_last_message_id(thread): +def thread_last_message_id(thread: dict[str, Any]) -> str: message_id_header = message_header(thread['messages'][-1], 'Message-ID') if not message_id_header or message_id_header == '': return '' return message_id_header[1:-1] # remove the enclosing < >. -def thread_timestamp(thread): +def thread_timestamp(thread: dict[str, Any]) -> str: return thread['messages'][-1]['internalDate'] -def thread_snippet(thread): +def thread_snippet(thread: dict[str, Any]) -> str: return thread['messages'][-1]['snippet'] -def thread_url(thread): +def thread_url(thread: dict[str, Any]) -> str: return "https://mail.google.com/mail/u/0/#all/%s" % (thread['id'],) -def message_header(message, header_name): +def message_header(message: dict[str, Any], header_name: str) -> str | None: for item in message['payload']['headers']: if item['name'] == header_name: return item['value'] -def clean_filename(name): +def clean_filename(name: str) -> str: return re.sub(r'[^A-Za-z0-9_]+', '_', name) diff --git a/bugwarrior/services/jira.py b/bugwarrior/services/jira.py index 416b29b6..77057003 100644 --- a/bugwarrior/services/jira.py +++ b/bugwarrior/services/jira.py @@ -1,8 +1,11 @@ +from collections.abc import Iterator import dataclasses +import datetime from functools import reduce import logging import sys import typing +from typing import Any from jira.client import JIRA as BaseJIRA from jira.exceptions import JIRAError @@ -16,18 +19,18 @@ class ExtraFieldConfigError(Exception): - def __init__(self, extra_field_raw): + def __init__(self, extra_field_raw: str) -> None: self.message = f'Extra field is improperly defined: {extra_field_raw}' super().__init__(self.message) class ExtraFieldNotFoundError(Exception): - def __init__(self, label, query): + def __init__(self, label: str, query: str) -> None: self.message = f'Extra field {label}:{query} not found among Jira issue fields.' super().__init__(self.message) -def parse_jira_extra_fields(extra_fields_raw) -> "list[JiraExtraField] | None": +def parse_jira_extra_fields(extra_fields_raw: Any) -> "list[JiraExtraField] | None": if extra_fields_raw is None: return None try: # ini @@ -55,7 +58,7 @@ class JiraExtraField: label: str keys: list[str] - def extract_value(self, fields): + def extract_value(self, fields: dict[str, Any]) -> Any: """Extract a field value from a dictionary of Jira issue fields.""" try: @@ -95,7 +98,7 @@ class JiraConfig(config.ServiceConfig): also_unassigned: config.UnsupportedOption[bool] = False @model_validator(mode='after') - def require_password_xor_PAT(self): + def require_password_xor_PAT(self) -> "JiraConfig": if (self.password and self.PAT) or not (self.password or self.PAT): raise ValueError( 'section requires one of (not both):\n password\n PAT' @@ -108,17 +111,17 @@ def require_password_xor_PAT(self): # https://github.com/GaretJax/lancet/commit/f175cb2ec9a2135fb78188cf0b9f621b51d88977 # Prevents Jira web client being logged out when API call is made. class ObliviousCookieJar(RequestsCookieJar): - def set_cookie(self, *args, **kwargs): + def set_cookie(self, *args: Any, **kwargs: Any) -> None: """Simply ignore any request to set a cookie.""" pass - def copy(self): + def copy(self) -> "ObliviousCookieJar": """Make sure to return an instance of the correct class on copying.""" return ObliviousCookieJar() class JIRA(BaseJIRA): - def _create_http_basic_session(self, *args, **kwargs): + def _create_http_basic_session(self, *args: Any, **kwargs: Any) -> None: super()._create_http_basic_session(*args, **kwargs) # XXX: JIRA logs the web user out if we send the session cookies we get @@ -127,14 +130,14 @@ def _create_http_basic_session(self, *args, **kwargs): assert self._session is not None self._session.cookies = ObliviousCookieJar() - def close(self): + def close(self) -> None: # this is called in a destructor, which may occur before the session # has been created, so be resilient to a missing session if (session := getattr(self, "_session", None)) is not None: session.close() -def _parse_sprint_string(sprint): +def _parse_sprint_string(sprint: str) -> dict[str, str]: """Parse the big ugly sprint string stored by JIRA. They look like: @@ -175,7 +178,7 @@ class JiraIssue(Issue): } UNIQUE_KEY = (URL,) - PRIORITY_MAP = { + PRIORITY_MAP: dict[str, config.Priority] = { 'Highest': 'H', 'High': 'H', 'Medium': 'M', @@ -188,7 +191,7 @@ class JiraIssue(Issue): 'Blocker': 'H', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: fixed_fields = { 'project': self.get_project(), 'priority': self.get_priority(), @@ -212,7 +215,7 @@ def to_taskwarrior(self): return {**fixed_fields, **extra_fields} - def get_extra_fields(self): + def get_extra_fields(self) -> dict[str, Any]: if self.config.extra_fields is None: return {} @@ -221,13 +224,13 @@ def get_extra_fields(self): for extra_field in self.config.extra_fields } - def get_entry(self): + def get_entry(self) -> datetime.datetime | None: created_at = self.record['fields']['created'] # Convert timestamp to an offset-aware datetime date = self.parse_date(created_at) return date - def get_tags(self): + def get_tags(self) -> list[str]: labels = self.record.get('fields', {}).get('labels', []) label_tags = self.get_tags_from_labels(labels) @@ -238,7 +241,7 @@ def get_tags(self): return label_tags + sprint_tags - def get_due(self): + def get_due(self) -> datetime.datetime | None: # If the duedate is explicitly set on the issue, then use that. if self.record['fields'].get('duedate'): return self.parse_date(self.record['fields']['duedate']) @@ -249,7 +252,7 @@ def get_due(self): if endDate != '': return self.parse_date(endDate) - def __get_sprints(self): + def __get_sprints(self) -> Iterator[dict[str, Any]]: fields = self.record.get('fields', {}) sprints = sum( (fields.get(key) or [] for key in self.extra['sprint_field_names']), [] @@ -263,24 +266,24 @@ def __get_sprints(self): # string yield _parse_sprint_string(sprint) - def get_annotations(self): + def get_annotations(self) -> list[str]: return self.extra.get('annotations', []) - def get_project(self): + def get_project(self) -> str: return self.record['key'].rsplit('-', 1)[0] - def get_number(self): + def get_number(self) -> str: return self.record['key'].rsplit('-', 1)[1] - def get_url(self): + def get_url(self) -> str: return self.config.base_uri + '/browse/' + self.record['key'] - def get_summary(self): + def get_summary(self) -> str: if self.config.version == 4: return self.record['fields']['summary']['value'] return self.record['fields']['summary'] - def get_estimate(self): + def get_estimate(self) -> float | None: if self.config.version == 4: return self.record['fields']['timeestimate']['value'] try: @@ -288,7 +291,7 @@ def get_estimate(self): except (TypeError, KeyError): return None - def get_priority(self): + def get_priority(self) -> config.Priority: value = self.record['fields'].get('priority') try: value = value['name'] @@ -298,7 +301,7 @@ def get_priority(self): map_key = value.strip().split()[-1] return self.PRIORITY_MAP.get(map_key, self.config.default_priority) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.get_summary(), url=self.get_url(), @@ -306,21 +309,21 @@ def get_default_description(self): cls='issue', ) - def get_fix_version(self): + def get_fix_version(self) -> str | None: try: return self.record['fields'].get('fixVersions', [{}])[0].get('name') except (IndexError, KeyError, AttributeError, TypeError): return None - def get_status(self): + def get_status(self) -> str: return self.record['fields']['status']['name'] - def get_subtasks(self): + def get_subtasks(self) -> str: return ','.join( task['key'] for task in self.record['fields'].get('subtasks', []) ) - def get_parent(self): + def get_parent(self) -> str | None: try: parent = self.record['fields']['parent']['key'] except (KeyError,): @@ -328,18 +331,23 @@ def get_parent(self): return parent - def get_issue_type(self): + def get_issue_type(self) -> str: return self.record['fields']['issuetype']['name'] -class JiraService(Service): +class JiraService(Service[JiraIssue]): API_VERSION = 1.0 ISSUE_CLASS = JiraIssue CONFIG_SCHEMA = JiraConfig - def __init__(self, *args, **kw): - _skip_server = kw.pop('_skip_server', False) - super().__init__(*args, **kw) + def __init__( + self, + config: JiraConfig, + main_config: config.MainSectionConfig, + *, + _skip_server: bool = False, + ) -> None: + super().__init__(config, main_config) default_query = ( 'assignee="' @@ -381,10 +389,10 @@ def _build_jira_client(self) -> JIRA: return JIRA(options=jira_options, basic_auth=(self.config.username, password)) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: JiraConfig) -> str: return f"jira://{config.username}@{config.base_uri}" - def body(self, issue): + def body(self, issue: JiraIssue) -> str | None: body = issue.record.get('fields', {}).get('description') if body: @@ -392,14 +400,14 @@ def body(self, issue): return body - def annotations(self, issue, issue_obj): + def annotations(self, issue: Any, issue_obj: JiraIssue) -> list[str]: comments = self.jira.comments(issue.key) or [] return self.build_annotations( ((comment.author.displayName, comment.body) for comment in comments), issue_obj.get_url(), ) - def issues(self): + def issues(self) -> Iterator[JiraIssue]: try: cases = self.jira.search_issues(self.query, maxResults=False) except JIRAError: # Jira Cloud @@ -409,8 +417,8 @@ def issues(self): issue = self.get_issue_for_record( case.raw, extra={'sprint_field_names': self.sprint_field_names} ) - extra = {'body': self.body(issue)} + extra: dict[str, Any] = {'body': self.body(issue)} if self.config.version > 4: - extra.update({'annotations': self.annotations(case, issue)}) + extra['annotations'] = self.annotations(case, issue) issue.extra.update(extra) yield issue diff --git a/bugwarrior/services/kanboard.py b/bugwarrior/services/kanboard.py index e4975c00..ac93e04a 100644 --- a/bugwarrior/services/kanboard.py +++ b/bugwarrior/services/kanboard.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator import datetime import logging import re import typing +from typing import Any from urllib.parse import urlparse from kanboard import Client @@ -42,9 +44,14 @@ class KanboardIssue(Issue): } UNIQUE_KEY = (TASK_ID,) - PRIORITY_MAP = {"0": None, "1": "L", "2": "M", "3": "H"} + PRIORITY_MAP: dict[str, config.Priority | None] = { + "0": None, + "1": "L", + "2": "M", + "3": "H", + } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { "project": self.get_project(), "priority": self.get_priority(), @@ -60,59 +67,61 @@ def to_taskwarrior(self): self.URL: self.get_url(), } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.get_task_title(), url=self.get_url(), number=self.get_task_id() ) - def get_task_id(self): + def get_task_id(self) -> int: return int(self.record["id"]) - def get_task_title(self): + def get_task_title(self) -> str: return self.record["title"] - def get_task_description(self): + def get_task_description(self) -> str: return self.record["description"] - def get_project_id(self): + def get_project_id(self) -> int: return int(self.record["project_id"]) - def get_project_name(self): + def get_project_name(self) -> str: return self.record["project_name"] - def get_project(self): + def get_project(self) -> str: value = self.get_project_name() value = re.sub(r"[^a-zA-Z0-9]", "_", value) return value.strip("_") - def get_url(self): + def get_url(self) -> str: return self.extra["url"] - def get_tags(self): + def get_tags(self) -> list[str]: return self.extra.get("tags", []) - def get_due(self): + def get_due(self) -> datetime.datetime | None: return self._convert_timestamp_from_field("date_due") - def get_entry(self): + def get_entry(self) -> datetime.datetime | None: return self._convert_timestamp_from_field("date_creation") - def get_annotations(self): + def get_annotations(self) -> list[str]: return self.extra.get("annotations", []) - def _convert_timestamp_from_field(self, field): + def _convert_timestamp_from_field(self, field: str) -> datetime.datetime | None: timestamp = int(self.record.get(field, 0)) if timestamp: return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) -class KanboardService(Service): +class KanboardService(Service[KanboardIssue]): API_VERSION = 1.0 ISSUE_CLASS = KanboardIssue CONFIG_SCHEMA = KanboardConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: KanboardConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) password = self.get_secret("password", self.config.username) self.client = Client( f"{self.config.url}/jsonrpc.php", self.config.username, password @@ -120,7 +129,7 @@ def __init__(self, *args, **kw): default_query = f"status:open assignee:{self.config.username}" self.query = self.config.query or default_query - def annotations(self, task, url): + def annotations(self, task: dict[str, Any], url: str) -> list[str]: comments = [] if int(task.get("nb_comments", 0)): comments = self.client.get_all_comments(**{"task_id": task["id"]}) @@ -128,7 +137,7 @@ def annotations(self, task, url): ((c["name"], c["comment"]) for c in comments), url ) - def issues(self): + def issues(self) -> Iterator[KanboardIssue]: # The API provides only a per-project search. Retrieve the list of # projects first and query each project in turn. projects = self.client.get_my_projects_list() @@ -162,6 +171,6 @@ def issues(self): yield self.get_issue_for_record(task, extra) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: KanboardConfig) -> str: parsed = urlparse(config.url) return f"kanboard://{config.username}@{parsed.netloc}" diff --git a/bugwarrior/services/linear.py b/bugwarrior/services/linear.py index fc1eae72..711d3fc8 100644 --- a/bugwarrior/services/linear.py +++ b/bugwarrior/services/linear.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator import json import logging import re import typing +from typing import Any from pydantic import model_validator import requests @@ -25,7 +27,7 @@ class LinearConfig(config.ServiceConfig): @model_validator(mode='before') @classmethod - def statuses_or_status_types(cls, values): + def statuses_or_status_types(cls, values: Any) -> dict[str, Any]: statuses = values.get("statuses") status_types = values.get("status_types") if statuses and status_types: @@ -66,9 +68,9 @@ class LinearIssue(Issue): # Linear exposes issue priority as an integer: # 0 = No priority, 1 = Urgent, 2 = High, 3 = Medium, 4 = Low. - PRIORITY_MAP = {1: "H", 2: "H", 3: "M", 4: "L"} + PRIORITY_MAP: dict[int, config.Priority] = {1: "H", 2: "H", 3: "M", 4: "L"} - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: description = self.record.get("description") created = self.parse_date(self.record.get("createdAt")) modified = self.parse_date(self.record.get("updatedAt")) @@ -79,11 +81,8 @@ def to_taskwarrior(self): # GraphQL response values, such as for `project`, are either an object # or None, rather than being omitted when empty, so this allows chained # traversal of such values. - def get(v, k, default=None): - r = v.get(k, default) - if not r: - return default - return r + def get(v: Any, k: str, default: Any = None) -> Any: + return v.get(k, default) or default return { "project": ( @@ -112,13 +111,13 @@ def get(v, k, default=None): self.CLOSED_AT: closed, } - def get_tags(self): + def get_tags(self) -> list[str]: labels = [ label["name"] for label in self.record.get("labels", {}).get("nodes", []) ] return self.get_tags_from_labels(labels) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record.get("title", ""), url=self.record.get("url", ""), @@ -127,13 +126,15 @@ def get_default_description(self): ) -class LinearService(Service, Client): +class LinearService(Service[LinearIssue]): API_VERSION = 1.0 ISSUE_CLASS = LinearIssue CONFIG_SCHEMA = LinearConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: LinearConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.session = requests.Session() self.session.headers.update( @@ -143,7 +144,7 @@ def __init__(self, *args, **kwargs): } ) - self.filter = [] + self.filter: list[dict[str, Any]] = [] if self.config.only_if_assigned: self.filter.append( {"assignee": {"email": {"eq": self.config.only_if_assigned}}} @@ -199,14 +200,14 @@ def __init__(self, *args, **kwargs): """ @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: LinearConfig) -> str: return f"linear://{config.host}" - def issues(self): + def issues(self) -> Iterator[LinearIssue]: for issue in self.get_issues(): yield self.get_issue_for_record(issue, {}) - def get_issues(self): + def get_issues(self) -> Iterator[dict[str, Any]]: """ Make Linear API requests, paginating with cursors until exhausted. @@ -224,7 +225,7 @@ def get_issues(self): "variables": {"filter": filter_arg, "after": cursor}, } response = self.session.post(self.config.host, data=json.dumps(data)) - res = self.json_response(response) + res = Client.json_response(response) if "errors" in res: messages = [ diff --git a/bugwarrior/services/logseq.py b/bugwarrior/services/logseq.py index a183ffab..784f6343 100644 --- a/bugwarrior/services/logseq.py +++ b/bugwarrior/services/logseq.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator from datetime import datetime import logging import re import typing +from typing import Any import requests @@ -39,7 +41,7 @@ class LogseqConfig(config.ServiceConfig): class LogseqClient(Client): - def __init__(self, host, port, token, filter): + def __init__(self, host: str, port: int, token: str, filter: str) -> None: self.host = host self.port = port self.token = token @@ -50,7 +52,7 @@ def __init__(self, host, port, token, filter): "content-type": "application/json; charset=utf-8", } - def _datascript_query(self, query): + def _datascript_query(self, query: str) -> Any: try: response = requests.post( f"http://{self.host}:{self.port}/api", @@ -62,7 +64,7 @@ def _datascript_query(self, query): log.fatal("Unable to connect to Logseq HTTP APIs server. %s", ce) exit(1) - def _get_current_graph(self): + def _get_current_graph(self) -> dict[str, Any]: try: response = requests.post( f"http://{self.host}:{self.port}/api", @@ -74,11 +76,11 @@ def _get_current_graph(self): log.fatal("Unable to connect to Logseq HTTP APIs server. %s", ce) exit(1) - def get_graph_name(self): + def get_graph_name(self) -> str | None: graph = self._get_current_graph() return graph["name"] if graph else None - def get_page(self, page_id): + def get_page(self, page_id: int) -> dict[str, Any]: try: response = requests.post( f"http://{self.host}:{self.port}/api", @@ -90,7 +92,7 @@ def get_page(self, page_id): log.fatal("Unable to connect to Logseq HTTP APIs server. %s", ce) exit(1) - def get_issues(self): + def get_issues(self) -> Any: query = f""" [:find (pull ?b [*]) :where [?b :block/marker ?marker] @@ -137,11 +139,11 @@ class LogseqIssue(Issue): UNIQUE_KEY = (ID, UUID) # map A B C priority to H M L - PRIORITY_MAP = {"A": "H", "B": "M", "C": "L"} + PRIORITY_MAP: dict[str, config.Priority] = {"A": "H", "B": "M", "C": "L"} # `pending` is the defuault state. Taskwarrior will dynamcily change task to `waiting` # state if wait date is set to a future date. - STATE_MAP = { + STATE_MAP: dict[str, str] = { "IN-PROGRESS": "pending", "DOING": "pending", "TODO": "pending", @@ -156,7 +158,7 @@ class LogseqIssue(Issue): # replace characters that cause escaping issues like [] and " # this is a workaround for https://github.com/ralphbean/taskw/issues/172 - def _unescape_content(self, content): + def _unescape_content(self, content: str) -> str: return ( content.replace('"', "'") # prevent &dquote; in task details .replace( @@ -169,7 +171,7 @@ def _unescape_content(self, content): # remove brackets and spaces to compress display format of mutli work tags # e.g from #[[Multi Word]] to #MultiWord - def _compress_tag_format(self, tag): + def _compress_tag_format(self, tag: str) -> str: return ( tag.replace(self.config.char_open_link, "") .replace(" ", "") @@ -177,7 +179,7 @@ def _compress_tag_format(self, tag): ) # get an optimized and formatted title - def get_formatted_title(self): + def get_formatted_title(self) -> str: # use first line only and remove state and priority first_line = ( self.record["content"] @@ -190,7 +192,7 @@ def get_formatted_title(self): return self._unescape_content(first_line) # get a list of tags from the task content - def get_tags_from_content(self): + def get_tags_from_content(self) -> list[str]: # pattern match for #[[multi word]] tags and #single word tags # but ignore any non-tag use of the # character in URLs # like http://example.com/page#test or in `#code` @@ -212,7 +214,9 @@ def get_tags_from_content(self): return tags # get a list of annotations from the content - def get_annotations_from_content(self): + def get_annotations_from_content( + self, + ) -> tuple[list[str], datetime | None, datetime | None]: annotations = [] scheduled_date = None deadline_date = None @@ -242,13 +246,13 @@ def get_annotations_from_content(self): annotations.pop(0) # remove first line return annotations, scheduled_date, deadline_date - def get_url(self): + def get_url(self) -> str: return f'logseq://graph/{self.extra["graph"]}?block-id={self.record["uuid"]}' - def get_logseq_state(self): + def get_logseq_state(self) -> str: return self.record["marker"] - def get_scheduled_date(self, scheduled): + def get_scheduled_date(self, scheduled: str) -> datetime | None: # format is # e.g. <2024-06-20 Thu 10:55 .+1d> date_split = ( @@ -282,10 +286,10 @@ def get_scheduled_date(self, scheduled): log.warning(f"Could not parse date {date} from {scheduled}") return None - def _is_waiting(self): + def _is_waiting(self) -> bool: return self.get_logseq_state() in ["WAIT", "WAITING"] - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: annotations, scheduled_date, deadline_date = self.get_annotations_from_content() wait_date = min( [d for d in [scheduled_date, deadline_date, self.SOMEDAY] if d is not None] @@ -309,7 +313,7 @@ def to_taskwarrior(self): self.PAGE: self.extra["page_title"], } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.get_formatted_title(), url=self.get_url() if self.config.inline_links else "", @@ -318,13 +322,15 @@ def get_default_description(self): ) -class LogseqService(Service): +class LogseqService(Service[LogseqIssue]): API_VERSION = 1.0 ISSUE_CLASS = LogseqIssue CONFIG_SCHEMA = LogseqConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: LogseqConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.token = self.get_secret('token') filter = '"' + '" "'.join(self.config.task_state) + '"' self.client = LogseqClient( @@ -335,10 +341,10 @@ def __init__(self, *args, **kwargs): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: LogseqConfig) -> str: return f"http://{config.host}:{config.port}" - def issues(self): + def issues(self) -> Iterator[LogseqIssue]: graph_name = self.client.get_graph_name() for issue in self.client.get_issues(): parent_page = self.client.get_page(issue[0]["parent"]["id"]) diff --git a/bugwarrior/services/pagure.py b/bugwarrior/services/pagure.py index d56cfbfb..7cdccf75 100644 --- a/bugwarrior/services/pagure.py +++ b/bugwarrior/services/pagure.py @@ -1,6 +1,8 @@ +from collections.abc import Iterator import datetime import logging import typing +from typing import Any from pydantic import model_validator import requests @@ -27,7 +29,7 @@ class PagureConfig(config.ServiceConfig): tag_template: str = '{{label}}' @model_validator(mode='after') - def require_tag_or_repo(self): + def require_tag_or_repo(self) -> "PagureConfig": if not self.tag and not self.repo: raise ValueError('section requires one of:\n tag\n repo') return self @@ -51,7 +53,7 @@ class PagureIssue(Issue): } UNIQUE_KEY = (URL, TYPE) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: if self.extra['type'] == 'pull_request': priority = 'H' else: @@ -72,14 +74,14 @@ def to_taskwarrior(self): ), } - def get_tags(self): + def get_tags(self) -> list[str]: return self.get_tags_from_labels( self.record.get('tags', []), toggle_option='import_tags', template_option='tag_template', ) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['title'], url=self.record['html_url'], @@ -88,17 +90,21 @@ def get_default_description(self): ) -class PagureService(Service): +class PagureService(Service[PagureIssue]): API_VERSION = 1.0 ISSUE_CLASS = PagureIssue CONFIG_SCHEMA = PagureConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: PagureConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.session = requests.Session() - def get_issues(self, repo, keys): + def get_issues( + self, repo: str, keys: tuple[str, str] + ) -> list[tuple[str, dict[str, Any]]]: """Grab all the issues""" key1, key2 = keys key3 = key1[:-1] # Just the singular form of key1 @@ -122,17 +128,17 @@ def get_issues(self, repo, keys): return issues - def annotations(self, issue): + def annotations(self, issue: dict[str, Any]) -> list[str]: url = issue['html_url'] return self.build_annotations( ((c['user']['name'], c['comment']) for c in issue['comments']), url ) - def get_owner(self, issue): + def get_owner(self, issue: tuple[str, dict[str, Any]]) -> str | None: if issue[1]['assignee']: return issue[1]['assignee']['name'] - def include(self, issue): + def include(self, issue: tuple[str, dict[str, Any]]) -> bool: """Return true if the issue in question should be included""" if self.config.only_if_assigned: owner = self.get_owner(issue) @@ -145,7 +151,7 @@ def include(self, issue): return True - def filter_repos(self, repo): + def filter_repos(self, repo: str) -> bool: if repo in self.config.exclude_repos: return False @@ -157,7 +163,7 @@ def filter_repos(self, repo): return True - def issues(self): + def issues(self) -> Iterator[PagureIssue]: if self.config.tag: url = self.config.base_url + "/api/0/projects?tags=" + self.config.tag response = self.session.get(url) diff --git a/bugwarrior/services/phab.py b/bugwarrior/services/phab.py index 0accdb71..f433630d 100644 --- a/bugwarrior/services/phab.py +++ b/bugwarrior/services/phab.py @@ -1,5 +1,7 @@ +from collections.abc import Iterator import logging import typing +from typing import Any import phabricator import pydantic @@ -41,7 +43,7 @@ class PhabricatorIssue(Issue): } UNIQUE_KEY = (URL,) - PRIORITY_MAP = { + PRIORITY_MAP: dict[str, config.Priority | None] = { 'Needs Triage': None, 'Unbreak Now!': 'H', 'High': 'H', @@ -50,7 +52,7 @@ class PhabricatorIssue(Issue): 'Wishlist': 'L', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.extra['project'], 'priority': self.priority, @@ -61,7 +63,7 @@ def to_taskwarrior(self): self.OBJECT_NAME: self.record['uri'].split('/')[-1], } - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['title'], url=self.record['uri'], @@ -70,20 +72,22 @@ def get_default_description(self): ) @property - def priority(self): + def priority(self) -> config.Priority: return ( self.PRIORITY_MAP.get(self.record.get('priority', '')) or self.config.default_priority ) -class PhabricatorService(Service): +class PhabricatorService(Service[PhabricatorIssue]): API_VERSION = 1.0 ISSUE_CLASS = PhabricatorIssue CONFIG_SCHEMA = PhabricatorConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: PhabricatorConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) # These read login credentials from ~/.arcrc if self.config.host: @@ -103,10 +107,10 @@ def __init__(self, *args, **kw): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: PhabricatorConfig) -> str: return f'phabricator://{config.host if config.host else ""}' - def tasks(self): + def tasks(self) -> Iterator[PhabricatorIssue]: # If self.config.user_phids or self.config.project_phids is set, # retrict API calls to user_phids or project_phids to avoid time out # with Phabricator installations with huge userbase. @@ -187,7 +191,7 @@ def tasks(self): yield self.get_issue_for_record(task, extra) - def revisions(self): + def revisions(self) -> Iterator[PhabricatorIssue]: try: diffs = self.api.differential.query(status='status-open') except phabricator.APIError as err: @@ -242,6 +246,6 @@ def revisions(self): } yield self.get_issue_for_record(diff, extra) - def issues(self): + def issues(self) -> Iterator[PhabricatorIssue]: yield from self.tasks() yield from self.revisions() diff --git a/bugwarrior/services/pivotaltracker.py b/bugwarrior/services/pivotaltracker.py index ebd552fa..d832b913 100644 --- a/bugwarrior/services/pivotaltracker.py +++ b/bugwarrior/services/pivotaltracker.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator import logging import operator import re import typing +from typing import Any from jinja2 import Template import requests @@ -68,7 +70,7 @@ class PivotalTrackerIssue(Issue): UNIQUE_KEY = (URL,) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: description = self.record.get('description') created = self.parse_date(self.record.get('created_at')) modified = self.parse_date(self.record.get('updated_at')) @@ -94,11 +96,11 @@ def to_taskwarrior(self): self.CLOSED_AT: closed, } - def get_tags(self): + def get_tags(self) -> list[str]: labels = [label['name'] for label in self.record.get('labels', [])] return self.get_tags_from_labels(labels) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record.get('name', ''), url=self.record.get('url', ''), @@ -107,13 +109,15 @@ def get_default_description(self): ) -class PivotalTrackerService(Service, Client): +class PivotalTrackerService(Service[PivotalTrackerIssue]): API_VERSION = 1.0 ISSUE_CLASS = PivotalTrackerIssue CONFIG_SCHEMA = PivotalTrackerConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: PivotalTrackerConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.path = f"{self.config.host}/{self.config.version}" @@ -139,10 +143,12 @@ def __init__(self, *args, **kwargs): self.query += f" requester:{self.config.user_id}" @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: PivotalTrackerConfig) -> str: return f'pivotaltracker://{config.user_id}@{config.host}' - def annotations(self, annotations, story): + def annotations( + self, annotations: list[dict[str, Any]], story: dict[str, Any] + ) -> list[str]: final_annotations = [] if self.main_config.annotation_comments: annotation_template = Template(self.config.annotation_template) @@ -152,11 +158,11 @@ def annotations(self, annotations, story): ) return self.build_annotations(final_annotations, story.get('url')) - def blockers(self, blocker_list): + def blockers(self, blocker_list: list[dict[str, Any]]) -> str | None: blockers = [] if not self.config.import_blockers: - return blockers + return None blocker_template = Template(self.config.blocker_template) for blocker in blocker_list: @@ -164,12 +170,14 @@ def blockers(self, blocker_list): return ', '.join(blockers) or None - def issues(self): + def issues(self) -> Iterator[PivotalTrackerIssue]: for project in self.get_projects(self.config.account_ids): project_id = project.get('id') if project_id not in self.config.exclude_projects: for story in self.get_query(project_id, query=self.query): story_id = story.get('id') + if story_id is None: + continue tasks = self.get_tasks(project_id, story_id) blockers = self.get_blockers(project_id, story_id) extra = { @@ -185,47 +193,36 @@ def issues(self): } yield self.get_issue_for_record(story, extra) - def api_request(self, endpoint, params={}): + def api_request(self, endpoint: str, params: dict[str, Any] | None = None) -> Any: """ Make a PivotalTracker API request. This takes an absolute urland a list of argumnets and return a GET request with the key and token from the configuration. """ - subkey = params.pop('subkey', None) - url = "{path}/{endpoint}".format(path=self.path, endpoint=endpoint) - response = self.session.get(url, params=params) - json_res = self.json_response(response) + response = self.session.get(f"{self.path}/{endpoint}", params=params or {}) + return Client.json_response(response) - if subkey is not None: - json_res = json_res[subkey] - - return json_res - - def get_projects(self, account_ids): + def get_projects(self, account_ids: list[str]) -> list[dict[str, Any]]: params = {'account_ids': ','.join(account_ids)} projects = self.api_request('projects', params=params) return projects - def get_query(self, project_id, **params): - params['subkey'] = 'stories' + def get_query(self, project_id: str | int, **params: Any) -> list[dict[str, Any]]: query = self.api_request(f"projects/{project_id}/search", params=params) + return query['stories']['stories'] - return query['stories'] - - def get_tasks(self, project_id, story_id): - tasks = self.api_request( - "projects/{project_id}/stories/{story_id}/tasks".format( - project_id=project_id, story_id=story_id - ) - ) + def get_tasks( + self, project_id: str | int, story_id: str | int + ) -> list[dict[str, Any]]: + tasks = self.api_request(f"projects/{project_id}/stories/{story_id}/tasks") return tasks - def get_blockers(self, project_id, story_id): + def get_blockers( + self, project_id: str | int, story_id: str | int + ) -> list[dict[str, Any]]: blockers = self.api_request( - "projects/{project_id}/stories/{story_id}/blockers".format( - project_id=project_id, story_id=story_id - ) + f"projects/{project_id}/stories/{story_id}/blockers" ) blocker_results = [] for blocker in blockers: @@ -233,10 +230,8 @@ def get_blockers(self, project_id, story_id): blocker_results.append(blocker) return blocker_results - def get_user_by_id(self, project_id, user_ids): - persons = self.api_request( - "projects/{project_id}/memberships".format(project_id=project_id) - ) + def get_user_by_id(self, project_id: str | int, user_ids: list[Any]) -> str | None: + persons = self.api_request(f"projects/{project_id}/memberships") user_list = filter( lambda x: x.get('id') in user_ids, map(operator.itemgetter('person'), persons), diff --git a/bugwarrior/services/redmine.py b/bugwarrior/services/redmine.py index 26b07b72..b2db760f 100644 --- a/bugwarrior/services/redmine.py +++ b/bugwarrior/services/redmine.py @@ -1,6 +1,8 @@ +from collections.abc import Iterator import logging import re import typing +from typing import Any import requests from taskw import TaskWarriorShellout @@ -29,14 +31,23 @@ class RedMineConfig(config.ServiceConfig): class RedMineClient(Client): - def __init__(self, url, key, auth, issue_limit, verify_ssl): + def __init__( + self, + url: str, + key: str, + auth: tuple[str, str] | None, + issue_limit: int | None, + verify_ssl: bool, + ) -> None: self.url = url self.key = key self.auth = auth self.issue_limit = issue_limit self.verify_ssl = verify_ssl - def find_issues(self, issue_limit, query, only_if_assigned=False): + def find_issues( + self, issue_limit: int | None, query: str, only_if_assigned: bool | str = False + ) -> list[dict[str, Any]]: args = {} url = "/issues.json?" + query @@ -51,15 +62,17 @@ def find_issues(self, issue_limit, query, only_if_assigned=False): return self.call_api(url, args)["issues"] - def call_api(self, uri, params): + def call_api(self, uri: str, params: dict[str, Any]) -> dict[str, Any]: url = self.url.rstrip("/") + uri - kwargs = {'headers': {'X-Redmine-API-Key': self.key}, 'params': params} + kwargs: dict[str, Any] = { + 'headers': {'X-Redmine-API-Key': self.key}, + 'params': params, + 'verify': self.verify_ssl, + } if self.auth: kwargs['auth'] = self.auth - kwargs['verify'] = self.verify_ssl - return self.json_response(requests.get(url, **kwargs)) @@ -101,7 +114,7 @@ class RedMineIssue(Issue): } UNIQUE_KEY = (ID,) - PRIORITY_MAP = { + PRIORITY_MAP: dict[str, config.Priority] = { 'Low': 'L', 'Normal': 'M', 'High': 'H', @@ -109,7 +122,7 @@ class RedMineIssue(Issue): 'Immediate': 'H', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: due_date = self.record.get('due_date') start_date = self.record.get('start_date') updated_on = self.record.get('updated_on') @@ -160,20 +173,20 @@ def to_taskwarrior(self): self.SPENT_HOURS: spent_hours, } - def get_priority(self): + def get_priority(self) -> config.Priority: return self.PRIORITY_MAP.get( self.record.get('priority', {}).get('name'), self.config.default_priority ) - def get_issue_url(self): + def get_issue_url(self) -> str: return self.config.url + "/issues/" + str(self.record["id"]) - def get_converted_hours(self, estimated_hours): + def get_converted_hours(self, estimated_hours: str) -> str: tw = TaskWarriorShellout(config_filename=self.main_config.taskrc) calc = tw._execute('calc', estimated_hours) return calc[0].rstrip() - def get_project_name(self): + def get_project_name(self) -> str: if self.config.project_name: return self.config.project_name # TODO: It would be nice to use the project slug (if the Redmine @@ -182,7 +195,7 @@ def get_project_name(self): # project ID contained in self.record and the list of projects. return re.sub(r'[^a-zA-Z0-9]', '', self.record["project"]["name"]).lower() - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['subject'], url=self.get_issue_url(), @@ -191,13 +204,15 @@ def get_default_description(self): ) -class RedMineService(Service): +class RedMineService(Service[RedMineIssue]): API_VERSION = 1.0 ISSUE_CLASS = RedMineIssue CONFIG_SCHEMA = RedMineConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: RedMineConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.key = self.get_secret('key') @@ -218,10 +233,10 @@ def __init__(self, *args, **kw): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: RedMineConfig) -> str: return f"redmine://{config.login}@{config.url}/" - def issues(self): + def issues(self) -> Iterator[RedMineIssue]: issues = self.client.find_issues( self.config.issue_limit, self.config.query, self.config.only_if_assigned ) diff --git a/bugwarrior/services/taiga.py b/bugwarrior/services/taiga.py index ef1871c6..33cc8b56 100644 --- a/bugwarrior/services/taiga.py +++ b/bugwarrior/services/taiga.py @@ -1,5 +1,7 @@ +from collections.abc import Iterator import logging import typing +from typing import Any import requests @@ -33,7 +35,7 @@ class TaigaIssue(Issue): } UNIQUE_KEY = (URL,) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.extra['project'], 'annotations': self.extra['annotations'], @@ -44,10 +46,10 @@ def to_taskwarrior(self): self.SUMMARY: self.record['subject'], } - def get_tags(self): + def get_tags(self) -> list[str]: return [x if isinstance(x, str) else x[0] for x in self.record['tags']] - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record['subject'], url=self.extra['url'], @@ -56,13 +58,15 @@ def get_default_description(self): ) -class TaigaService(Service, Client): +class TaigaService(Service[TaigaIssue]): API_VERSION = 1.0 ISSUE_CLASS = TaigaIssue CONFIG_SCHEMA = TaigaConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: TaigaConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.auth_token = self.get_secret('auth_token') self.session = requests.session() self.session.headers.update( @@ -73,10 +77,12 @@ def __init__(self, *args, **kw): ) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: TaigaConfig) -> str: return f"taiga://{config.base_uri}" - def _issues(self, userid, task_type, task_type_plural, task_type_short): + def _issues( + self, userid: int, task_type: str, task_type_plural: str, task_type_short: str + ) -> Iterator[TaigaIssue]: log.debug('Getting %s' % task_type_plural) response = self.session.get( @@ -96,7 +102,7 @@ def _issues(self, userid, task_type, task_type_plural, task_type_short): } yield self.get_issue_for_record(task, extra) - def issues(self): + def issues(self) -> Iterator[TaigaIssue]: url = self.config.base_uri + '/api/v1/users/me' me = self.session.get(url) data = me.json() @@ -114,11 +120,13 @@ def issues(self): yield from self._issues(userid, 'task', 'tasks', 'task') @cache.cache_on_arguments() - def get_project(self, project_id): + def get_project(self, project_id: int) -> dict[str, Any]: url = '%s/api/v1/projects/%i' % (self.config.base_uri, project_id) - return self.json_response(self.session.get(url)) + return Client.json_response(self.session.get(url)) - def build_url(self, task, project, task_type): + def build_url( + self, task: dict[str, Any], project: dict[str, Any], task_type: str + ) -> str: return '%s/project/%s/%s/%i' % ( self.config.base_uri, project['slug'], @@ -126,7 +134,13 @@ def build_url(self, task, project, task_type): task['ref'], ) - def annotations(self, task, project, task_type, task_type_short): + def annotations( + self, + task: dict[str, Any], + project: dict[str, Any], + task_type: str, + task_type_short: str, + ) -> list[str]: url = f"{self.config.base_uri}/api/v1/history/{task_type}/{task['id']}" response = self.session.get(url) history = response.json() diff --git a/bugwarrior/services/teamwork_projects.py b/bugwarrior/services/teamwork_projects.py index 0e9a1bb9..f730e3bd 100644 --- a/bugwarrior/services/teamwork_projects.py +++ b/bugwarrior/services/teamwork_projects.py @@ -1,5 +1,7 @@ +from collections.abc import Iterator import logging import typing +from typing import Any import requests @@ -19,18 +21,12 @@ class TeamworkConfig(config.ServiceConfig): class TeamworkClient(Client): - def __init__(self, host, token): + def __init__(self, host: str, token: str) -> None: self.host = host self.token = token - def authenticate(self): - response = requests.get(self.host + "/authenticate.json", auth=(self.token, "")) - return self.json_response(response) - - def call_api(self, method, endpoint, data=None): - response = requests.get( - self.host + endpoint, auth=(self.token, ""), params=data - ) + def get(self, endpoint: str) -> dict[str, Any]: + response = requests.get(f"{self.host}/{endpoint}", auth=(self.token, "")) return self.json_response(response) @@ -54,17 +50,17 @@ class TeamworkIssue(Issue): UNIQUE_KEY = (URL,) PRIORITY_MAP = {"low": "L", "medium": "M", "high": "H"} - def get_task_url(self): + def get_task_url(self) -> str: return self.extra["host"] + "/#/tasks/" + str(self.record["id"]) - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( title=self.record["content"], url=self.get_task_url(), number=self.record["id"], ) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: task_url = self.get_task_url() status = self.record["status"] @@ -96,27 +92,29 @@ def to_taskwarrior(self): } -class TeamworkService(Service): +class TeamworkService(Service[TeamworkIssue]): API_VERSION = 1.0 ISSUE_CLASS = TeamworkIssue CONFIG_SCHEMA = TeamworkConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: TeamworkConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.client = TeamworkClient(self.config.host, self.config.token) - user = self.client.authenticate() + user = self.client.get("authenticate.json") self.user_id = user["account"]["userId"] self.name = user["account"]["firstname"] + " " + user["account"]["lastname"] @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: TeamworkConfig) -> str: return f'teamwork_projects://{config.host}' - def get_comments(self, issue): + def get_comments(self, issue: dict[str, Any]) -> list[str]: if self.main_config.annotation_comments: if issue.get("comments-count", 0) > 0: - endpoint = "/tasks/{task_id}/comments.json".format(task_id=issue["id"]) - comments = self.client.call_api("GET", endpoint) + endpoint = f"tasks/{issue['id']}/comments.json" + comments = self.client.get(endpoint) comment_list = [] for comment in comments["comments"]: author = "{first} {last}".format( @@ -128,8 +126,8 @@ def get_comments(self, issue): return self.build_annotations(comment_list, None) return [] - def issues(self): - response = self.client.call_api("GET", "/tasks.json") + def issues(self) -> Iterator[TeamworkIssue]: + response = self.client.get("tasks.json") for issue in response["todo-items"]: # Determine if issue is need by if following comments, changes or assigned if ( diff --git a/bugwarrior/services/todoist.py b/bugwarrior/services/todoist.py index 6ebe7748..71b7497d 100644 --- a/bugwarrior/services/todoist.py +++ b/bugwarrior/services/todoist.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator from dataclasses import asdict from datetime import datetime, time import logging import typing +from typing import Any from todoist_api_python.api import TodoistAPI from todoist_api_python.models import Task @@ -23,19 +25,19 @@ class TodoistConfig(config.ServiceConfig): class TodoistClient(Client): - def __init__(self, token, filter): + def __init__(self, token: str, filter: str) -> None: self._api = TodoistAPI(token) self.filter = filter @classmethod - def task_to_dict(cls, task: Task): + def task_to_dict(cls, task: Task) -> dict[str, Any]: record = asdict(task) # add data items for additional properties record["is_completed"] = task.is_completed record["url"] = task.url return record - def get_projects(self): + def get_projects(self) -> list[Any]: all_projects = [] projects_iter = self._api.get_projects() for projects in projects_iter: @@ -43,7 +45,7 @@ def get_projects(self): all_projects.append(project) return all_projects - def get_sections(self): + def get_sections(self) -> list[Any]: all_sections = [] sections_iter = self._api.get_sections() for sections in sections_iter: @@ -51,7 +53,7 @@ def get_sections(self): all_sections.append(section) return all_sections - def get_users(self, project_id): + def get_users(self, project_id: Any) -> list[Any]: all_users = [] users_iter = self._api.get_collaborators(project_id) for users in users_iter: @@ -59,14 +61,14 @@ def get_users(self, project_id): all_users.append(user) return all_users - def get_issues(self): + def get_issues(self) -> Iterator[dict[str, Any]]: tasks_iter = self._api.filter_tasks(query=self.filter) for tasks in tasks_iter: for task in tasks: record = self.task_to_dict(task) yield record - def get_comments(self, task_id): + def get_comments(self, task_id: str) -> list[Any]: all_comments = [] comments_iter = self._api.get_comments(task_id=task_id) for comments in comments_iter: @@ -88,7 +90,7 @@ class TodoistIssue(Issue): SECTION = "todoistsection" URL = "todoisturl" - PRIORITY_MAP = {4: "H", 3: "M", 2: "L", 1: None} + PRIORITY_MAP: dict[int, config.Priority | None] = {4: "H", 3: "M", 2: "L", 1: None} UDAS = { ID: {"type": "string", "label": "Todoist ID"}, @@ -108,14 +110,14 @@ class TodoistIssue(Issue): # replace characters that cause escaping issues like [] and " # this is a workaround for https://github.com/ralphbean/taskw/issues/172 - def _unescape_content(self, content): + def _unescape_content(self, content: str) -> str: return ( content.replace('"', "'") # prevent &dquote; in task details .replace("[", self.config.char_open_bracket) # prevent &open; and &close; .replace("]", self.config.char_close_bracket) ) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: default_time = time(0, 0, 0) # adjust timezone to use local time for "floating" dates if self.record["due"]: @@ -169,7 +171,7 @@ def to_taskwarrior(self): } return task - def get_default_description(self): + def get_default_description(self) -> str: description = self.build_default_description( title=self._unescape_content(self.record["content"]), url=self.record["url"], @@ -179,13 +181,15 @@ def get_default_description(self): return description -class TodoistService(Service): +class TodoistService(Service[TodoistIssue]): API_VERSION = 1.0 ISSUE_CLASS = TodoistIssue CONFIG_SCHEMA = TodoistConfig - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__( + self, config: TodoistConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.token = self.get_secret("token") # apply additional filters @@ -204,10 +208,12 @@ def __init__(self, *args, **kwargs): self.client = TodoistClient(token=self.token, filter=filter) @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: TodoistConfig) -> str: return "todoist://" - def annotations(self, user_index, issue): + def annotations( + self, user_index: dict[Any, str], issue: dict[str, Any] + ) -> list[str]: comments = ( self.client.get_comments(issue["id"]) if self.main_config.annotation_comments @@ -215,13 +221,13 @@ def annotations(self, user_index, issue): ) return self.build_annotations( [ - (user_index.get(comment.poster_id), comment.content) + (user_index.get(comment.poster_id) or "", comment.content) for comment in comments ], issue["url"], ) - def issues(self): + def issues(self) -> Iterator[TodoistIssue]: project_index = { project.id: project.name for project in self.client.get_projects() } diff --git a/bugwarrior/services/trac.py b/bugwarrior/services/trac.py index 35490ba7..8fb3b9e9 100644 --- a/bugwarrior/services/trac.py +++ b/bugwarrior/services/trac.py @@ -1,7 +1,9 @@ +from collections.abc import Iterator import csv import io as StringIO import logging import typing +from typing import Any import urllib.parse import offtrac @@ -37,7 +39,7 @@ class TracIssue(Issue): } UNIQUE_KEY = (URL,) - PRIORITY_MAP = { + PRIORITY_MAP: dict[str, config.Priority] = { 'trivial': 'L', 'minor': 'L', 'major': 'M', @@ -45,7 +47,7 @@ class TracIssue(Issue): 'blocker': 'H', } - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.extra['project'], 'priority': self.get_priority(), @@ -56,7 +58,7 @@ def to_taskwarrior(self): self.COMPONENT: self.record['component'], } - def get_default_description(self): + def get_default_description(self) -> str: if 'number' in self.record: number = self.record['number'] else: @@ -69,19 +71,22 @@ def get_default_description(self): cls='issue', ) - def get_priority(self): + def get_priority(self) -> config.Priority: return self.PRIORITY_MAP.get( self.record.get('priority', ''), self.config.default_priority ) -class TracService(Service): +class TracService(Service[TracIssue]): API_VERSION = 1.0 ISSUE_CLASS = TracIssue CONFIG_SCHEMA = TracConfig + trac: offtrac.TracServer | None - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: TracConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) if self.config.username: password = self.get_secret('password', self.config.username) @@ -89,18 +94,18 @@ def __init__(self, *args, **kw): else: auth = '' - self.trac = None uri = f'{self.config.scheme}://{auth}{self.config.base_uri}/' if self.config.no_xmlrpc: self.uri = uri + self.trac = None else: self.trac = offtrac.TracServer(uri + 'login/xmlrpc') @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: TracConfig) -> str: return f"https://{config.username}@{config.base_uri}/" - def annotations(self, issue): + def annotations(self, issue: dict[str, Any]) -> list[str]: annotations = [] # without offtrac, we can't get issue comments if self.trac is None: @@ -114,11 +119,10 @@ def annotations(self, issue): return self.build_annotations(annotations, issue['url']) - def get_owner(self, issue): - tag, issue = issue - return issue.get('owner', None) or None + def get_owner(self, issue: tuple[str, dict[str, Any]]) -> str | None: + return issue[1].get('owner', None) or None - def include(self, issue): + def include(self, issue: tuple[str, dict[str, Any]]) -> bool: """Return true if the issue in question should be included""" if self.config.only_if_assigned: owner = self.get_owner(issue) @@ -131,7 +135,7 @@ def include(self, issue): return True - def issues(self): + def issues(self) -> Iterator[TracIssue]: base_url = "https://" + self.config.base_uri if self.trac: tickets = self.trac.query_tickets('status!=closed&max=0') diff --git a/bugwarrior/services/trello.py b/bugwarrior/services/trello.py index 124bd20a..1120679c 100644 --- a/bugwarrior/services/trello.py +++ b/bugwarrior/services/trello.py @@ -6,7 +6,9 @@ Trello API documentation available at https://developers.trello.com/ """ +from collections.abc import Iterator import typing +from typing import Any import requests @@ -50,7 +52,7 @@ class TrelloIssue(Issue): } UNIQUE_KEY = (CARDID,) - def get_default_description(self): + def get_default_description(self) -> str: """Return the old-style verbose description from bugwarrior.""" return self.build_default_description( title=self.record['name'], @@ -59,12 +61,12 @@ def get_default_description(self): cls='task', ) - def get_tags(self): + def get_tags(self) -> list[str]: return self.get_tags_from_labels( [label['name'] for label in self.record['labels']] ) - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.extra['boardname'], 'due': self.parse_date(self.record['due']), @@ -83,16 +85,16 @@ def to_taskwarrior(self): } -class TrelloService(Service, Client): +class TrelloService(Service[TrelloIssue]): API_VERSION = 1.0 ISSUE_CLASS = TrelloIssue CONFIG_SCHEMA = TrelloConfig @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: TrelloConfig) -> str: return f"trello://{config.api_key}@trello.com" - def issues(self): + def issues(self) -> Iterator[TrelloIssue]: """ Returns a list of dicts representing issues from a remote service. """ @@ -104,7 +106,7 @@ def issues(self): issue.extra.update({"annotations": self.annotations(card)}) yield issue - def annotations(self, card_json): + def annotations(self, card_json: dict[str, Any]) -> list[str]: """A wrapper around get_comments that build the taskwarrior annotations.""" comments = self.get_comments(card_json['id']) @@ -114,7 +116,7 @@ def annotations(self, card_json): ) return annotations - def get_boards(self): + def get_boards(self) -> Iterator[dict[str, Any]]: """ Get the list of boards to pull cards from. If the user gave a value to trello.include_boards use that, otherwise ask the Trello API for the @@ -124,11 +126,12 @@ def get_boards(self): for boardid in self.config.include_boards: # Get the board name yield self.api_request(f"/1/boards/{boardid}", fields='name') + else: boards = self.api_request("/1/members/me/boards", fields='name') yield from boards - def get_lists(self, board): + def get_lists(self, board: str) -> list[dict[str, Any]]: """ Returns a list of the filtered lists for the given board This filters the trello lists according to the configuration values of @@ -146,7 +149,7 @@ def get_lists(self, board): return lists - def get_cards(self, list_id): + def get_cards(self, list_id: str) -> Iterator[dict[str, Any]]: """Returns an iterator for the cards in a given list, filtered according to configuration values of trello.only_if_assigned and trello.also_unassigned""" @@ -164,7 +167,7 @@ def get_cards(self, list_id): ): yield card - def get_comments(self, card_id): + def get_comments(self, card_id: str) -> Iterator[dict[str, Any]]: """Returns an iterator for the comments on a certain card.""" params = {'filter': 'commentCard', 'memberCreator_fields': 'username'} comments = self.api_request(f"/1/cards/{card_id}/actions", **params) @@ -172,7 +175,7 @@ def get_comments(self, card_id): assert comment['type'] == 'commentCard' yield comment - def api_request(self, url, **params): + def api_request(self, url: str, **params: Any) -> Any: """ Make a trello API request. This takes an absolute url (without protocol and host) and a list of argumnets and return a GET request with the @@ -181,4 +184,4 @@ def api_request(self, url, **params): params['key'] = self.config.api_key params['token'] = self.get_secret('token') url = "https://api.trello.com" + url - return self.json_response(requests.get(url, params=params)) + return Client.json_response(requests.get(url, params=params)) diff --git a/bugwarrior/services/youtrack.py b/bugwarrior/services/youtrack.py index f2462e77..a3de6e9a 100644 --- a/bugwarrior/services/youtrack.py +++ b/bugwarrior/services/youtrack.py @@ -1,5 +1,7 @@ +from collections.abc import Iterator import logging import typing +from typing import Any from pydantic import computed_field import requests @@ -61,9 +63,9 @@ class YoutrackIssue(Issue): NUMBER: {'type': 'string', 'label': 'YouTrack Project Issue Number'}, } UNIQUE_KEY = (URL,) - PRIORITY_MAP = {} # FIXME + PRIORITY_MAP: dict[str, config.Priority] = {} - def to_taskwarrior(self): + def to_taskwarrior(self) -> dict[str, Any]: return { 'project': self.get_project(), 'priority': self.get_priority(), @@ -75,30 +77,30 @@ def to_taskwarrior(self): self.NUMBER: self.get_number_in_project(), } - def get_issue(self): - return self.get_project() + '-' + str(self.get_number_in_project()) + def get_issue(self) -> str: + return (self.get_project() or '') + '-' + str(self.get_number_in_project()) - def get_issue_summary(self): + def get_issue_summary(self) -> str | None: return self.record.get('summary') - def get_issue_url(self): + def get_issue_url(self) -> str: return "%s/issue/%s" % (self.config.base_url, self.get_issue()) - def get_project(self): + def get_project(self) -> str | None: return self.record.get('project', {}).get('shortName') - def get_number_in_project(self): + def get_number_in_project(self) -> int | None: return self.record.get('numberInProject') - def get_default_description(self): + def get_default_description(self) -> str: return self.build_default_description( - title=self.get_issue_summary(), + title=self.get_issue_summary() or '', url=self.get_issue_url(), number=self.get_issue(), cls='issue', ) - def get_tags(self): + def get_tags(self) -> list[str]: return self.get_tags_from_labels( [tag['name'] for tag in self.record.get('tags', [])], toggle_option='import_tags', @@ -107,13 +109,15 @@ def get_tags(self): ) -class YoutrackService(Service, Client): +class YoutrackService(Service[YoutrackIssue]): API_VERSION = 1.0 ISSUE_CLASS = YoutrackIssue CONFIG_SCHEMA = YoutrackConfig - def __init__(self, *args, **kw): - super().__init__(*args, **kw) + def __init__( + self, config: YoutrackConfig, main_config: config.MainSectionConfig + ) -> None: + super().__init__(config, main_config) self.rest_url = self.config.base_url + '/api' @@ -127,17 +131,17 @@ def __init__(self, *args, **kw): self.session.headers['Authorization'] = f'Bearer {token}' @staticmethod - def get_keyring_service(config): + def get_keyring_service(config: YoutrackConfig) -> str: return f"youtrack://{config.login}@{config.host}" - def issues(self): + def issues(self) -> Iterator[YoutrackIssue]: params = { 'query': self.config.query, 'max': self.config.query_limit, 'fields': 'id,summary,project(shortName),numberInProject,tags(name)', } resp = self.session.get(self.rest_url + '/issues', params=params) - issues = self.json_response(resp) + issues = Client.json_response(resp) log.debug(" Found %i total.", len(issues)) for issue in issues: diff --git a/pyproject.toml b/pyproject.toml index b114098e..adbfd9b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,13 +91,12 @@ skip-magic-trailing-comma = true select = ["E4", "E7", "E9", "F", "W", "I", "E501", "ANN"] ignore = ["ANN401"] # ANN rules (flake8-annotations) detect untyped code — useful for gradual typing. -# Currently enforced in: bugwarrior/config/, command.py, db.py, collect.py, notifications.py +# Enforced in application code under bugwarrior/ (excluding tests/ and bugwarrior/docs/). # ANN401 (disallow Any) is too strict for wrappers, *args/**kwargs, and data storage. [tool.ruff.lint.per-file-ignores] "tests/**" = ["ANN"] "bugwarrior/docs/**" = ["ANN"] -"bugwarrior/services/{bts,bz,clickup,deck,gerrit,gitbug,github,gitlab,gmail,jira,kanboard,linear,logseq,pagure,phab,pivotaltracker,redmine,taiga,teamwork_projects,todoist,trac,trello,youtrack}.py" = ["ANN"] [tool.ruff.lint.flake8-annotations] suppress-dummy-args = true @@ -141,5 +140,5 @@ test = [ "responses", # ruff and ty are required by tests/test_general.py "ruff", - "ty>=0.0.29", + "ty==0.0.32", ] diff --git a/tests/test_deck.py b/tests/test_deck.py index 19705459..66558dc6 100644 --- a/tests/test_deck.py +++ b/tests/test_deck.py @@ -169,6 +169,21 @@ def test_issues(self): self.assertEqual(TaskConstructor(issue).get_taskwarrior_record(), expected) + def test_get_owner(self): + # Regression test: the old get_owner did `issue[issue.ASSIGNEE]`, treating + # the NextcloudDeckIssue as a dict. Issue has no __getitem__, so this raised + # TypeError whenever only_if_assigned was configured. + self.config['deck']['only_if_assigned'] = 'rainbow' + issue = self.service.get_issue_for_record( + self.data.arbitrary_card, + { + 'board': {'title': 'testboard', 'id': 5}, + 'stack': {'title': 'teststack', 'id': 13}, + 'annotations': [], + }, + ) + self.assertEqual(self.service.get_owner(issue), 'rainbow') + def test_filter_boards_include(self): self.config['deck']['include_board_ids'] = '5' self.assertTrue(self.service.filter_boards({'title': 'testboard', 'id': 5}))