Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ dependencies = [
"pyproj", # For projections and CRSes
"fiboa-cli==0.7.0", # For storing data in Fiboa format
"tenacity==9.1.2", # For backoff / retry.
"dask[distributed]==2025.5.1" # For parallel processing
"dask[distributed]==2025.5.1", # For parallel processing
"smart_open[s3]==7.0.5" # For streaming file download.
]

[project.urls]
Expand Down
217 changes: 77 additions & 140 deletions src/ftw_cli/download_ftw.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import concurrent.futures
import functools
import hashlib
import logging
import os
import shutil
import time

import wget
from tqdm import tqdm
import boto3
import smart_open
from botocore import UNSIGNED
from botocore.config import Config

from .cfg import ALL_COUNTRIES

logger = logging.getLogger()
client = boto3.client(
"s3", config=Config(signature_version=UNSIGNED), region_name="us-west-1"
Copy link

Copilot AI Jul 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The S3 client is configured for us-west-1 but the download URL uses us-west-2. Align the region_name with the actual bucket region or make the bucket endpoint configurable.

Suggested change
client = boto3.client(
"s3", config=Config(signature_version=UNSIGNED), region_name="us-west-1"
bucket_region = os.getenv("BUCKET_REGION", "us-west-2")
bucket_endpoint = os.getenv("BUCKET_ENDPOINT", "us-west-2.opendata.source.coop")
client = boto3.client(
"s3", config=Config(signature_version=UNSIGNED), region_name=bucket_region

Copilot uses AI. Check for mistakes.
)

def load_checksums(local_md5_file_path):

def _load_checksums(local_md5_file_path):
"""
Load the checksum data from a local md5 file.

Expand All @@ -25,7 +33,7 @@ def load_checksums(local_md5_file_path):
return checksum_data


def calculate_md5(file_path):
def _calculate_md5(file_path):
"""Calculate the MD5 checksum of a file."""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
Expand All @@ -34,6 +42,53 @@ def calculate_md5(file_path):
return hash_md5.hexdigest()


def _download_file(key: str, fpath: str):
    """Stream an object from the public source.coop S3 bucket to a local file.

    :param key: S3 object key within the bucket (e.g.
        ``kerner-lab/fields-of-the-world-archive/<name>.zip``)
    :param fpath: Local filesystem path the object is written to.
    """
    print(f"Downloading {key} to {fpath}")
    # smart_open streams the object so the whole archive is never held in
    # memory; the unsigned boto3 client is shared at module level.
    with smart_open.open(
        f"s3://us-west-2.opendata.source.coop/{key}",
        "rb",
        transport_params={"client": client},
    ) as f:
        with open(fpath, "wb") as outf:
            # Read fixed-size chunks. Iterating a binary stream directly
            # splits on b"\n", which yields arbitrarily sized chunks for
            # binary archives.
            while chunk := f.read(1024 * 1024):
                outf.write(chunk)


def _download_country_file(
    country_name: str, root_folder: str, checksum_data: dict[str, str]
):
    """Download one country's ``.zip`` archive and verify its MD5 checksum.

    Skips the download when the file already exists locally, and skips
    verification (with a message) when no checksum is known for the country.
    Errors are logged and reported rather than raised, so a failure for one
    country does not abort a parallel batch.

    :param country_name: Country whose archive (``<country_name>.zip``) to fetch.
    :param root_folder: Local folder the archive is written into.
    :param checksum_data: Mapping of country name to expected MD5 hex digest.
    """
    key = f"kerner-lab/fields-of-the-world-archive/{country_name}.zip"
    local_file_path = os.path.join(root_folder, f"{country_name}.zip")

    # Check if the file already exists locally
    if os.path.exists(local_file_path):
        print(f"File {local_file_path} already exists, skipping download.")
        return

    try:
        _download_file(key, local_file_path)

        # Verify checksum (md5 hash). Check membership BEFORE indexing:
        # indexing first would raise an uncaught KeyError for any country
        # missing from the checksum file instead of skipping verification.
        if country_name not in checksum_data:
            print(f"No checksum found for {country_name}, skipping verification.")
            return
        expected_md5 = checksum_data[country_name]
        actual_md5 = _calculate_md5(local_file_path)
        if actual_md5 == expected_md5:
            logger.info(f"Checksum verification passed for {local_file_path}")
            print(f"Checksum verification passed for {country_name}.")
        else:
            logger.error(f"Checksum verification failed for {local_file_path}")
            print(
                f"Checksum verification failed for {country_name}. Expected: {expected_md5}, Found: {actual_md5}"
            )
    except Exception:
        logger.exception(f"Error downloading {key}")
        print(f"Failed to download {key}.")


def download(out, clean_download, countries):
root_folder_path = os.path.abspath(out)

Expand All @@ -47,148 +102,24 @@ def download(out, clean_download, countries):

# Ensure the root folder exists
os.makedirs(root_folder_path, exist_ok=True)

# Path for the log file inside the data folder
log_file = os.path.join(root_folder_path, "download.log")

# Initialize logging
logging.basicConfig(
filename=log_file,
filename=os.path.join(root_folder_path, "download.log"),
level=logging.INFO,
format="%(asctime)s %(levelname)s: %(message)s",
)
logger = logging.getLogger()

def custom_progress_bar(file_name):
start_time = time.time()
print(f"Downloading {file_name}...")

def bar(current, total, width=None):
if width is None:
width = shutil.get_terminal_size((80, 20)).columns
elapsed_time = time.time() - start_time
speed = current / elapsed_time if elapsed_time > 0 else 0
progress = current / total
bar_width = int(
(width - 40)
) # Adjust width for the text and make the bar 30% shorter
block = int(round(bar_width * progress))
progress_message = (
f"{current / (1024**3):.3f} GB / {total / (1024**3):.3f} GB"
)
speed_message = f"Speed: {speed / (1024**2):.2f} MB/s"
bar = f"[{'#' * block}{'-' * (bar_width - block)}]"
print(f"{progress_message} {bar} {speed_message}", end="\r")

return bar

def download_file(url, local_file_path):
"""
Downloads a file from the given URL using wget.

:param url: URL of the file to be downloaded
:param local_file_path: Path to the local file where the URL will be downloaded
:return: True if download was successful, False otherwise
"""
try:
# Use wget to download the file with a custom progress bar
wget.download(
url,
local_file_path,
bar=custom_progress_bar(os.path.basename(local_file_path)),
)
logger.info(f"Downloaded {url} to {local_file_path}")
print(f"\nDownloaded {local_file_path}")
return True
except Exception as e:
logger.error(f"Error downloading {url}: {e}")
print(f"Error downloading {url}: {str(e)}")
return False

def download_and_verify_md5(zip_file_path, country, checksum_data):
"""
Download a .zip file for a country and verify its checksum.

:param zip_file_path: Path to the local .zip file
:param country: Name of the country
:param checksum_data: Dictionary of country to checksum hash
:return: True if checksum matches, False otherwise
"""
print(f"Verifying {zip_file_path}...")

if country not in checksum_data:
print(f"No checksum found for {country}, skipping verification.")
return False

# Calculate the MD5 checksum of the downloaded file
calculated_checksum = calculate_md5(zip_file_path)
expected_checksum = checksum_data[country]

if calculated_checksum == expected_checksum:
print(f"Checksum verification passed for {country}.")
logger.info(f"Checksum verification passed for {zip_file_path}")
return True
else:
print(
f"Checksum verification failed for {country}. Expected: {expected_checksum}, Found: {calculated_checksum}"
)
logger.error(f"Checksum verification failed for {zip_file_path}")
return False

def download_all_country_files(root_folder, country_names, checksum_data):
"""
Download all .zip files for the specified countries and verify their checksums using basic URL downloads.

:param root_folder: Path to the root folder where files will be downloaded
:param country_names: List of country names whose files need to be downloaded
:param checksum_data: Dictionary of country to checksum hash
"""
base_url = "https://data.source.coop/kerner-lab/fields-of-the-world-archive/"

# tqdm progress for the entire dataset
overall_progress = tqdm(
total=len(country_names), desc="Overall Download Progress", unit="country"
)

for country in country_names:
url = f"{base_url}{country}.zip"
local_file_path = os.path.join(root_folder, f"{country}.zip")

# Check if the file already exists locally
if os.path.exists(local_file_path):
logger.info(
f"File {local_file_path} already exists, skipping download."
)
print(f"File {local_file_path} already exists, skipping download.")
else:
# Download the file
if download_file(url, local_file_path):
# Verify the file checksum
download_and_verify_md5(
local_file_path, country.lower(), checksum_data
)

# Update the overall progress
overall_progress.update(1)

overall_progress.close()

# Main script
# Step 1: Download the checksum.md5 file
local_md5_file_path = os.path.join(root_folder_path, "checksum.md5")
md5_file_url = (
"https://data.source.coop/kerner-lab/fields-of-the-world-archive/checksum.md5"
)

if not os.path.exists(local_md5_file_path):
if not download_file(md5_file_url, local_md5_file_path):
print("Failed to download checksum.md5 file.")
return
else:
print(f"Downloaded checksum.md5 to {local_md5_file_path}")
key = "kerner-lab/fields-of-the-world-archive/checksum.md5"
try:
_download_file(key, local_md5_file_path)
print(f"Downloaded checksum.md5 to {local_md5_file_path}")
except Exception as exc:
print("Failed to download checksum.md5 file.")
raise exc

# Step 2: Load the checksum data
checksum_data = load_checksums(local_md5_file_path)
checksum_data = _load_checksums(local_md5_file_path)

# Step 3: Handle country selection (all or specific countries)
if countries == "all":
Expand All @@ -202,5 +133,11 @@ def download_all_country_files(root_folder, country_names, checksum_data):
]
print(f"Downloading selected countries: {country_names}")

# Step 4: Download all .zip files for the specified countries and verify their checksums
download_all_country_files(root_folder_path, country_names, checksum_data)
# Step 4: Run the download
func = functools.partial(
_download_country_file,
root_folder=root_folder_path,
checksum_data=checksum_data,
)
with concurrent.futures.ThreadPoolExecutor() as exec:
exec.map(func, country_names)
Loading