From 192b947978aaf5873a506568dacbb5073474756c Mon Sep 17 00:00:00 2001 From: geospatial-jeff Date: Thu, 3 Jul 2025 10:14:15 -0600 Subject: [PATCH 1/4] speed up ftw download data command --- pyproject.toml | 3 +- src/ftw_cli/download_ftw.py | 218 +++++++++++++----------------------- 2 files changed, 80 insertions(+), 141 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ddb1b535..c0a0b18b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,8 @@ dependencies = [ "pyproj", # For projections and CRSes "fiboa-cli==0.7.0", # For storing data in Fiboa format "tenacity==9.1.2", # For backoff / retry. - "dask[distributed]==2025.5.1" # For parallel processing + "dask[distributed]==2025.5.1", # For parallel processing + "smart_open[s3]==7.0.5" # For streaming file download. ] [project.urls] diff --git a/src/ftw_cli/download_ftw.py b/src/ftw_cli/download_ftw.py index d4bb3c95..181f1dbb 100644 --- a/src/ftw_cli/download_ftw.py +++ b/src/ftw_cli/download_ftw.py @@ -1,16 +1,24 @@ +import concurrent.futures +import functools import hashlib import logging import os import shutil -import time -import wget -from tqdm import tqdm +import boto3 +import smart_open +from botocore import UNSIGNED +from botocore.config import Config from .cfg import ALL_COUNTRIES +logger = logging.getLogger() +client = boto3.client( + "s3", config=Config(signature_version=UNSIGNED), region_name="us-west-1" +) -def load_checksums(local_md5_file_path): + +def _load_checksums(local_md5_file_path): """ Load the checksum data from a local md5 file. @@ -25,7 +33,7 @@ def load_checksums(local_md5_file_path): return checksum_data -def calculate_md5(file_path): +def _calculate_md5(file_path): """Calculate the MD5 checksum of a file.""" hash_md5 = hashlib.md5() with open(file_path, "rb") as f: @@ -34,6 +42,54 @@ def calculate_md5(file_path): return hash_md5.hexdigest() +def _download_file(key: str, fpath: str): + # if key.endswith(".zip"): + print(f"Downloading {key} to {fpath}") + with smart_open.open( + f"s3://us-west-2.opendata.source.coop/{key}", + "rb", + transport_params={"client": client}, + ) as f: + with open(fpath, "wb") as outf: + for chunk in f: + outf.write(chunk) + + +def _download_country_file( + country_name: str, root_folder: str, checksum_data: dict[str, str] +): + key = f"kerner-lab/fields-of-the-world-archive/{country_name}.zip" + local_file_path = os.path.join(root_folder, f"{country_name}.zip") + + # Check if the file already exists locally + if os.path.exists(local_file_path): + print(f"File {local_file_path} already exists, skipping download.") + return + + # Otherwise download the file. + expected_md5 = checksum_data[country_name] + try: + _download_file(key, local_file_path) + + # Verify checksum (md5 hash) + actual_md5 = _calculate_md5(local_file_path) + if country_name not in checksum_data: + print(f"No checksum found for {country_name}, skipping verification.") + return + if actual_md5 == expected_md5: + logger.info(f"Checksum verification passed for {local_file_path}") + print(f"Checksum verification passed for {country_name}.") + else: + logger.error(f"Checksum verification failed for {local_file_path}") + print( + f"Checksum verification failed for {country_name}. Expected: {expected_md5}, Found: {actual_md5}" + ) + except Exception: + logger.exception(f"Error downloading {key}") + print(f"Failed to download {key}.") + raise + + def download(out, clean_download, countries): root_folder_path = os.path.abspath(out) @@ -47,148 +103,24 @@ def download(out, clean_download, countries): # Ensure the root folder exists os.makedirs(root_folder_path, exist_ok=True) - - # Path for the log file inside the data folder - log_file = os.path.join(root_folder_path, "download.log") - - # Initialize logging logging.basicConfig( - filename=log_file, + filename=os.path.join(root_folder_path, "download.log"), level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s", ) - logger = logging.getLogger() - - def custom_progress_bar(file_name): - start_time = time.time() - print(f"Downloading {file_name}...") - - def bar(current, total, width=None): - if width is None: - width = shutil.get_terminal_size((80, 20)).columns - elapsed_time = time.time() - start_time - speed = current / elapsed_time if elapsed_time > 0 else 0 - progress = current / total - bar_width = int( - (width - 40) - ) # Adjust width for the text and make the bar 30% shorter - block = int(round(bar_width * progress)) - progress_message = ( - f"{current / (1024**3):.3f} GB / {total / (1024**3):.3f} GB" - ) - speed_message = f"Speed: {speed / (1024**2):.2f} MB/s" - bar = f"[{'#' * block}{'-' * (bar_width - block)}]" - print(f"{progress_message} {bar} {speed_message}", end="\r") - - return bar - - def download_file(url, local_file_path): - """ - Downloads a file from the given URL using wget. - :param url: URL of the file to be downloaded - :param local_file_path: Path to the local file where the URL will be downloaded - :return: True if download was successful, False otherwise - """ - try: - # Use wget to download the file with a custom progress bar - wget.download( - url, - local_file_path, - bar=custom_progress_bar(os.path.basename(local_file_path)), - ) - logger.info(f"Downloaded {url} to {local_file_path}") - print(f"\nDownloaded {local_file_path}") - return True - except Exception as e: - logger.error(f"Error downloading {url}: {e}") - print(f"Error downloading {url}: {str(e)}") - return False - - def download_and_verify_md5(zip_file_path, country, checksum_data): - """ - Download a .zip file for a country and verify its checksum. - - :param zip_file_path: Path to the local .zip file - :param country: Name of the country - :param checksum_data: Dictionary of country to checksum hash - :return: True if checksum matches, False otherwise - """ - print(f"Verifying {zip_file_path}...") - - if country not in checksum_data: - print(f"No checksum found for {country}, skipping verification.") - return False - - # Calculate the MD5 checksum of the downloaded file - calculated_checksum = calculate_md5(zip_file_path) - expected_checksum = checksum_data[country] - - if calculated_checksum == expected_checksum: - print(f"Checksum verification passed for {country}.") - logger.info(f"Checksum verification passed for {zip_file_path}") - return True - else: - print( - f"Checksum verification failed for {country}. Expected: {expected_checksum}, Found: {calculated_checksum}" - ) - logger.error(f"Checksum verification failed for {zip_file_path}") - return False - - def download_all_country_files(root_folder, country_names, checksum_data): - """ - Download all .zip files for the specified countries and verify their checksums using basic URL downloads. - - :param root_folder: Path to the root folder where files will be downloaded - :param country_names: List of country names whose files need to be downloaded - :param checksum_data: Dictionary of country to checksum hash - """ - base_url = "https://data.source.coop/kerner-lab/fields-of-the-world-archive/" - - # tqdm progress for the entire dataset - overall_progress = tqdm( - total=len(country_names), desc="Overall Download Progress", unit="country" - ) - - for country in country_names: - url = f"{base_url}{country}.zip" - local_file_path = os.path.join(root_folder, f"{country}.zip") - - # Check if the file already exists locally - if os.path.exists(local_file_path): - logger.info( - f"File {local_file_path} already exists, skipping download." - ) - print(f"File {local_file_path} already exists, skipping download.") - else: - # Download the file - if download_file(url, local_file_path): - # Verify the file checksum - download_and_verify_md5( - local_file_path, country.lower(), checksum_data - ) - - # Update the overall progress - overall_progress.update(1) - - overall_progress.close() - - # Main script # Step 1: Download the checksum.md5 file local_md5_file_path = os.path.join(root_folder_path, "checksum.md5") - md5_file_url = ( - "https://data.source.coop/kerner-lab/fields-of-the-world-archive/checksum.md5" - ) - - if not os.path.exists(local_md5_file_path): - if not download_file(md5_file_url, local_md5_file_path): - print("Failed to download checksum.md5 file.") - return - else: - print(f"Downloaded checksum.md5 to {local_md5_file_path}") + key = "kerner-lab/fields-of-the-world-archive/checksum.md5" + try: + _download_file(key, local_md5_file_path) + print(f"Downloaded checksum.md5 to {local_md5_file_path}") + except Exception as exc: + print("Failed to download checksum.md5 file.") + raise exc # Step 2: Load the checksum data - checksum_data = load_checksums(local_md5_file_path) + checksum_data = _load_checksums(local_md5_file_path) # Step 3: Handle country selection (all or specific countries) if countries == "all": @@ -202,5 +134,11 @@ def download_all_country_files(root_folder, country_names, checksum_data): ] print(f"Downloading selected countries: {country_names}") - # Step 4: Download all .zip files for the specified countries and verify their checksums - download_all_country_files(root_folder_path, country_names, checksum_data) + # Step 4: Run the download + func = functools.partial( + _download_country_file, + root_folder=root_folder_path, + checksum_data=checksum_data, + ) + with concurrent.futures.ThreadPoolExecutor() as exec: + exec.map(func, country_names) From bba9865d43ea13c22c4565b572db3e11c9e156ac Mon Sep 17 00:00:00 2001 From: geospatial-jeff Date: Thu, 3 Jul 2025 10:18:26 -0600 Subject: [PATCH 2/4] don't reraise exception --- src/ftw_cli/download_ftw.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ftw_cli/download_ftw.py b/src/ftw_cli/download_ftw.py index 181f1dbb..94840722 100644 --- a/src/ftw_cli/download_ftw.py +++ b/src/ftw_cli/download_ftw.py @@ -87,7 +87,6 @@ def _download_country_file( except Exception: logger.exception(f"Error downloading {key}") print(f"Failed to download {key}.") - raise def download(out, clean_download, countries): From 51272c8617ab0353f955f3adaff337171adaef0b Mon Sep 17 00:00:00 2001 From: geospatial-jeff Date: Mon, 7 Jul 2025 15:37:08 -0600 Subject: [PATCH 3/4] fix unit test --- src/tests/test_data.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/tests/test_data.py b/src/tests/test_data.py index 60ec05d1..f28e17b8 100644 --- a/src/tests/test_data.py +++ b/src/tests/test_data.py @@ -13,7 +13,6 @@ def test_data_download(): result = runner.invoke(download, ["-f", "--countries=Rwanda", "--no-unpack"]) assert result.exit_code == 0, result.output assert "Downloading selected countries: ['rwanda']" in result.output - assert "Overall Download Progress: 100%" in result.output assert "Unpacking files:" not in result.output assert os.path.exists("data/rwanda.zip") assert not os.path.exists("data/ftw/rwanda") @@ -22,7 +21,6 @@ def test_data_download(): result = runner.invoke(download, ["--countries=Rwanda"]) assert result.exit_code == 0, result.output assert "already exists, skipping download." in result.output - assert "Unpacking files: 100%" in result.output assert os.path.exists("data/rwanda.zip") assert os.path.exists("data/ftw/rwanda") From 616ddbf42a849f8757ba0e98704699268b13cf66 Mon Sep 17 00:00:00 2001 From: geospatial-jeff Date: Mon, 7 Jul 2025 15:39:29 -0600 Subject: [PATCH 4/4] copilot review --- src/ftw_cli/download_ftw.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ftw_cli/download_ftw.py b/src/ftw_cli/download_ftw.py index 94840722..6d794695 100644 --- a/src/ftw_cli/download_ftw.py +++ b/src/ftw_cli/download_ftw.py @@ -14,7 +14,7 @@ logger = logging.getLogger() client = boto3.client( - "s3", config=Config(signature_version=UNSIGNED), region_name="us-west-1" + "s3", config=Config(signature_version=UNSIGNED), region_name="us-west-2" ) @@ -43,7 +43,6 @@ def _calculate_md5(file_path): def _download_file(key: str, fpath: str): - # if key.endswith(".zip"): print(f"Downloading {key} to {fpath}") with smart_open.open( f"s3://us-west-2.opendata.source.coop/{key}", @@ -67,7 +66,6 @@ def _download_country_file( return # Otherwise download the file. - expected_md5 = checksum_data[country_name] try: _download_file(key, local_file_path) @@ -76,6 +74,7 @@ def _download_country_file( if country_name not in checksum_data: print(f"No checksum found for {country_name}, skipping verification.") return + expected_md5 = checksum_data[country_name] if actual_md5 == expected_md5: logger.info(f"Checksum verification passed for {local_file_path}") print(f"Checksum verification passed for {country_name}.")