diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 72a9688505..95bde21207 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -69,3 +69,11 @@ repos: # black-compatible flake-8 config args: ['--max-line-length=100', # black default '--extend-ignore=E203'] # E203 is not PEP8 compliant + +- repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.19.1 + hooks: + - id: mypy + args: [--config-file=mypy.ini] + files: ^src/finn/util/.*\.py$ # Only check util. directory for now + exclude: qnn-data # python doesnt allow hyphens in package names diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000000..9e3d745617 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,12 @@ +[mypy] +python_version = 3.12 +strict = True +warn_return_any = True +warn_unused_configs = True +disallow_untyped_defs = False +ignore_missing_imports = True + +[mypy-finn.util.*] +disallow_untyped_defs = True +disallow_incomplete_defs = True +check_untyped_defs = True diff --git a/requirements.txt b/requirements.txt index fc398c346b..a92c468a5f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ dataclasses-json==0.5.7 gspread==3.6.0 importlib-resources==6.1.0 ipython==8.12.2 +mypy>=1.19.1 numpy==1.24.1 onnx==1.17.0 onnxoptimizer diff --git a/src/finn/util/basic.py b/src/finn/util/basic.py index 164971f0f8..339f52f5ca 100644 --- a/src/finn/util/basic.py +++ b/src/finn/util/basic.py @@ -31,12 +31,13 @@ import sys import tempfile from qonnx.util.basic import roundup_to_integer_multiple +from typing import Dict, List, Optional, Tuple # test boards used for bnn pynq tests -test_board_map = ["Pynq-Z1", "KV260_SOM", "ZCU104", "U250"] +test_board_map: List[str] = ["Pynq-Z1", "KV260_SOM", "ZCU104", "U250"] # mapping from PYNQ board names to FPGA part names -pynq_part_map = dict() +pynq_part_map: Dict[str, str] = dict() pynq_part_map["Ultra96"] = "xczu3eg-sbva484-1-e" pynq_part_map["Ultra96-V2"] = "xczu3eg-sbva484-1-i" pynq_part_map["Pynq-Z1"] = "xc7z020clg400-1" @@ -51,7 +52,7 @@ # native AXI HP port width (in bits) for PYNQ boards -pynq_native_port_width = dict() +pynq_native_port_width: Dict[str, int] = dict() pynq_native_port_width["Pynq-Z1"] = 64 pynq_native_port_width["Pynq-Z2"] = 64 pynq_native_port_width["Ultra96"] = 128 @@ -65,14 +66,14 @@ pynq_native_port_width["AUP-ZU3_8GB"] = 128 # Alveo device and platform mappings -alveo_part_map = dict() +alveo_part_map: Dict[str, str] = dict() alveo_part_map["U50"] = "xcu50-fsvh2104-2L-e" alveo_part_map["U200"] = "xcu200-fsgd2104-2-e" alveo_part_map["U250"] = "xcu250-figd2104-2L-e" alveo_part_map["U280"] = "xcu280-fsvh2892-2L-e" alveo_part_map["U55C"] = "xcu55c-fsvh2892-2L-e" -alveo_default_platform = dict() +alveo_default_platform: Dict[str, str] = dict() alveo_default_platform["U50"] = "xilinx_u50_gen3x16_xdma_5_202210_1" alveo_default_platform["U200"] = "xilinx_u200_gen3x16_xdma_2_202110_1" alveo_default_platform["U250"] = "xilinx_u250_gen3x16_xdma_2_1_202010_1" @@ -80,13 +81,13 @@ alveo_default_platform["U55C"] = "xilinx_u55c_gen3x16_xdma_3_202210_1" # Create a joint part map, encompassing other boards too -part_map = {**pynq_part_map, **alveo_part_map} +part_map: Dict[str, str] = {**pynq_part_map, **alveo_part_map} part_map["VEK280"] = "xcve2802-vsvh1760-2MP-e-S" part_map["VCK190"] = "xcvc1902-vsva2197-2MP-e-S" part_map["V80"] = "xcv80-lsva4737-2MHP-e-s" -def get_rtlsim_trace_depth(): +def get_rtlsim_trace_depth() -> int: """Return the trace depth for rtlsim. Controllable via the RTLSIM_TRACE_DEPTH environment variable. If the env.var. is undefined, the default value of 1 is returned. A trace depth of 1 @@ -105,7 +106,7 @@ def get_rtlsim_trace_depth(): return 1 -def get_finn_root(): +def get_finn_root() -> str: "Return the root directory that FINN is cloned into." try: @@ -118,7 +119,7 @@ def get_finn_root(): ) -def get_vivado_root(): +def get_vivado_root() -> str: "Return the root directory that Vivado is installed into." try: @@ -131,14 +132,14 @@ def get_vivado_root(): ) -def get_liveness_threshold_cycles(): +def get_liveness_threshold_cycles() -> int: """Return the number of no-output cycles rtlsim will wait before assuming the simulation is not finishing and throwing an exception.""" return int(os.getenv("LIVENESS_THRESHOLD", 1000000)) -def make_build_dir(prefix=""): +def make_build_dir(prefix: str = "") -> str: """Creates a folder with given prefix to be used as a build dir. Use this function instead of tempfile.mkdtemp to ensure any generated files will survive on the host after the FINN Docker container exits.""" @@ -159,27 +160,27 @@ class CppBuilder: """Builds the g++ compiler command to produces the executable of the c++ code in code_gen_dir which is passed to the function build() of this class.""" - def __init__(self): - self.include_paths = [] - self.cpp_files = [] - self.executable_path = "" - self.code_gen_dir = "" - self.compile_components = [] - self.compile_script = "" + def __init__(self) -> None: + self.include_paths: List[str] = [] + self.cpp_files: List[str] = [] + self.executable_path: str = "" + self.code_gen_dir: str = "" + self.compile_components: List[str] = [] + self.compile_script: str = "" - def append_includes(self, library_path): + def append_includes(self, library_path: str) -> None: """Adds given library path to include_paths list.""" self.include_paths.append(library_path) - def append_sources(self, cpp_file): + def append_sources(self, cpp_file: str) -> None: """Adds given c++ file to cpp_files list.""" self.cpp_files.append(cpp_file) - def set_executable_path(self, path): + def set_executable_path(self, path: str) -> None: """Sets member variable "executable_path" to given path.""" self.executable_path = path - def build(self, code_gen_dir): + def build(self, code_gen_dir: str) -> None: """Builds the g++ compiler command according to entries in include_paths and cpp_files lists. Saves it in bash script in given folder and executes it.""" @@ -202,7 +203,9 @@ def build(self, code_gen_dir): process_compile.communicate() -def launch_process_helper(args, proc_env=None, cwd=None): +def launch_process_helper( + args: List[str], proc_env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None +) -> Tuple[str, str]: """Helper function to launch a process in a way that facilitates logging stdout/stderr with Python loggers. Returns (cmd_out, cmd_err).""" @@ -211,22 +214,26 @@ def launch_process_helper(args, proc_env=None, cwd=None): with subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=proc_env, cwd=cwd ) as proc: - (cmd_out, cmd_err) = proc.communicate() + (cmd_out_bytes, cmd_err_bytes) = proc.communicate() + + cmd_out = "" + cmd_err = "" + if cmd_out is not None: - cmd_out = cmd_out.decode("utf-8") + cmd_out = cmd_out_bytes.decode("utf-8") sys.stdout.write(cmd_out) if cmd_err is not None: - cmd_err = cmd_err.decode("utf-8") + cmd_err = cmd_err_bytes.decode("utf-8") sys.stderr.write(cmd_err) return (cmd_out, cmd_err) -def which(program): +def which(program: str) -> Optional[str]: "Python equivalent of the shell cmd 'which'." # source: # https://stackoverflow.com/questions/377017/test-if-executable-exists-in-python - def is_exe(fpath): + def is_exe(fpath: str) -> bool: return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) @@ -255,8 +262,10 @@ def is_exe(fpath): def get_memutil_alternatives( - req_mem_spec, mem_primitives=mem_primitives_versal, sort_min_waste=True -): + req_mem_spec: Tuple[int, int], + mem_primitives: Dict[str, Tuple[int, int]] = mem_primitives_versal, + sort_min_waste: bool = True, +) -> List[Tuple[str, Tuple[int, float, int]]]: """Computes how many instances of a memory primitive are necessary to implement a desired memory size, where req_mem_spec is the desired size and the primitive_spec is the primitve size. The sizes are expressed @@ -275,7 +284,9 @@ def get_memutil_alternatives( return ret -def memutil(req_mem_spec, primitive_spec): +def memutil( + req_mem_spec: Tuple[int, int], primitive_spec: Tuple[int, int] +) -> Tuple[int, float, int]: """Computes how many instances of a memory primitive are necessary to implemented a desired memory size, where req_mem_spec is the desired size and the primitive_spec is the primitve size. The sizes are expressed @@ -296,7 +307,7 @@ def memutil(req_mem_spec, primitive_spec): return (count, eff, waste) -def is_versal(fpgapart): +def is_versal(fpgapart: str) -> bool: """Returns whether board is part of the Versal family""" return fpgapart[0:4] in ["xcvc", "xcve", "xcvp", "xcvm", "xqvc", "xqvm"] or fpgapart[0:5] in [ "xqrvc", @@ -304,7 +315,7 @@ def is_versal(fpgapart): ] -def get_dsp_block(fpgapart): +def get_dsp_block(fpgapart: str) -> str: if is_versal(fpgapart): return "DSP58" elif fpgapart[2] == "7": diff --git a/src/finn/util/config.py b/src/finn/util/config.py index 7c4f2013bd..fb45666189 100644 --- a/src/finn/util/config.py +++ b/src/finn/util/config.py @@ -16,11 +16,14 @@ import json import onnx from qonnx.custom_op.registry import getCustomOp, is_custom_op +from typing import Any, Dict, List, Optional # update this code to handle export configs from subgraphs # where the subgraph is found in a node's attribute as a graph type -def extract_model_config(model, subgraph_hier, attr_names_to_extract): +def extract_model_config( + model: Any, subgraph_hier: Optional[str], attr_names_to_extract: List[str] +) -> Dict[str, Dict[str, Any]]: """Create a dictionary with layer name -> attribute mappings extracted from the model. The created dictionary can be later applied on a model with qonnx.transform.general.ApplyConfig. @@ -29,7 +32,7 @@ def extract_model_config(model, subgraph_hier, attr_names_to_extract): For example, a node 'Conv_0' inside a subgraph of node 'IfNode_0' will be exported as 'IfNode_0_Conv_0' in the config.""" - cfg = dict() + cfg: Dict[str, Dict[str, Any]] = dict() cfg["Defaults"] = dict() for n in model.graph.node: new_hier = n.name if subgraph_hier is None else str(subgraph_hier) + "_" + n.name @@ -38,7 +41,7 @@ def extract_model_config(model, subgraph_hier, attr_names_to_extract): is_custom = is_custom_op(n.domain, n.op_type) if is_custom: oi = getCustomOp(n) - layer_dict = dict() + layer_dict: Dict[str, Any] = dict() for attr in attr_names_to_extract: try: layer_dict[attr] = oi.get_nodeattr(attr) @@ -66,7 +69,9 @@ def extract_model_config(model, subgraph_hier, attr_names_to_extract): return cfg -def extract_model_config_to_json(model, json_filename, attr_names_to_extract): +def extract_model_config_to_json( + model: Any, json_filename: str, attr_names_to_extract: List[str] +) -> None: """Create a json file with layer name -> attribute mappings extracted from the model. The created json file can be later applied on a model with qonnx.transform.general.ApplyConfig.""" @@ -81,15 +86,17 @@ def extract_model_config_to_json(model, json_filename, attr_names_to_extract): ) -def extract_model_config_consolidate_shuffles(model, output_file, hw_attrs): +def extract_model_config_consolidate_shuffles( + model: Any, output_file: str, hw_attrs: List[str] +) -> None: """Export flow that takes into consideration how Shuffle operations have been decomposed""" extract_model_config_to_json(model, output_file, hw_attrs) with open(output_file, "r") as f: - config = json.load(f) + config: Dict[str, Any] = json.load(f) - shuffle_configs = {} - nodes_to_remove = [] + shuffle_configs: Dict[str, Dict[str, Any]] = {} + nodes_to_remove: List[str] = [] for node in model.graph.node: if node.op_type in ["InnerShuffle_rtl", "OuterShuffle_hls"]: diff --git a/src/finn/util/create.py b/src/finn/util/create.py index 09ec4f334c..10a44bb9a7 100644 --- a/src/finn/util/create.py +++ b/src/finn/util/create.py @@ -35,12 +35,13 @@ gen_finn_dt_tensor, qonnx_make_model, ) +from typing import Any, Dict, List, Optional -def hls_random_mlp_maker(layer_spec): +def hls_random_mlp_maker(layer_spec: List[Dict[str, Any]]) -> ModelWrapper: """Create an MLP of given specification using HLSCustomOp instances. Generate random weights/thresholds of appropriate size.""" - ret = [] + ret: List[Dict[str, Any]] = [] for lyr in layer_spec: idt = lyr["idt"] wdt = lyr["wdt"] @@ -50,7 +51,7 @@ def hls_random_mlp_maker(layer_spec): lyr["W"] = gen_finn_dt_tensor(wdt, (mw, mh)) if act is None: # no activation, produce accumulators - T = None + T: Optional[np.ndarray[Any, Any]] = None tdt = None if wdt == DataType["BIPOLAR"] and idt == DataType["BIPOLAR"]: odt = DataType["UINT32"] @@ -79,12 +80,12 @@ def hls_random_mlp_maker(layer_spec): return hls_mlp_maker(ret) -def hls_mlp_maker(layer_spec): +def hls_mlp_maker(layer_spec: List[Dict[str, Any]]) -> ModelWrapper: """Create an MLP of given specification using HLSCustomOp instances.""" - current_in_name = "" - current_out_name = "" - i = 0 + current_in_name: str = "" + current_out_name: str = "" + i: int = 0 graph = helper.make_graph(nodes=[], name="mlp", inputs=[], outputs=[]) diff --git a/src/finn/util/data_packing.py b/src/finn/util/data_packing.py index 274e1b9a0a..7b56c96c29 100644 --- a/src/finn/util/data_packing.py +++ b/src/finn/util/data_packing.py @@ -36,10 +36,16 @@ from qonnx.core.modelwrapper import ModelWrapper from qonnx.custom_op.registry import getCustomOp from qonnx.util.basic import gen_finn_dt_tensor, roundup_to_integer_multiple -from typing import Dict +from typing import Any, Dict, List, Optional, Union, cast -def array2hexstring(array, dtype, pad_to_nbits, prefix="0x", reverse=False): +def array2hexstring( + array: Union[np.ndarray[Any, Any], List[Any]], + dtype: DataType, + pad_to_nbits: int, + prefix: str = "0x", + reverse: bool = False, +) -> str: """ Pack given one-dimensional NumPy array with FINN DataType dtype into a hex string. @@ -99,10 +105,12 @@ def array2hexstring(array, dtype, pad_to_nbits, prefix="0x", reverse=False): else: raise Exception("Number of bits is greater than pad_to_nbits") # represent as hex - return prefix + lineval.hex + return str(prefix + lineval.hex) -def hexstring2npbytearray(hexstring, remove_prefix="0x"): +def hexstring2npbytearray( + hexstring: str, remove_prefix: str = "0x" +) -> np.ndarray[Any, np.dtype[np.uint8]]: """Convert a hex string into a NumPy array of dtype uint8. Example: @@ -117,7 +125,9 @@ def hexstring2npbytearray(hexstring, remove_prefix="0x"): return np.asarray(bytearray.fromhex(hexstring), dtype=np.uint8) -def npbytearray2hexstring(npbytearray, prefix="0x"): +def npbytearray2hexstring( + npbytearray: np.ndarray[Any, np.dtype[np.uint8]], prefix: str = "0x" +) -> str: """Convert a NumPy array of uint8 dtype into a hex string. Example: @@ -128,8 +138,12 @@ def npbytearray2hexstring(npbytearray, prefix="0x"): def pack_innermost_dim_as_hex_string( - ndarray, dtype, pad_to_nbits, reverse_inner=False, prefix="0x" -): + ndarray: Union[np.ndarray[Any, Any], List[Any]], + dtype: DataType, + pad_to_nbits: int, + reverse_inner: bool = False, + prefix: str = "0x", +) -> np.ndarray[Any, Any]: """Pack the innermost dimension of the given numpy ndarray into hex strings using array2hexstring. @@ -152,15 +166,19 @@ def pack_innermost_dim_as_hex_string( # try to convert to a float numpy array (container dtype is float) ndarray = np.asarray(ndarray, dtype=np.float32) - def fun(x): + def fun(x: np.ndarray[Any, Any]) -> str: return array2hexstring(x, dtype, pad_to_nbits, reverse=reverse_inner, prefix=prefix) return np.apply_along_axis(fun, ndarray.ndim - 1, ndarray) def unpack_innermost_dim_from_hex_string( - ndarray, dtype, out_shape, packedBits, reverse_inner=False -): + ndarray: np.ndarray[Any, Any], + dtype: DataType, + out_shape: tuple[int, ...], + packedBits: int, + reverse_inner: bool = False, +) -> np.ndarray[Any, Any]: """Convert a NumPy array of hex strings into a FINN NumPy array by unpacking the hex strings into the specified data type. out_shape can be specified such that any padding in the packing dimension is removed. If reverse_inner @@ -185,25 +203,25 @@ def unpack_innermost_dim_from_hex_string( outer_dim_elems = outer_dim_elems * out_shape[dim] inner_dim_elems = out_shape[-1] - array = [] + array: List[List[Union[float, int, np.floating[Any]]]] = [] if dtype.is_fixed_point(): # convert fixed point as signed integer conv_dtype = DataType["INT" + str(targetBits)] else: conv_dtype = dtype for outer_elem in range(outer_dim_elems): - ar_list = [] + ar_list: List[Union[float, int, np.floating[Any]]] = [] ar_elem = data[0] data.pop(0) ar_elem = ar_elem.split("x") ar_elem_bin = bin(int(ar_elem[1], 16))[2:].zfill(packedBits) - ar_elem_bin = [int(x) for x in ar_elem_bin] + ar_elem_bin_list: List[int] = [int(x) for x in ar_elem_bin] - ar_elem_bin.reverse() + ar_elem_bin_list.reverse() for i in range(inner_dim_elems): upper_limit = (i + 1) * targetBits lower_limit = i * targetBits - elem = ar_elem_bin[lower_limit:upper_limit] + elem = ar_elem_bin_list[lower_limit:upper_limit] elem.reverse() elem_str = "".join(map(str, elem)) if conv_dtype == DataType["FLOAT16"]: @@ -230,14 +248,20 @@ def unpack_innermost_dim_from_hex_string( array.append(ar_list) npy_dtype = np.float16 if conv_dtype == DataType["FLOAT16"] else np.float32 - array = np.asarray(array, dtype=npy_dtype).reshape(out_shape) + result: np.ndarray[Any, Any] = np.asarray(array, dtype=npy_dtype).reshape(out_shape) if dtype.is_fixed_point(): # convert signed integer to fixed point by applying scale - array = array * dtype.scale_factor() - return array + result = result * dtype.scale_factor() + return result -def numpy_to_hls_code(ndarray, dtype, hls_var_name, pack_innermost_dim=True, no_decl=False): +def numpy_to_hls_code( + ndarray: Union[np.ndarray[Any, Any], List[Any]], + dtype: DataType, + hls_var_name: str, + pack_innermost_dim: bool = True, + no_decl: bool = False, +) -> str: """Return C++ code representation of a numpy ndarray with FINN DataType dtype, using hls_var_name as the resulting C++ variable name. If pack_innermost_dim is specified, the innermost dimension of the ndarray @@ -258,7 +282,7 @@ def numpy_to_hls_code(ndarray, dtype, hls_var_name, pack_innermost_dim=True, no_ ndims = ndarray.ndim # add type string and variable name # e.g. "const ap_uint<64>" "weightMem0" - ret = "%s %s" % (hls_dtype, hls_var_name) + ret: str = "%s %s" % (hls_dtype, hls_var_name) # add dimensions for d in range(ndims): ret += "[%d]" % ndarray.shape[d] @@ -267,7 +291,7 @@ def numpy_to_hls_code(ndarray, dtype, hls_var_name, pack_innermost_dim=True, no_ # define a function to convert a single element into a C++ init string # a single element can be a hex string if we are using packing - def elem2str(x): + def elem2str(x: Any) -> str: if type(x) == str or type(x) == np.str_: return '%s("%s", 16)' % (hls_dtype, x) elif type(x) == np.float32: @@ -288,7 +312,12 @@ def elem2str(x): return ret -def npy_to_rtlsim_input(input_file, input_dtype, pad_to_nbits, reverse_inner=True): +def npy_to_rtlsim_input( + input_file: Union[str, np.ndarray[Any, Any]], + input_dtype: DataType, + pad_to_nbits: int, + reverse_inner: bool = True, +) -> List[int]: """Convert the multidimensional NumPy array of integers (stored as floats) from input_file into a flattened sequence of Python arbitrary-precision integers, packing the innermost dimension. See @@ -296,30 +325,40 @@ def npy_to_rtlsim_input(input_file, input_dtype, pad_to_nbits, reverse_inner=Tru packing works. If reverse_inner is set, the innermost dimension will be reversed prior to packing.""" pad_to_nbits = roundup_to_integer_multiple(pad_to_nbits, 4) - if issubclass(type(input_file), np.ndarray): + if isinstance(input_file, np.ndarray): inp = input_file - elif os.path.isfile(input_file): + elif isinstance(input_file, str) and os.path.isfile(input_file): inp = np.load(input_file) else: raise Exception("input_file must be ndarray or filename for .npy") + + packed_data: List[int] if ( inp.shape[-1] == 1 and input_dtype.is_integer() and input_dtype.get_canonical_name() != "BIPOLAR" ): mask = (1 << input_dtype.bitwidth()) - 1 - packed_data = inp.flatten().astype(input_dtype.to_numpy_dt()) - packed_data = [int(x) & mask for x in packed_data] + packed_array = inp.flatten().astype(input_dtype.to_numpy_dt()) + packed_data = [int(x) & mask for x in packed_array] else: - packed_data = pack_innermost_dim_as_hex_string( + packed_hexstring = pack_innermost_dim_as_hex_string( inp, input_dtype, pad_to_nbits, reverse_inner=reverse_inner ) - packed_data = packed_data.flatten() - packed_data = [int(x[2:], 16) for x in packed_data] + packed_flat = packed_hexstring.flatten() + packed_data = [int(x[2:], 16) for x in packed_flat] return packed_data -def rtlsim_output_to_npy(output, path, dtype, shape, packedBits, targetBits, reverse_inner=True): +def rtlsim_output_to_npy( + output: List[int], + path: Optional[str], + dtype: DataType, + shape: tuple[int, ...], + packedBits: int, + targetBits: int, + reverse_inner: bool = True, +) -> np.ndarray[Any, Any]: """Convert a flattened sequence of Python arbitrary-precision integers output into a NumPy array, saved as npy file at path. Each arbitrary-precision integer is assumed to be a packed array of targetBits-bit elements, which @@ -327,9 +366,9 @@ def rtlsim_output_to_npy(output, path, dtype, shape, packedBits, targetBits, rev not None it will also be saved as a npy file.""" # TODO should have its own testbench? - output = np.asarray([hex(int(x)) for x in output]) - out_array = unpack_innermost_dim_from_hex_string( - output, dtype, shape, packedBits=packedBits, reverse_inner=reverse_inner + output_hex: np.ndarray[Any, Any] = np.asarray([hex(int(x)) for x in output]) + out_array: np.ndarray[Any, Any] = unpack_innermost_dim_from_hex_string( + output_hex, dtype, shape, packedBits=packedBits, reverse_inner=reverse_inner ) # make copy before saving the array out_array = out_array.copy() @@ -339,8 +378,12 @@ def rtlsim_output_to_npy(output, path, dtype, shape, packedBits, targetBits, rev def finnpy_to_packed_bytearray( - ndarray, dtype, reverse_inner=False, reverse_endian=False, fast_mode=False -): + ndarray: Union[np.ndarray[Any, Any], List[Any]], + dtype: DataType, + reverse_inner: bool = False, + reverse_endian: bool = False, + fast_mode: bool = False, +) -> np.ndarray[Any, np.dtype[np.uint8]]: """Given a numpy ndarray with FINN DataType dtype, pack the innermost dimension and return the packed representation as an ndarray of uint8. The packed innermost dimension will be padded to the nearest multiple @@ -354,8 +397,13 @@ def finnpy_to_packed_bytearray( This mode is currently not well-tested, use at your own risk! """ + # convert to ndarray early to avoid union type issues + if not isinstance(ndarray, np.ndarray) or ndarray.dtype not in [np.float32, np.float16]: + # try to convert to a float numpy array (container dtype is float) + ndarray = np.asarray(ndarray, dtype=np.float32) + # handle fast_mode cases (currently only called from driver): - if issubclass(type(ndarray), np.ndarray) and fast_mode: + if fast_mode: inp_is_byte = ndarray.dtype in [np.uint8, np.int8] out_is_byte = dtype.bitwidth() == 8 double_reverse = reverse_inner and reverse_endian @@ -379,9 +427,6 @@ def finnpy_to_packed_bytearray( # reverse endianness and return return np.flip(packed_data, axis=-1) - if (not issubclass(type(ndarray), np.ndarray)) or ndarray.dtype != np.float32: - # try to convert to a float numpy array (container dtype is float) - ndarray = np.asarray(ndarray, dtype=np.float32) # pack innermost dim to hex strings padded to 8 bits bits = dtype.bitwidth() * ndarray.shape[-1] bits_padded = roundup_to_integer_multiple(bits, 8) @@ -389,12 +434,12 @@ def finnpy_to_packed_bytearray( ndarray, dtype, bits_padded, reverse_inner=reverse_inner ) - def fn(x): + def fn(x: np.ndarray[Any, Any]) -> np.ndarray[Any, np.dtype[np.uint8]]: return np.asarray(list(map(hexstring2npbytearray, x))) if packed_hexstring.ndim == 0: # scalar, call hexstring2npbytearray directly - ret = hexstring2npbytearray(np.asscalar(packed_hexstring)) + ret = hexstring2npbytearray(str(packed_hexstring.item())) else: # convert ndarray of hex strings to byte array ret = np.apply_along_axis(fn, packed_hexstring.ndim - 1, packed_hexstring) @@ -405,13 +450,13 @@ def fn(x): def packed_bytearray_to_finnpy( - packed_bytearray, - dtype, - output_shape=None, - reverse_inner=False, - reverse_endian=False, - fast_mode=False, -): + packed_bytearray: np.ndarray[Any, np.dtype[np.uint8]], + dtype: DataType, + output_shape: Optional[tuple[int, ...]] = None, + reverse_inner: bool = False, + reverse_endian: bool = False, + fast_mode: bool = False, +) -> np.ndarray[Any, Any]: """Given a packed numpy uint8 ndarray, unpack it into a FINN array of given DataType. @@ -424,7 +469,7 @@ def packed_bytearray_to_finnpy( """ - if (not issubclass(type(packed_bytearray), np.ndarray)) or packed_bytearray.dtype != np.uint8: + if (not isinstance(packed_bytearray, np.ndarray)) or packed_bytearray.dtype != np.uint8: raise Exception("packed_bytearray_to_finnpy needs NumPy uint8 arrays") if packed_bytearray.ndim == 0: raise Exception("packed_bytearray_to_finnpy expects at least 1D ndarray") @@ -446,11 +491,14 @@ def packed_bytearray_to_finnpy( no_unpad = np.prod(packed_bytearray.shape) == np.prod(output_shape) if no_unpad: as_np_type = packed_bytearray.view(dtype.to_numpy_dt()) - return as_np_type.reshape(output_shape).astype(np.float32) + return cast(np.ndarray[Any, Any], as_np_type.reshape(output_shape).astype(np.float32)) if reverse_endian: packed_bytearray = np.flip(packed_bytearray, axis=-1) # convert innermost dim of byte array to hex strings - packed_hexstring = np.apply_along_axis(npbytearray2hexstring, packed_dim, packed_bytearray) + packed_hexstring = cast( + np.ndarray[Any, Any], + np.apply_along_axis(npbytearray2hexstring, packed_dim, packed_bytearray), + ) ret = unpack_innermost_dim_from_hex_string( packed_hexstring, dtype, output_shape, packed_bits, reverse_inner ) @@ -458,7 +506,9 @@ def packed_bytearray_to_finnpy( return ret -def to_external_tensor(init, w_dtype): +def to_external_tensor( + init: np.ndarray[Any, Any], w_dtype: DataType +) -> np.ndarray[Any, np.dtype[np.uint8]]: """Return an appropriately formatted and packed numpy byte array for given external parameter tensor.""" @@ -473,7 +523,7 @@ def to_external_tensor(init, w_dtype): return ext_weight -def get_driver_shapes(model: ModelWrapper) -> Dict: +def get_driver_shapes(model: ModelWrapper) -> Dict[str, List[Any]]: idt = [] idma_names = [] ishape_normal = [] diff --git a/src/finn/util/fpgadataflow.py b/src/finn/util/fpgadataflow.py index aae438fac2..0b4068add5 100644 --- a/src/finn/util/fpgadataflow.py +++ b/src/finn/util/fpgadataflow.py @@ -26,10 +26,12 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from onnx import NodeProto from qonnx.util.basic import get_by_name, is_finn_op +from typing import Optional -def is_fpgadataflow_node(node): +def is_fpgadataflow_node(node: Optional[NodeProto]) -> bool: """Returns True if given node is fpgadataflow node. Otherwise False.""" is_node = False if node is not None: @@ -43,7 +45,7 @@ def is_fpgadataflow_node(node): return is_node -def is_hls_node(node): +def is_hls_node(node: Optional[NodeProto]) -> bool: """Returns True if given node is hls node. Otherwise False.""" is_node = False if node is not None: @@ -57,7 +59,7 @@ def is_hls_node(node): return is_node -def is_rtl_node(node): +def is_rtl_node(node: Optional[NodeProto]) -> bool: """Returns True if given node is rtl node. Otherwise False.""" is_node = False if node is not None: diff --git a/src/finn/util/hls.py b/src/finn/util/hls.py index 1ae7853ce3..230e05b607 100644 --- a/src/finn/util/hls.py +++ b/src/finn/util/hls.py @@ -37,27 +37,29 @@ class CallHLS: """Call vitis_hls to run HLS build tcl scripts.""" - def __init__(self): + def __init__(self) -> None: self.tcl_script = "" self.ipgen_path = "" self.code_gen_dir = "" self.ipgen_script = "" - def append_tcl(self, tcl_script): + def append_tcl(self, tcl_script: str) -> None: """Sets the tcl script to be executed.""" self.tcl_script = tcl_script - def set_ipgen_path(self, path): + def set_ipgen_path(self, path: str) -> None: """Sets member variable ipgen_path to given path.""" self.ipgen_path = path - def build(self, code_gen_dir): + def build(self, code_gen_dir: str) -> None: """Builds the bash script with given parameters and saves it in given folder. To guarantee the generation in the correct folder the bash script contains a cd command.""" vivado_path = os.environ.get("XILINX_VIVADO") # xsi kernel lib name depends on Vivado version (renamed in 2024.2) + assert vivado_path is not None, "XILINX_VIVADO environment variable not set" match = re.search(r"\b(20\d{2})\.(1|2)\b", vivado_path) + assert match is not None, "Could not parse Vivado version from XILINX_VIVADO" year, minor = int(match.group(1)), int(match.group(2)) if (year, minor) > (2024, 2): assert which("vitis-run") is not None, "vitis-run not found in PATH" diff --git a/src/finn/util/imagenet.py b/src/finn/util/imagenet.py index 1d63adf58b..e2a5801e8e 100644 --- a/src/finn/util/imagenet.py +++ b/src/finn/util/imagenet.py @@ -28,13 +28,17 @@ import numpy as np import os +from numpy.typing import NDArray from PIL import Image from qonnx.core.data_layout import NCHW, NHWC +from typing import Any, Callable, Generator from finn.util.test import crop_center, resize_smaller_side -def get_val_images(n_images=100, interleave_classes=False): +def get_val_images( + n_images: int = 100, interleave_classes: bool = False +) -> Generator[tuple[str, int], None, None]: """Returns generator over (path_to_jpeg, imagenet_class_id) for the first n_images in the ILSVRC2012 validation dataset. The IMAGENET_VAL_PATH environment variable must point to the validation dataset folder, containing 1000 folders (one for @@ -49,35 +53,32 @@ def get_val_images(n_images=100, interleave_classes=False): please see: https://github.com/Xilinx/brevitas/blob/dev/brevitas_examples/imagenet_classification/README.md """ - try: - val_path = os.environ["IMAGENET_VAL_PATH"] - val_folders = sorted(os.listdir(val_path)) - assert len(val_folders) == 1000, "Expected 1000 subfolders in ILSVRC2012 val" - assert n_images <= 50000, "ILSVRC2012 validation dataset has 50k images" - n_current_folder = 0 - n_current_file = 0 - total = 0 - while total != n_images: - current_folder = os.path.join(val_path, val_folders[n_current_folder]) - current_files = sorted(os.listdir(current_folder)) - current_file = os.path.join(current_folder, current_files[n_current_file]) - yield (current_file, n_current_folder) - total += 1 - if interleave_classes: - n_current_folder += 1 - if n_current_folder == 1000: - n_current_file += 1 - n_current_folder = 0 - else: + val_path = os.environ["IMAGENET_VAL_PATH"] + val_folders = sorted(os.listdir(val_path)) + assert len(val_folders) == 1000, "Expected 1000 subfolders in ILSVRC2012 val" + assert n_images <= 50000, "ILSVRC2012 validation dataset has 50k images" + n_current_folder = 0 + n_current_file = 0 + total = 0 + while total != n_images: + current_folder = os.path.join(val_path, val_folders[n_current_folder]) + current_files = sorted(os.listdir(current_folder)) + current_file = os.path.join(current_folder, current_files[n_current_file]) + yield (current_file, n_current_folder) + total += 1 + if interleave_classes: + n_current_folder += 1 + if n_current_folder == 1000: n_current_file += 1 - if n_current_file == 50: - n_current_folder += 1 - n_current_file = 0 - except KeyError: - return None + n_current_folder = 0 + else: + n_current_file += 1 + if n_current_file == 50: + n_current_folder += 1 + n_current_file = 0 -def load_resize_crop(img_path, layout=NCHW, dtype=np.float32): +def load_resize_crop(img_path: str, layout: Any = NCHW, dtype: Any = np.float32) -> NDArray[Any]: """Load, resize and center crop given image for standard ImageNet preprocessing, return a numpy array.""" # get single image as input and prepare image @@ -90,7 +91,7 @@ def load_resize_crop(img_path, layout=NCHW, dtype=np.float32): if layout == NCHW: # save image as numpy array and as torch tensor to enable testing in # brevitas/pytorch and finn and transpose from (H, W, C) to (C, H, W) - img_np = np.asarray(img).copy().astype(dtype).transpose(2, 0, 1) + img_np: NDArray[Any] = np.asarray(img).copy().astype(dtype).transpose(2, 0, 1) img_np = img_np.reshape(1, 3, 224, 224) return img_np elif layout == NHWC: @@ -101,7 +102,14 @@ def load_resize_crop(img_path, layout=NCHW, dtype=np.float32): raise Exception("Unknown data layout for load_resize_crop") -def measure_topk(n_images, fxn_pre, fxn_exec, fxn_post, verbose=True, k=5): +def measure_topk( + n_images: int, + fxn_pre: Callable[[NDArray[Any]], Any], + fxn_exec: Callable[[Any], Any], + fxn_post: Callable[[Any], NDArray[Any]], + verbose: bool = True, + k: int = 5, +) -> tuple[tuple[float, float], tuple[float, float]]: "Do top-k accuracy measurement on ILSVRC2012 with given functions." workload = get_val_images(n_images) @@ -110,10 +118,10 @@ def measure_topk(n_images, fxn_pre, fxn_exec, fxn_post, verbose=True, k=5): topk_ok = 0.0 topk_nok = 0.0 for i, (img_path, target_id) in enumerate(workload): - img_np = load_resize_crop(img_path) + img_np: NDArray[Any] = load_resize_crop(img_path) inp = fxn_pre(img_np) ret = fxn_exec(inp) - res = fxn_post(ret) + res: NDArray[Any] = fxn_post(ret) res = res.flatten() res = np.argsort(res)[-k:] res = np.flip(res) diff --git a/src/finn/util/platforms.py b/src/finn/util/platforms.py index 8856ce0ab8..3e5efb0a32 100644 --- a/src/finn/util/platforms.py +++ b/src/finn/util/platforms.py @@ -28,6 +28,8 @@ import numpy as np from abc import abstractmethod +from numpy.typing import NDArray +from typing import Any, Sequence # contains the amount of available FPGA resources for several # Xilinx platforms, as well as certain resource limit guidelines @@ -71,16 +73,16 @@ class Platform: def __init__( self, - nslr=1, - ndevices=1, - sll_count=[], - hbm_slr=-1, - ddr_slr=[0], - eth_slr=0, - eth_gbps=0, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + nslr: int = 1, + ndevices: int = 1, + sll_count: list[list[int]] = [], + hbm_slr: int = -1, + ddr_slr: list[int] = [0], + eth_slr: int = 0, + eth_gbps: int = 0, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: self.nslr = nslr self.sll_count = sll_count self.eth_slr = eth_slr @@ -97,11 +99,11 @@ def __init__( @property @abstractmethod - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: pass @property - def guide_resources(self): + def guide_resources(self) -> list[list[int]]: guide = [] # TODO: assert limits is of correct size guide_res = (np.tile(np.array(self.compute_resources), (self.ndevices, 1))).astype(int) @@ -132,7 +134,7 @@ def guide_resources(self): return guide @property - def resource_count_dict(self): + def resource_count_dict(self) -> dict[str, dict[str, int]]: res = dict() for i in range(self.nslr * self.ndevices): slr_res = dict() @@ -145,7 +147,7 @@ def resource_count_dict(self): return res @property - def compute_connection_cost(self): + def compute_connection_cost(self) -> NDArray[Any]: x = np.full((self.nslr * self.ndevices, self.nslr * self.ndevices), DONT_CARE) # build connection cost matrix for one device's SLRs xlocal = np.full((self.nslr, self.nslr), DONT_CARE) @@ -165,7 +167,7 @@ def compute_connection_cost(self): return x @property - def compute_connection_resource(self): + def compute_connection_resource(self) -> list[list[tuple[int, int]]]: sll = np.full((self.nslr * self.ndevices, self.nslr * self.ndevices), 0) # build connection resource matrix for one device's SLRs slllocal = np.full((self.nslr, self.nslr), -1) @@ -207,7 +209,7 @@ def compute_connection_resource(self): constraints.append(constraints_line) return constraints - def map_device_to_slr(self, idx): + def map_device_to_slr(self, idx: int) -> tuple[int, int]: """Given a global SLR index, return device id and local slr index""" assert idx <= self.nslr * self.ndevices return (idx % self.nslr, idx // self.nslr) @@ -216,10 +218,10 @@ def map_device_to_slr(self, idx): class Zynq7020_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: super(Zynq7020_Platform, self).__init__( nslr=1, ndevices=ndevices, @@ -232,17 +234,17 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: return [[53200, 2 * 53200, 280, 0, 220] for i in range(1)] class ZU3EG_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: super(ZU3EG_Platform, self).__init__( nslr=1, ndevices=ndevices, @@ -255,17 +257,17 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: return [[71000, 2 * 71000, 412, 0, 360] for i in range(1)] class ZU7EV_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: super(ZU7EV_Platform, self).__init__( nslr=1, ndevices=ndevices, @@ -278,17 +280,17 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: return [[230000, 2 * 230000, 610, 92, 1728] for i in range(1)] class ZU9EG_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: super(ZU9EG_Platform, self).__init__( nslr=1, ndevices=ndevices, @@ -301,17 +303,17 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: return [[274000, 2 * 274000, 1824, 0, 2520] for i in range(1)] class ZU28DR_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: super(ZU28DR_Platform, self).__init__( nslr=1, ndevices=ndevices, @@ -324,17 +326,17 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: return [[425000, 2 * 425000, 2160, 80, 4272] for i in range(1)] class Alveo_NxU50_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: # according to Vivado: 23040 SLR0 <-> SLR1 sll_counts = [[0, 5000], [5000, 0]] super(Alveo_NxU50_Platform, self).__init__( @@ -350,7 +352,7 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: # According to UG1120: # U50 has identical resource counts on both SLRs # return [[365000,2*365000,2*564, 304, 2580] for i in range(2)] @@ -364,10 +366,10 @@ def compute_resources(self): class Alveo_NxU200_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: sll_counts = [[0, 5000, 0], [5000, 0, 5000], [0, 5000, 0]] super(Alveo_NxU200_Platform, self).__init__( nslr=3, @@ -381,7 +383,7 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: # According to UG1120: # return [[355000, 723000, 2*638, 320, 2265], # [160000, 331000, 2*326, 160, 1317], @@ -397,10 +399,10 @@ def compute_resources(self): class Alveo_NxU250_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: sll_counts = [ [0, 5000, 0, 0], [5000, 0, 5000, 0], @@ -419,7 +421,7 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: # According to UG1120: # U250 has identical resource counts on all 4 SLRs: # return [[345000,2*345000,2*500, 320, 2877] for i in range(4)] @@ -430,10 +432,10 @@ def compute_resources(self): class Alveo_NxU280_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: sll_counts = [[0, 5000, 0], [5000, 0, 5000], [0, 5000, 0]] super(Alveo_NxU280_Platform, self).__init__( nslr=3, @@ -448,7 +450,7 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: # according to UG1120 # return [[369000, 746000, 2*507, 320, 2733], # [333000, 675000, 2*468, 320, 2877], @@ -464,10 +466,10 @@ def compute_resources(self): class Alveo_NxU55C_Platform(Platform): def __init__( self, - ndevices=1, - limits=DEFAULT_RES_LIMITS, - avg_constraints=DEFAULT_AVG_CONSTRAINTS, - ): + ndevices: int = 1, + limits: NDArray[Any] = DEFAULT_RES_LIMITS, + avg_constraints: Sequence[tuple[tuple[int, ...], float]] = DEFAULT_AVG_CONSTRAINTS, + ) -> None: sll_counts = [[0, 5000, 0], [5000, 0, 5000], [0, 5000, 0]] super(Alveo_NxU55C_Platform, self).__init__( nslr=3, @@ -482,7 +484,7 @@ def __init__( ) @property - def compute_resources(self): + def compute_resources(self) -> list[list[int]]: # according to UG1120 return [ [386000, 773000, 2 * 600, 320, 2664], @@ -491,7 +493,7 @@ def compute_resources(self): ] -platforms = dict() +platforms: dict[str, type[Platform]] = dict() platforms["U50"] = Alveo_NxU50_Platform platforms["U200"] = Alveo_NxU200_Platform platforms["U250"] = Alveo_NxU250_Platform diff --git a/src/finn/util/pytorch.py b/src/finn/util/pytorch.py index 18010083f7..d7b71f973a 100644 --- a/src/finn/util/pytorch.py +++ b/src/finn/util/pytorch.py @@ -27,33 +27,34 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import torch from torch.nn import Module, Sequential +from typing import Sequence, cast -class Normalize(Module): - def __init__(self, mean, std, channels): +class Normalize(Module): # type: ignore[misc] + def __init__(self, mean: Sequence[float], std: float, channels: int) -> None: super(Normalize, self).__init__() self.mean = mean self.std = std self.channels = channels - def forward(self, x): + def forward(self, x: torch.Tensor) -> torch.Tensor: x = x - torch.tensor(self.mean, device=x.device).reshape(1, self.channels, 1, 1) x = x / self.std return x -class ToTensor(Module): - def __init__(self): +class ToTensor(Module): # type: ignore[misc] + def __init__(self) -> None: super(ToTensor, self).__init__() - def forward(self, x): + def forward(self, x: torch.Tensor) -> torch.Tensor: x = x / 255 return x -class NormalizePreProc(Module): - def __init__(self, mean, std, channels): +class NormalizePreProc(Module): # type: ignore[misc] + def __init__(self, mean: Sequence[float], std: float, channels: int) -> None: super(NormalizePreProc, self).__init__() self.features = Sequential() scaling = ToTensor() @@ -61,5 +62,5 @@ def __init__(self, mean, std, channels): normalize = Normalize(mean, std, channels) self.features.add_module("normalize", normalize) - def forward(self, x): - return self.features(x) + def forward(self, x: torch.Tensor) -> torch.Tensor: + return cast(torch.Tensor, self.features(x)) diff --git a/src/finn/util/test.py b/src/finn/util/test.py index 2115e058a8..a4dbf43bed 100644 --- a/src/finn/util/test.py +++ b/src/finn/util/test.py @@ -36,9 +36,12 @@ import torchvision.transforms.functional as torchvision_util import warnings from brevitas_examples import bnn_pynq, imagenet_classification +from numpy.typing import NDArray +from PIL.Image import Image as PILImage from pkgutil import get_data from qonnx.core.modelwrapper import ModelWrapper from qonnx.custom_op.registry import getCustomOp +from typing import Any, Dict, Tuple, Union, cast from finn.core.onnx_exec import execute_onnx from finn.transformation.fpgadataflow.make_zynq_proj import ZynqBuild @@ -62,7 +65,7 @@ } -def get_test_model(netname, wbits, abits, pretrained): +def get_test_model(netname: str, wbits: int, abits: int, pretrained: bool) -> Any: """Returns the model specified by input arguments from the Brevitas BNN-PYNQ test networks. Pretrained weights loaded if pretrained is True.""" model_cfg = (netname, wbits, abits) @@ -71,31 +74,32 @@ def get_test_model(netname, wbits, abits, pretrained): return fc.eval() -def get_test_model_trained(netname, wbits, abits): +def get_test_model_trained(netname: str, wbits: int, abits: int) -> Any: "get_test_model with pretrained=True" return get_test_model(netname, wbits, abits, pretrained=True) -def get_test_model_untrained(netname, wbits, abits): +def get_test_model_untrained(netname: str, wbits: int, abits: int) -> Any: "get_test_model with pretrained=False" return get_test_model(netname, wbits, abits, pretrained=False) -def get_topk(vec, k): +def get_topk(vec: NDArray[Any], k: int) -> NDArray[Any]: "Return indices of the top-k values in given array vec (treated as 1D)." return np.flip(vec.flatten().argsort())[:k] -def soft_verify_topk(invec, idxvec, k): +def soft_verify_topk(invec: NDArray[Any], idxvec: NDArray[Any], k: int) -> bool: """Check that the topK indices provided actually point to the topK largest values in the input vector""" np_topk = np.flip(invec.flatten().argsort())[:k] soft_expected = invec.flatten()[np_topk.astype(np.int_).flatten()] soft_produced = invec.flatten()[idxvec.astype(np.int_).flatten()] - return (soft_expected == soft_produced).all() + result: bool = bool((soft_expected == soft_produced).all()) + return result -def load_test_checkpoint_or_skip(filename): +def load_test_checkpoint_or_skip(filename: str) -> ModelWrapper: "Try to load given .onnx and return ModelWrapper, else skip current test." if os.path.isfile(filename): model = ModelWrapper(filename) @@ -105,7 +109,7 @@ def load_test_checkpoint_or_skip(filename): pytest.skip(filename + " not found from previous test step, skipping") -def get_build_env(board, target_clk_ns): +def get_build_env(board: str, target_clk_ns: float) -> Dict[str, Any]: """Get board-related build environment for testing. - board = any from pynq_part_map or alveo_part_map """ @@ -128,23 +132,26 @@ def get_build_env(board, target_clk_ns): return ret -def get_example_input(topology): +def get_example_input(topology: str) -> NDArray[Any]: "Get example numpy input tensor for given topology." if "fc" in topology: raw_i = get_data("qonnx.data", "onnx/mnist-conv/test_data_set_0/input_0.pb") + assert raw_i is not None, "Could not load test data" onnx_tensor = onnx.load_tensor_from_string(raw_i) - return nph.to_array(onnx_tensor) + return cast(NDArray[Any], nph.to_array(onnx_tensor)) elif topology == "cnv": ref = importlib.files("finn.qnn-data") / "cifar10/cifar10-test-data-class3.npz" with importlib.as_file(ref) as fn: input_tensor = np.load(fn)["arr_0"].astype(np.float32) - return input_tensor + return cast(NDArray[Any], input_tensor) else: raise Exception("Unknown topology, can't return example input") -def get_trained_network_and_ishape(topology, wbits, abits): +def get_trained_network_and_ishape( + topology: str, wbits: int, abits: int +) -> Tuple[Any, Tuple[int, int, int, int]]: "Return (trained_model, shape) for given BNN-PYNQ test config." topology_to_ishape = { @@ -157,7 +164,12 @@ def get_trained_network_and_ishape(topology, wbits, abits): return (model, ishape) -def execute_parent(parent_path, child_path, input_tensor_npy, return_full_ctx=False): +def execute_parent( + parent_path: str, + child_path: str, + input_tensor_npy: NDArray[Any], + return_full_ctx: bool = False, +) -> Union[NDArray[Any], Dict[str, Any]]: """Execute parent model containing a single StreamingDataflowPartition by replacing it with the model at child_path and return result.""" @@ -170,17 +182,17 @@ def execute_parent(parent_path, child_path, input_tensor_npy, return_full_ctx=Fa sdp_node.set_nodeattr("return_full_exec_context", 1 if return_full_ctx else 0) ret = execute_onnx(parent_model, {iname: input_tensor_npy}, True) if return_full_ctx: - return ret + return cast(Dict[str, Any], ret) else: - return ret[oname] + return cast(NDArray[Any], ret[oname]) -def resize_smaller_side(target_pixels, img): +def resize_smaller_side(target_pixels: int, img: PILImage) -> PILImage: """Resizes smallest side of image to target pixels and resizes larger side with same ratio. Expects a PIL image.""" - return torchvision_util.resize(img, target_pixels) + return cast(PILImage, torchvision_util.resize(img, target_pixels)) -def crop_center(size, img): +def crop_center(size: int, img: PILImage) -> PILImage: """Crop central size*size window out of a PIL image.""" - return torchvision_util.center_crop(img, size) + return cast(PILImage, torchvision_util.center_crop(img, size)) diff --git a/src/finn/util/visualization.py b/src/finn/util/visualization.py index 397bebb64c..160320f9ed 100644 --- a/src/finn/util/visualization.py +++ b/src/finn/util/visualization.py @@ -30,13 +30,16 @@ import netron import os from IPython.display import IFrame +from typing import Any, Optional -def showSrc(what): +def showSrc(what: Any) -> None: print("".join(inspect.getsourcelines(what)[0])) -def showInNetron(model_filename: str, localhost_url: str = None, port: int = None): +def showInNetron( + model_filename: str, localhost_url: Optional[str] = None, port: Optional[int] = None +) -> IFrame: """Shows a ONNX model file in the Jupyter Notebook using Netron. :param model_filename: The path to the ONNX model file. diff --git a/src/finn/util/vivado.py b/src/finn/util/vivado.py index 00d9e98e4a..8bba0b3dcd 100644 --- a/src/finn/util/vivado.py +++ b/src/finn/util/vivado.py @@ -27,18 +27,19 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os +from typing import Any, Dict, List from finn.util.basic import launch_process_helper, which def out_of_context_synth( - verilog_dir, - top_name, - float_ip_tcl, - fpga_part="xczu3eg-sbva484-1-e", - clk_name="ap_clk_0", - clk_period_ns=5.0, -): + verilog_dir: str, + top_name: str, + float_ip_tcl: List[str], + fpga_part: str = "xczu3eg-sbva484-1-e", + clk_name: str = "ap_clk_0", + clk_period_ns: float = 5.0, +) -> Dict[str, Any]: "Run out-of-context Vivado synthesis, return resources and slack." # ensure that the OH_MY_XILINX envvar is set @@ -50,7 +51,7 @@ def out_of_context_synth( omx_path = os.environ["OHMYXILINX"] script = "vivadocompile.sh" # vivadocompile.sh - call_omx = "zsh %s/%s %s %s %s %s %f" % ( + call_omx_str = "zsh %s/%s %s %s %s %s %f" % ( omx_path, script, top_name, @@ -59,7 +60,7 @@ def out_of_context_synth( fpga_part, float(clk_period_ns), ) - call_omx = call_omx.split() + call_omx = call_omx_str.split() launch_process_helper(call_omx, proc_env=os.environ.copy(), cwd=verilog_dir) vivado_proj_folder = "%s/results_%s" % (verilog_dir, top_name) @@ -67,7 +68,7 @@ def out_of_context_synth( with open(res_counts_path, "r") as myfile: res_data = myfile.read().split("\n") - ret = {} + ret: Dict[str, Any] = {} ret["vivado_proj_folder"] = vivado_proj_folder for res_line in res_data: res_fields = res_line.split("=")