diff --git a/finn-rtllib/passthru/rtl/passthru_axi.sv b/finn-rtllib/passthru/rtl/passthru_axi.sv new file mode 100644 index 0000000000..355f7dad42 --- /dev/null +++ b/finn-rtllib/passthru/rtl/passthru_axi.sv @@ -0,0 +1,57 @@ +/****************************************************************************** + * Copyright (C) 2024, Advanced Micro Devices, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @author Thomas B. Preußer + * @brief Wiring-only pass-thru AXI-Stream connector. + */ + +module passthru_axi #( + int unsigned DATA_WIDTH +)( + // Global Control - NOT USED + input logic ap_clk, + input logic ap_rst_n, + + // Input Stream + input logic [DATA_WIDTH-1:0] s_axis_tdata, + input logic s_axis_tvalid, + output logic s_axis_tready, + + // Output Stream + output logic [DATA_WIDTH-1:0] m_axis_tdata, + output logic m_axis_tvalid, + input logic m_axis_tready +); + // Simple pass-through Connection + assign m_axis_tdata = s_axis_tdata; + assign m_axis_tvalid = s_axis_tvalid; + assign s_axis_tready = m_axis_tready; + +endmodule : passthru_axi diff --git a/finn-rtllib/passthru/rtl/passthru_template_wrapper.v b/finn-rtllib/passthru/rtl/passthru_template_wrapper.v new file mode 100644 index 0000000000..450b8b8ea2 --- /dev/null +++ b/finn-rtllib/passthru/rtl/passthru_template_wrapper.v @@ -0,0 +1,62 @@ +/****************************************************************************** + * Copyright (C) 2024, Advanced Micro Devices, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. 
Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @author Thomas B. Preußer + * @brief Verilog wrapper for IP packaging. + */ + +module $MODULE_NAME_AXI_WRAPPER$ #( + int unsigned DATA_WIDTH = $DATA_WIDTH$ +)( + // Global Control - NOT USED + (* X_INTERFACE_PARAMETER = "ASSOCIATED_BUSIF s_axis:m_axis, ASSOCIATED_RESET ap_rst_n" *) + (* X_INTERFACE_INFO = "xilinx.com:signal:clock:1.0 ap_clk CLK" *) + input ap_clk, + (* X_INTERFACE_PARAMETER = "POLARITY ACTIVE_LOW" *) + input ap_rst_n, + + // Input Stream + input logic [DATA_WIDTH-1:0] s_axis_tdata, + input logic s_axis_tvalid, + output logic s_axis_tready, + + // Output Stream + output logic [DATA_WIDTH-1:0] m_axis_tdata, + output logic m_axis_tvalid, + input logic m_axis_tready +); + + passthru_axi #(.DATA_WIDTH(DATA_WIDTH)) core ( + .ap_clk(ap_clk), .ap_rst_n(ap_rst_n), + .s_axis_tdata(s_axis_tdata), .s_axis_tvalid(s_axis_tvalid), .s_axis_tready(s_axis_tready), + .m_axis_tdata(m_axis_tdata), .m_axis_tvalid(m_axis_tvalid), .m_axis_tready(m_axis_tready) + ); + +endmodule // $MODULE_NAME_AXI_WRAPPER$ diff --git a/src/finn/custom_op/fpgadataflow/__init__.py b/src/finn/custom_op/fpgadataflow/__init__.py index aed2ab7fe1..a9b787112b 100644 --- a/src/finn/custom_op/fpgadataflow/__init__.py +++ b/src/finn/custom_op/fpgadataflow/__init__.py @@ -27,6 +27,36 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# The base class of all generic custom operations before specializing to either +# HLS or RTL backend +from finn.custom_op.fpgadataflow.hwcustomop import HWCustomOp + +# Dictionary of HWCustomOp implementations +custom_op = dict() + + +# Registers a class into the custom_op dictionary +# Note: This must be defined first, before importing any custom op +# implementation to avoid "importing partially initialized module" issues. +def register_custom_op(cls): + # The class must actually implement HWCustomOp + assert issubclass(cls, HWCustomOp), f"{cls} must subclass {HWCustomOp}" + # Insert the class into the custom_op dictionary by its name + custom_op[cls.__name__] = cls # noqa: suppresses a spurious type-checker warning + # Pass through the class unmodified + return cls
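+ + # Usage sketch (illustrative only, hypothetical class name): decorating an + # HWCustomOp subclass registers it into this domain's custom_op dictionary + # by name: + # + #     @register_custom_op + #     class MyOp(HWCustomOp): + #         ... + # + # after which custom_op["MyOp"] resolves to the decorated class.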
+ + + # flake8: noqa + # Disable linting from here, as all imports will be flagged E402 and maybe F401 + + + # Import the submodule containing the Squeeze operation + # Note: This will automatically register all decorated classes into this domain + import finn.custom_op.fpgadataflow.squeeze + # Import the submodule containing the Unsqueeze operation + import finn.custom_op.fpgadataflow.unsqueeze + from finn.custom_op.fpgadataflow.addstreams import AddStreams from finn.custom_op.fpgadataflow.channelwise_op import ChannelwiseOp from finn.custom_op.fpgadataflow.concat import StreamingConcat @@ -55,8 +85,6 @@ from finn.custom_op.fpgadataflow.upsampler import UpsampleNearestNeighbour from finn.custom_op.fpgadataflow.vectorvectoractivation import VVAU -custom_op = dict() - # make sure new HLSCustomOp subclasses are imported here so that they get # registered and plug in correctly into the infrastructure custom_op["MVAU"] = MVAU diff --git a/src/finn/custom_op/fpgadataflow/hls/__init__.py b/src/finn/custom_op/fpgadataflow/hls/__init__.py index 405c47a08d..05fd6931cb 100644 --- a/src/finn/custom_op/fpgadataflow/hls/__init__.py +++ b/src/finn/custom_op/fpgadataflow/hls/__init__.py @@ -26,6 +26,40 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# The base class of all HWCustomOp specializations to the HLS backend +from finn.custom_op.fpgadataflow.hlsbackend import HLSBackend + +# The base class of all generic custom operations before specializing to either +# HLS or RTL backend +from finn.custom_op.fpgadataflow.hwcustomop import HWCustomOp + +# Dictionary of HLSBackend implementations +custom_op = dict() + + +# Registers a class into the custom_op dictionary +# Note: This must be defined first, before importing any custom op +# implementation to avoid "importing partially initialized module" issues. +def register_custom_op(cls): + # The class must actually implement HWCustomOp + assert issubclass(cls, HWCustomOp), f"{cls} must subclass {HWCustomOp}" + # The class must also implement the HLSBackend + assert issubclass(cls, HLSBackend), f"{cls} must subclass {HLSBackend}" + # Insert the class into the custom_op dictionary by its name + custom_op[cls.__name__] = cls # noqa: suppresses a spurious type-checker warning
+ # Pass through the class unmodified + return cls + + + # flake8: noqa + # Disable linting from here, as all imports will be flagged E402 and maybe F401 + + # Import the submodule containing the specialization of the Squeeze operation + # Note: This will automatically register all decorated classes into this domain + import finn.custom_op.fpgadataflow.hls.squeeze_hls + # Import the submodule containing the specialization of the Unsqueeze operation + import finn.custom_op.fpgadataflow.hls.unsqueeze_hls + from finn.custom_op.fpgadataflow.hls.addstreams_hls import AddStreams_hls from finn.custom_op.fpgadataflow.hls.channelwise_op_hls import ChannelwiseOp_hls from finn.custom_op.fpgadataflow.hls.checksum_hls import CheckSum_hls @@ -53,8 +87,6 @@ from finn.custom_op.fpgadataflow.hls.upsampler_hls import UpsampleNearestNeighbour_hls from finn.custom_op.fpgadataflow.hls.vectorvectoractivation_hls import VVAU_hls -custom_op = dict() - # make sure new HLSCustomOp subclasses are imported here so that they get # registered and plug in correctly into the infrastructure custom_op["AddStreams_hls"] = AddStreams_hls diff --git a/src/finn/custom_op/fpgadataflow/hls/squeeze_hls.py b/src/finn/custom_op/fpgadataflow/hls/squeeze_hls.py new file mode 100644 index 0000000000..81748976ec --- /dev/null +++ b/src/finn/custom_op/fpgadataflow/hls/squeeze_hls.py @@ -0,0 +1,234 @@ +# noqa: Duplicate: The HLS implementation is identical to the Unsqueeze +# operator, maybe these should be unified... +# fmt: off +# Disable formatter. This is deliberately formatted to stay within 80 characters +# per line. Black, however, formats some lines going beyond this. + +# Numpy math and arrays +import numpy as np + +# Operating system stuff, e.g. paths +import os + +# QONNX wrapper to ONNX model graphs +from qonnx.core.modelwrapper import ModelWrapper + +# Utility for registering HLSBackend HWCustomOp implementations into the module +# scope +from finn.custom_op.fpgadataflow.hls import register_custom_op + +# Base class for specializing HW operators as implemented via HLS +from finn.custom_op.fpgadataflow.hlsbackend import HLSBackend + +# The generic HW custom operator version of the operator as a base class +from finn.custom_op.fpgadataflow.squeeze import Squeeze + + +# HLS Backend specialization of the squeeze operator +@register_custom_op +class Squeeze_hls(Squeeze, HLSBackend): # noqa: Class name does not follow + # CapWords convention + # Node attributes matching the HLS operator + def get_nodeattr_types(self): + # Start from parent operator class attributes + attrs = Squeeze.get_nodeattr_types(self) + # Add the HLSBackend default attributes on top + attrs.update(HLSBackend.get_nodeattr_types(self)) + # Add/Specialize implementation specific attributes here...
# Return the updated attributes dictionary + return attrs + + # Executes squeeze operation in C++ simulation + def _execute_node_cppsim(self, context, graph): # noqa: graph unused + # Get the node wrapped by this custom op + node = self.onnx_node + # Input data is stored in numpy files in the code generation dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim") + # Get the input out of the execution context + inp = context[node.input[0]] # noqa: Duplicate code prepare simulation + # Validate the shape of the input + assert list(inp.shape) == self.get_normal_input_shape(ind=0), \ + f"Input shape mismatch for {node.input[0]}" + # Reshape the input into folded form + inp = inp.reshape(self.get_folded_input_shape(ind=0)) + # Save the folded input to file to be used by simulation + np.save(os.path.join(code_gen_dir, "inp.npy"), inp) + + # Execute the precompiled model + super().exec_precompiled_singlenode_model() + + # Load the output numpy file generated by the C++ simulation + out = np.load(os.path.join(code_gen_dir, "out.npy")) + # Reshape the folded output and insert into the execution context + context[node.output[0]] = out.reshape( + self.get_normal_output_shape(ind=0) + ) + + # Maximum width of any ap_int used in this operator + def get_ap_int_max_w(self): + # Width of the input, there is just one input + i_bits_max = self.get_instream_width(ind=0) + # Width of the output, there is just one output + o_bits_max = self.get_outstream_width(ind=0) + # Find the biggest of the inputs/outputs + return max([i_bits_max, o_bits_max]) + + # Generates list of C++ includes to be placed at the top of the generated + # code + def global_includes(self): + # Currently nothing to include + self.code_gen_dict["$GLOBALS$"] = [] + + # Generates C++ parameters file, i.e., constant initializer inputs + def generate_params(self, model: ModelWrapper, path: str): + # Squeeze has no parameters + pass + + # Generates C++ code of type alias, global constant and macro definitions + def defines(self, var): + # Insert constants and type aliases into the dictionary + self.code_gen_dict["$DEFINES$"] = [ + # Input and output element datatypes + f"using InpType = {self.inp_dtype.get_hls_datatype_str()};", + f"using OutType = {self.out_dtype.get_hls_datatype_str()};", + # Width of single elements to avoid using ::width attribute which is + # not present for datatype float + f"static constexpr auto InpWidth = {self.inp_dtype.bitwidth()};", + f"static constexpr auto OutWidth = {self.out_dtype.bitwidth()};", + # Datatype of elements packed into the input stream + f"using InpPacked = ap_uint<{self.get_instream_width(ind=0)}>;", + # Datatype of elements packed into the output stream + f"using OutPacked = ap_uint<{self.get_outstream_width(ind=0)}>;", + # Input and output HLS stream datatypes + "using InpStream = hls::stream<InpPacked>;", + "using OutStream = hls::stream<OutPacked>;", + ] + + # Generates C++ code for reading data from .npy (numpy format) for testing + # in C++ simulation + def read_npy_data(self): + # Input data is stored in numpy files in the code generation dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim") + # Prepare empty stream reading to append optionals + self.code_gen_dict["$READNPYDATA$"] = [] + # Generate function calls for reading the input files into the input + # streams + self.code_gen_dict["$READNPYDATA$"] += [ + # Generate function call reading from file into the input stream + # Note: Inputs are always represented as numpy floats + 'npy2apintstream<InpPacked, InpType, InpWidth, float>(', + f'"{code_gen_dir}/inp.npy", inp_{self.hls_sname()}, false', + ');' + ] + + # Generates C++ code for declaring all streams involved in C++ simulation + # for testing + def strm_decl(self): + # There is always one input and one output stream + self.code_gen_dict["$STREAMDECLARATIONS$"] = [ + # Note: Assumes stream type aliases to be set in defines + f"InpStream inp_{self.hls_sname()};", + f"OutStream out_{self.hls_sname()};" + ] + + # Generates C++ code for calling the computation part of the operator + def docompute(self): + # Number of iterations required to process the whole folded input stream + # Note: This is all but the PE (last) dimension + num_iter = np.prod(self.get_folded_output_shape()[:-1]) + # Write the body of the top-level function + self.code_gen_dict["$DOCOMPUTE$"] = [ + # Repeat for the number of inputs + f"for(std::size_t i = 0; i < {num_iter}; ++i) {{", + # Pipeline the steps of this loop + "#pragma HLS pipeline II=1 style=flp", + # Just read from the input and immediately write the same element to + # the output. Squeezed dimensions, i.e., those with a size of 1 do + # not contribute to the number and order of elements and thus can + # simply be ignored. + f"out_{self.hls_sname()}.write(inp_{self.hls_sname()}.read());", + f"}}" # noqa: f-string symmetry + ] + + # Generates C++ code for reading the output stream and converting back to + # numpy format for testing in C++ simulation + def dataoutstrm(self): + # Output data will be stored in numpy files in the code generation + # dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim") + # Get the expected shape of the folded output array formatted as a C++ + # vector initializer + # Note: Valid formatting relies on correct placement of curly braces + # and line breaks: Open/close all three braces on the same line of code + # to avoid '\n' being inserted into the string + shape = f"""{{{ + ','.join((str(i) for i in self.get_folded_output_shape(ind=0))) + }}}""" + # Generate function call for reading from the output stream into the + # output file + self.code_gen_dict["$DATAOUTSTREAM$"] = [ + # Generate function call reading from stream into the output file + # Note: Outputs are always represented as numpy floats + 'apintstream2npy<OutPacked, OutType, OutWidth, float>(', + f'out_{self.hls_sname()}, {shape}, "{code_gen_dir}/out.npy", false', + ');', + ] + + # Generates C++ code for saving the output of C++ simulation to a file in + # numpy format + def save_as_npy(self): + # Note: This seems to be empty in ALL HLSBackends. Probably it was used + # for something before, which is now integrated into dataoutstrm()? + self.code_gen_dict["$SAVEASCNPY$"] = [] + + # Generates essentially the head of the C++ function from which the IP block + # will be generated during ipgen, i.e.
actual synthesis + def blackboxfunction(self): + # Insert function head describing the top level interface of the + # squeeze operator + self.code_gen_dict["$BLACKBOXFUNCTION$"] = [ + # Note: Assumes stream type aliases to be set in defines + f"void {self.onnx_node.name} (", + f" InpStream &inp_{self.hls_sname()},", + f" OutStream &out_{self.hls_sname()}", + ")", + ] + + # Generates C++ pragmas to be inserted into the main function of the C++ + # simulation and the ipgen-blackboxfunction as well + def pragmas(self): + # Check whether there are already pragmas in the code generation + # dictionary + if "$PRAGMAS$" not in self.code_gen_dict: + # If not, insert an empty list to collect more pragmas + self.code_gen_dict["$PRAGMAS$"] = [] + + # Add HLS interface directives specifying how to create RTL ports for + # the top-level function arguments + self.code_gen_dict["$PRAGMAS$"] += [ + # Connect the input and output stream with an axi stream interface + f"#pragma HLS INTERFACE axis port=out_{self.hls_sname()}", + f"#pragma HLS INTERFACE axis port=inp_{self.hls_sname()}", + # No block-level I/O protocol for the function return value + "#pragma HLS INTERFACE ap_ctrl_none port=return" + ] + + # Returns the names of input and output interfaces grouped by protocol + def get_verilog_top_module_intf_names(self): + # Start collecting interface names in a dictionary starting with clock + # and reset + intf_names = {"clk": ["ap_clk"], "rst": ["ap_rst_n"]} # noqa + # AXI stream input interfaces + intf_names["s_axis"] = [ + (f"inp_{self.hls_sname()}", self.get_instream_width_padded(ind=0)) + ] + # AXI stream output interfaces + intf_names["m_axis"] = [ + (f"out_{self.hls_sname()}", self.get_outstream_width_padded(ind=0)) + ] + # No AXI-MM, AXI-Lite or protocol-less interfaces + intf_names["aximm"] = [] + intf_names["axilite"] = [] + intf_names["ap_none"] = [] + # Return the interface name dictionary + return intf_names diff --git a/src/finn/custom_op/fpgadataflow/hls/unsqueeze_hls.py b/src/finn/custom_op/fpgadataflow/hls/unsqueeze_hls.py new file mode 100644 index 0000000000..2e482b5ccb --- /dev/null +++ b/src/finn/custom_op/fpgadataflow/hls/unsqueeze_hls.py @@ -0,0 +1,234 @@ +# noqa: Duplicate: The HLS implementation is identical to the Squeeze operator, +# maybe these should be unified... +# fmt: off +# Disable formatter. This is deliberately formatted to stay within 80 characters +# per line. Black, however, formats some lines going beyond this. + +# Numpy math and arrays +import numpy as np + +# Operating system stuff, e.g. 
paths + import os + + # QONNX wrapper to ONNX model graphs + from qonnx.core.modelwrapper import ModelWrapper + + # Utility for registering HLSBackend HWCustomOp implementations into the module + # scope + from finn.custom_op.fpgadataflow.hls import register_custom_op + + # Base class for specializing HW operators as implemented via HLS + from finn.custom_op.fpgadataflow.hlsbackend import HLSBackend + + # The generic HW custom operator version of the operator as a base class + from finn.custom_op.fpgadataflow.unsqueeze import Unsqueeze + + + # HLS Backend specialization of the unsqueeze operator + @register_custom_op + class Unsqueeze_hls(Unsqueeze, HLSBackend): # noqa: Class name does not follow + # CapWords convention + # Node attributes matching the HLS operator + def get_nodeattr_types(self): + # Start from parent operator class attributes + attrs = Unsqueeze.get_nodeattr_types(self) + # Add the HLSBackend default attributes on top + attrs.update(HLSBackend.get_nodeattr_types(self)) + # Add/Specialize implementation specific attributes here... + # Return the updated attributes dictionary + return attrs + + # Executes unsqueeze operation in C++ simulation + def _execute_node_cppsim(self, context, graph): # noqa: graph unused + # Get the node wrapped by this custom op + node = self.onnx_node + # Input data is stored in numpy files in the code generation dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim") + # Get the input out of the execution context + inp = context[node.input[0]] # noqa: Duplicate code prepare simulation + # Validate the shape of the input + assert list(inp.shape) == self.get_normal_input_shape(ind=0), \ + f"Input shape mismatch for {node.input[0]}" + # Reshape the input into folded form + inp = inp.reshape(self.get_folded_input_shape(ind=0)) + # Save the folded input to file to be used by simulation + np.save(os.path.join(code_gen_dir, "inp.npy"), inp) + + # Execute the precompiled model + super().exec_precompiled_singlenode_model() + + # Load the output numpy file generated by the C++ simulation + out = np.load(os.path.join(code_gen_dir, "out.npy")) + # Reshape the folded output and insert into the execution context + context[node.output[0]] = out.reshape( + self.get_normal_output_shape(ind=0) + ) + + # Maximum width of any ap_int used in this operator + def get_ap_int_max_w(self): + # Width of the input, there is just one input + i_bits_max = self.get_instream_width(ind=0) + # Width of the output, there is just one output + o_bits_max = self.get_outstream_width(ind=0) + # Find the biggest of the inputs/outputs + return max([i_bits_max, o_bits_max]) + + # Generates list of C++ includes to be placed at the top of the generated + # code + def global_includes(self): + # Currently nothing to include + self.code_gen_dict["$GLOBALS$"] = [] + + # Generates C++ parameters file, i.e., constant initializer inputs + def generate_params(self, model: ModelWrapper, path: str): + # Unsqueeze has no parameters + pass + + # Generates C++ code of type alias, global constant and macro definitions + def defines(self, var): + # Insert constants and type aliases into the dictionary + self.code_gen_dict["$DEFINES$"] = [ + # Input and output element datatypes + f"using InpType = {self.inp_dtype.get_hls_datatype_str()};", + f"using OutType = {self.out_dtype.get_hls_datatype_str()};", + # Width of single elements to avoid using ::width attribute which is + # not present for datatype float + f"static constexpr auto InpWidth = {self.inp_dtype.bitwidth()};", + f"static constexpr auto OutWidth = {self.out_dtype.bitwidth()};", + # Datatype of elements packed into the input stream + f"using InpPacked = ap_uint<{self.get_instream_width(ind=0)}>;", + # Datatype of elements packed into the output stream + f"using OutPacked = ap_uint<{self.get_outstream_width(ind=0)}>;", + # Input and output HLS stream datatypes + "using InpStream = hls::stream<InpPacked>;", + "using OutStream = hls::stream<OutPacked>;", + ] + + # Generates C++ code for reading data from .npy (numpy format) for testing + # in C++ simulation + def read_npy_data(self): + # Input data is stored in numpy files in the code generation dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim") + # Prepare empty stream reading to append optionals + self.code_gen_dict["$READNPYDATA$"] = [] + # Generate function calls for reading the input files into the input + # streams + self.code_gen_dict["$READNPYDATA$"] += [ + # Generate function call reading from file into the input stream + # Note: Inputs are always represented as numpy floats + 'npy2apintstream<InpPacked, InpType, InpWidth, float>(', + f'"{code_gen_dir}/inp.npy", inp_{self.hls_sname()}, false', + ');' + ] + + # Generates C++ code for declaring all streams involved in C++ simulation + # for testing + def strm_decl(self): + # There is always one input and one output stream + self.code_gen_dict["$STREAMDECLARATIONS$"] = [ + # Note: Assumes stream type aliases to be set in defines + f"InpStream inp_{self.hls_sname()};", + f"OutStream out_{self.hls_sname()};" + ] + + # Generates C++ code for calling the computation part of the operator + def docompute(self): + # Number of iterations required to process the whole folded input stream + # Note: This is all but the PE (last) dimension + num_iter = np.prod(self.get_folded_output_shape()[:-1]) + # Write the body of the top-level function + self.code_gen_dict["$DOCOMPUTE$"] = [ + # Repeat for the number of inputs + f"for(std::size_t i = 0; i < {num_iter}; ++i) {{", + # Pipeline the steps of this loop + "#pragma HLS pipeline II=1 style=flp", + # Just read from the input and immediately write the same element to + # the output. Unsqueezed dimensions, i.e., those with a size of 1 do + # not contribute to the number and order of elements and thus can + # simply be ignored. + f"out_{self.hls_sname()}.write(inp_{self.hls_sname()}.read());", + f"}}" # noqa: f-string symmetry + ] + + # Generates C++ code for reading the output stream and converting back to + # numpy format for testing in C++ simulation + def dataoutstrm(self): + # Output data will be stored in numpy files in the code generation + # dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim") + # Get the expected shape of the folded output array formatted as a C++ + # vector initializer + # Note: Valid formatting relies on correct placement of curly braces + # and line breaks: Open/close all three braces on the same line of code + # to avoid '\n' being inserted into the string + shape = f"""{{{ + ','.join((str(i) for i in self.get_folded_output_shape(ind=0))) + }}}""" + # Generate function call for reading from the output stream into the + # output file + self.code_gen_dict["$DATAOUTSTREAM$"] = [ + # Generate function call reading from stream into the output file + # Note: Outputs are always represented as numpy floats + 'apintstream2npy<OutPacked, OutType, OutWidth, float>(', + f'out_{self.hls_sname()}, {shape}, "{code_gen_dir}/out.npy", false', + ');', + ]
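+ + # For illustration only (hypothetical values): assuming a folded output + # shape of (1, 16, 4) and hls_sname() returning "V", dataoutstrm() above + # emits roughly the following C++ into the simulation wrapper: + # + #     apintstream2npy<OutPacked, OutType, OutWidth, float>( + #         out_V, {1,16,4}, "<code_gen_dir>/out.npy", false + #     );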
+ + # Generates C++ code for saving the output of C++ simulation to a file in + # numpy format + def save_as_npy(self): + # Note: This seems to be empty in ALL HLSBackends. Probably it was used + # for something before, which is now integrated into dataoutstrm()? + self.code_gen_dict["$SAVEASCNPY$"] = [] + + # Generates essentially the head of the C++ function from which the IP block + # will be generated during ipgen, i.e. actual synthesis + def blackboxfunction(self): + # Insert function head describing the top level interface of the + # unsqueeze operator + self.code_gen_dict["$BLACKBOXFUNCTION$"] = [ + # Note: Assumes stream type aliases to be set in defines + f"void {self.onnx_node.name} (", + f" InpStream &inp_{self.hls_sname()},", + f" OutStream &out_{self.hls_sname()}", + ")", + ] + + # Generates C++ pragmas to be inserted into the main function of the C++ + # simulation and the ipgen-blackboxfunction as well + def pragmas(self): + # Check whether there are already pragmas in the code generation + # dictionary + if "$PRAGMAS$" not in self.code_gen_dict: + # If not, insert an empty list to collect more pragmas + self.code_gen_dict["$PRAGMAS$"] = [] + + # Add HLS interface directives specifying how to create RTL ports for + # the top-level function arguments + self.code_gen_dict["$PRAGMAS$"] += [ + # Connect the input and output stream with an axi stream interface + f"#pragma HLS INTERFACE axis port=out_{self.hls_sname()}", + f"#pragma HLS INTERFACE axis port=inp_{self.hls_sname()}", + # No block-level I/O protocol for the function return value + "#pragma HLS INTERFACE ap_ctrl_none port=return" + ] + + # Returns the names of input and output interfaces grouped by protocol + def get_verilog_top_module_intf_names(self): + # Start collecting interface names in a dictionary starting with clock + # and reset + intf_names = {"clk": ["ap_clk"], "rst": ["ap_rst_n"]} # noqa + # AXI stream input interfaces + intf_names["s_axis"] = [ + (f"inp_{self.hls_sname()}", self.get_instream_width_padded(ind=0)) + ] + # AXI stream output interfaces + intf_names["m_axis"] = [ + (f"out_{self.hls_sname()}", self.get_outstream_width_padded(ind=0)) + ] + # No AXI-MM, AXI-Lite or protocol-less interfaces + intf_names["aximm"] = [] + intf_names["axilite"] = [] + intf_names["ap_none"] = [] + # Return the interface name dictionary + return intf_names diff --git a/src/finn/custom_op/fpgadataflow/squeeze.py b/src/finn/custom_op/fpgadataflow/squeeze.py new file mode 100644 index 0000000000..842be34813 --- /dev/null +++ b/src/finn/custom_op/fpgadataflow/squeeze.py @@ -0,0 +1,329 @@ +# fmt: off +# Disable formatter. This is deliberately formatted to stay within 80 characters +# per line. Black, however, formats some lines going beyond this. + +# Numpy math and arrays +import numpy as np + +# Operating system stuff, e.g.
paths + import os + + # Python warning subsystem + import warnings + + # Copies of python objects, copy.deepcopy + import copy + + # QONNX/FINN datatypes + from qonnx.core.datatype import DataType + + # QONNX wrapper to ONNX model graphs + from qonnx.core.modelwrapper import ModelWrapper + + # Utility for registering HWCustomOp implementations into the module scope + from finn.custom_op.fpgadataflow import register_custom_op + + # Derive custom operators from the FINN base custom op + from finn.custom_op.fpgadataflow.hwcustomop import HWCustomOp + + # Converts inputs/outputs to/from RTL simulation format + from finn.util.data_packing import npy_to_rtlsim_input, rtlsim_output_to_npy + + + # Squeeze operation: Removes single-dimension entries from the shape of a tensor + @register_custom_op + class Squeeze(HWCustomOp): + # Initializes the operator given an onnx graph node + def __init__(self, onnx_node, **kwargs): + # Just forward all arguments to the init method of the CustomOp base + super().__init__(onnx_node, **kwargs) + + # Defines attributes which must be present on this node + def get_nodeattr_types(self): + # Start from parent operator class attributes # noqa: Duplicate + attrs = HWCustomOp.get_nodeattr_types(self) + # Update attributes dictionary for new custom operator + attrs.update({ + # Axes to be squeezed can be given as an attribute for opset < 13 + "axes": ("ints", False, None), + # Data type of the input elements + "inp_dtype": ("s", True, ""), + # Data type of the output elements + "out_dtype": ("s", True, ""), + # Shape of the input + "inp_shape": ("ints", True, [1]), + # Shape of the output + "out_shape": ("ints", True, [1]), + # Number of elements in the last dimension processed in parallel + "PE": ("i", False, 1), + # Possible execution modes for simulating this node + # Note: Override to support python mode + "exec_mode": ( + "s", False, "python", {"", "rtlsim", "cppsim", "python"} + ), + }) + # Return updated attribute dictionary + return attrs + + # Datatype attribute as property for convenience + @property + def inp_dtype(self): + # Note: Converts from string to QONNX data type + return DataType[self.get_nodeattr("inp_dtype")] + + # Datatype attribute as property for convenience + @property + def out_dtype(self): + # Note: Converts from string to QONNX data type + return DataType[self.get_nodeattr("out_dtype")] + + # Shape attribute as property for convenience + @property + def inp_shape(self): + return self.get_nodeattr("inp_shape") + + # Shape attribute as property for convenience + @property + def out_shape(self): + return self.get_nodeattr("out_shape") + + # Number of parallel processed elements as property for convenience + @property + def pe(self): + return self.get_nodeattr("PE") + + # Makes an operation compatible with the output shape for shape inference + # Note: Propagates shape forward, i.e., never asks for the shape of the + # output, even if it seems easier. + def make_shape_compatible_op(self, model: ModelWrapper): # noqa + # Get the node wrapped by this custom op + node = copy.deepcopy(self.onnx_node) + # Though providing the axes to squeeze via a second input is supported + # by the implementation, the inferred shapes might be incorrect if this + # is truly a dynamic list of axes changing at runtime. + if len(node.input) > 1: + # Issue a warning to make the user aware of this potential issue + warnings.warn( + f"{node.name}: Providing dimensions to squeeze as an input" + f" might invalidate shape inference if these are not constant." + ) + # Transplant this operator back into the standard ONNX domain + node.domain = "" + # Shape inference should now work on this standard ONNX node + return node
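+ + # For illustration (not tied to a specific model): numpy squeeze semantics + # with an explicit axes tuple, as used by _execute_node_python below: + # + #     np.squeeze(np.zeros((1, 128, 1, 32)), axis=(0, 2)).shape == (128, 32) + # + # Explicitly given axes must index size-1 dimensions, otherwise numpy + # raises a ValueError.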
+ + # Infers the datatype of the node output + def infer_node_datatype(self, model: ModelWrapper): # noqa + # Get the node wrapped by this custom op # noqa Duplicate + node = self.onnx_node # noqa: Duplicate + # Test for changing input datatype + if model.get_tensor_datatype(node.input[0]) != self.inp_dtype: + # Get the new datatype + new_dtype = model.get_tensor_datatype(node.input[0]) + # Issue a warning message + warnings.warn( + f"{node.name}: inp_dtype changing from" + f" {self.inp_dtype} to {new_dtype}" + ) + # Set the new datatype attribute + self.set_nodeattr("inp_dtype", new_dtype.name) + # Though providing the axes to squeeze via a second input is supported + # by the implementation, the datatype of this input is ignored here + if len(node.input) > 1: + # Issue a warning to make the user aware of this potential issue + warnings.warn( + f"{node.name}: Providing dimensions to squeeze as an input" + f" will be ignored by datatype inference." + ) + # Make sure the output always has the same type as the input + if self.out_dtype != self.inp_dtype: + # Issue a warning message + warnings.warn( + f"{node.name}: out_dtype changing from" + f" {self.out_dtype} to {self.inp_dtype}" + ) + # Set the new datatype attribute + self.set_nodeattr("out_dtype", self.inp_dtype.name) + # Force the output data type stored as a node attribute + model.set_tensor_datatype(node.output[0], self.out_dtype) + + # Executes squeeze operation in python + def _execute_node_python(self, context, graph): # noqa: graph unused + # Get the node wrapped by this custom op + node = self.onnx_node # noqa: Duplicate + # Get the input from the execution context + inp = context[node.input[0]] + # Try with axes specified as attribute first + axes = self.get_nodeattr("axes") + # If no axes are specified via the attribute but there is a second + # input to the operator, this input specifies the axes to be squeezed + if axes is None and len(node.input) > 1: + # Get the axes list from the execution context + axes = context[node.input[1]] + # If axes are specified convert them to tuple as required by numpy + axes = tuple(axes) if axes is not None else None + # Squeeze the input along the optionally specified axes + out = np.squeeze(inp, axis=axes) + # Make sure the output has the right type (always use float32 as the + # container type) and insert into the execution context + context[node.output[0]] = out.astype(np.float32) + + # Executes squeeze operation in C++ simulation + def _execute_node_cppsim(self, context, graph): # noqa: graph unused + # C++ Simulation needs to be implemented in HLS backend specialization + raise NotImplementedError( + f"exec_mode cppsim of {self.__class__.__name__} is not implemented!"
+ ) + + # Executes squeeze operation in RTL simulation + def _execute_node_rtlsim(self, context, graph): # noqa: graph unused + # Get the node wrapped by this custom op # noqa Duplicate + node = self.onnx_node + # Input data is stored in numpy files in the code generation dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_ipgen") + # Get the inputs out of the execution context + inp = context[node.input[0]] # noqa: Duplicate code prepare simulation + # Validate the shape of the inputs + assert list(inp.shape) == self.get_normal_input_shape(ind=0), \ + f"Input shape mismatch for {node.input[0]}" + # Reshape the input into folded form + inp = inp.reshape(self.get_folded_input_shape(ind=0)) + # Path to store the intermediate input in numpy format + inp_filename = os.path.join(code_gen_dir, "inp.npy") + # Save the folded input to file to be used by simulation + np.save(inp_filename, inp) + # Start collecting inputs/outputs to the RTL simulation in a dictionary + # Note: Prepare one output empty output list + io_dict = { + "inputs": {}, + "outputs": {"out": []} + } + # Type and width of the input tensors + inp_dtype = self.get_input_datatype(ind=0) + inp_width = self.get_instream_width(ind=0) + + # Convert input to RTL simulation format + io_dict["inputs"]["inp"] = npy_to_rtlsim_input( + inp_filename, inp_dtype, inp_width + ) + + # Setup PyVerilator simulation of the node + sim = self.get_rtlsim() # noqa: Duplicate code prepare simulation + # Reset the RTL simulation + super().reset_rtlsim(sim) + super().toggle_clk(sim) + # Run the RTL Simulation + self.rtlsim_multi_io(sim, io_dict) + + # Collect the output from RTL simulation + out = io_dict["outputs"]["out"] + # Type and sizes of the output tensor + dtype = self.get_output_datatype(ind=0) # noqa: Duplicate readout code + width = self.get_outstream_width(ind=0) + shape = self.get_folded_output_shape(ind=0) + # Path to store the intermediate numpy file + filename = os.path.join(code_gen_dir, "out.npy") + # Convert from RTL simulation format to numpy format + rtlsim_output_to_npy( + out, filename, dtype, shape, width, dtype.bitwidth() + ) + # Load the generated output numpy file + out = np.load(filename) + # Reshape the folded output and insert into the execution context + context[node.output[0]] = out.reshape( + self.get_normal_output_shape(ind=0) + ) + + # Executes squeeze operation in simulation (either python c++ or rtl sim) + def execute_node(self, context, graph): + # Get the configured execution mode + mode = self.get_nodeattr("exec_mode") + # Lookup table mapping execution modes to implementing methods + exec_fns = { + "python": self._execute_node_python, + "cppsim": self._execute_node_cppsim, + "rtlsim": self._execute_node_rtlsim, + } + # Select and execute the function by mode string + exec_fns[mode](context, graph) + + # Verifies the node attributes, inputs and outputs + def verify_node(self): + # TODO: Implement + return [] + + # Note: End of QONNX CustomOp region, below is FINN HWCustomOp stuff + + # Gets the datatype of input at index ind + def get_input_datatype(self, ind=0): + # There is only one proper input (we ignore the optional axes input + # here) + return self.inp_dtype + + # Gets the datatype of the output at index ind + def get_output_datatype(self, ind=0): + # There is only one output, the type is set as an attribute + return self.out_dtype + + # Gets the shape of the input at index ind without folding + def get_normal_input_shape(self, ind=0): + # There is only one proper input (we ignore the optional axes 
input + # here) + return self.inp_shape + + # Gets the shape of the output at index ind without folding + def get_normal_output_shape(self, ind=0): + # The output shape is stored as a node attribute + return self.out_shape + + # Gets the shape of the input at index ind with folding + def get_folded_input_shape(self, ind=0): + # Get the normal shape before applying folding + *num_inputs, num_elems = self.get_normal_input_shape(ind=ind) + # Valid folding requires the PE to divide the number of elements + assert num_elems % self.pe == 0, "PE must divide last axis" + # Folding along the last dimension + return *num_inputs, num_elems // self.pe, self.pe + + # Gets the shape of the output at index ind with folding + def get_folded_output_shape(self, ind=0): + # Get the normal shape before applying folding + *num_outputs, num_elems = self.get_normal_output_shape(ind=ind) + # Valid folding requires the PE to divide the number of elements + assert num_elems % self.pe == 0, "PE must divide last axis" + # Folding along the last dimension + return *num_outputs, num_elems // self.pe, self.pe + + # Widths of the input data stream of the input at index ind + def get_instream_width(self, ind=0): + # Get the number of bits used to represent the input + i_bits = self.get_input_datatype(ind).bitwidth() + # Parallelism is the number of elements in the last dimension of the + # folded input + *_, elems = self.get_folded_input_shape(ind) + # Width of a stream receiving input elements in parallel + return elems * i_bits + + # Widths of the output data stream of the output at index ind + def get_outstream_width(self, ind=0): + # Get the number of bits used to represent the output + o_bits = self.get_output_datatype(ind).bitwidth() + # Parallelism is the number of elements in the last dimension of the + # folded output + *_, elems = self.get_folded_output_shape(ind) + # Width of a stream producing output elements in parallel + return elems * o_bits + + # Gets the number of expected output values, i.e. how many times read() + # could/should be called on any output stream of this operator + def get_number_output_values(self): + # Elements over all but the last dimension of the output folded along + # the last (PE) dimension. + return np.prod(self.get_folded_output_shape()[:-1]) + + # Derives the expected cycles for the squeeze operation given the folding + # configuration + def get_exp_cycles(self): + # Number of iterations required to process the whole folded stream + # Note: This is all but the PE (last, parallelized) dimension + return np.prod(self.get_folded_output_shape()[:-1]) diff --git a/src/finn/custom_op/fpgadataflow/unsqueeze.py b/src/finn/custom_op/fpgadataflow/unsqueeze.py new file mode 100644 index 0000000000..92b3b32f22 --- /dev/null +++ b/src/finn/custom_op/fpgadataflow/unsqueeze.py @@ -0,0 +1,330 @@ +# fmt: off +# Disable formatter. This is deliberately formatted to stay within 80 characters +# per line. Black, however, formats some lines going beyond this. + +# Numpy math and arrays +import numpy as np + +# Operating system stuff, e.g.
paths + import os + + # Python warning subsystem + import warnings + + # Copies of python objects, copy.deepcopy + import copy + + # QONNX/FINN datatypes + from qonnx.core.datatype import DataType + + # QONNX wrapper to ONNX model graphs + from qonnx.core.modelwrapper import ModelWrapper + + # Utility for registering HWCustomOp implementations into the module scope + from finn.custom_op.fpgadataflow import register_custom_op + + # Derive custom operators from the FINN base custom op + from finn.custom_op.fpgadataflow.hwcustomop import HWCustomOp + + # Converts inputs/outputs to/from RTL simulation format + from finn.util.data_packing import npy_to_rtlsim_input, rtlsim_output_to_npy + + + # Unsqueeze operation: Inserts single-dimension entries into the shape of a + # tensor + @register_custom_op + class Unsqueeze(HWCustomOp): + # Initializes the operator given an onnx graph node + def __init__(self, onnx_node, **kwargs): + # Just forward all arguments to the init method of the CustomOp base + super().__init__(onnx_node, **kwargs) + + # Defines attributes which must be present on this node + def get_nodeattr_types(self): + # Start from parent operator class attributes # noqa: Duplicate + attrs = HWCustomOp.get_nodeattr_types(self) + # Update attributes dictionary for new custom operator + attrs.update({ + # Axes to be unsqueezed can be given as an attribute for opset < 13 + "axes": ("ints", False, None), + # Data type of the input elements + "inp_dtype": ("s", True, ""), + # Data type of the output elements + "out_dtype": ("s", True, ""), + # Shape of the input + "inp_shape": ("ints", True, [1]), + # Shape of the output + "out_shape": ("ints", True, [1]), + # Number of elements in the last dimension processed in parallel + "PE": ("i", False, 1), + # Possible execution modes for simulating this node + # Note: Override to support python mode + "exec_mode": ( + "s", False, "python", {"", "rtlsim", "cppsim", "python"} + ), + }) + # Return updated attribute dictionary + return attrs + + # Datatype attribute as property for convenience + @property + def inp_dtype(self): + # Note: Converts from string to QONNX data type + return DataType[self.get_nodeattr("inp_dtype")] + + # Datatype attribute as property for convenience + @property + def out_dtype(self): + # Note: Converts from string to QONNX data type + return DataType[self.get_nodeattr("out_dtype")] + + # Shape attribute as property for convenience + @property + def inp_shape(self): + return self.get_nodeattr("inp_shape") + + # Shape attribute as property for convenience + @property + def out_shape(self): + return self.get_nodeattr("out_shape") + + # Number of parallel processed elements as property for convenience + @property + def pe(self): + return self.get_nodeattr("PE") + + # Makes an operation compatible with the output shape for shape inference + # Note: Propagates shape forward, i.e., never asks for the shape of the + # output, even if it seems easier. + def make_shape_compatible_op(self, model: ModelWrapper): # noqa + # Get the node wrapped by this custom op + node = copy.deepcopy(self.onnx_node) + # Though providing the axes to unsqueeze via a second input is supported + # by the implementation, the inferred shapes might be incorrect if this + # is truly a dynamic list of axes changing at runtime. + if len(node.input) > 1: + # Issue a warning to make the user aware of this potential issue + warnings.warn( + f"{node.name}: Providing dimensions to unsqueeze as an input" + f" might invalidate shape inference if these are not constant." + ) + # Transplant this operator back into the standard ONNX domain + node.domain = "" + # Shape inference should now work on this standard ONNX node + return node
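+ + # For illustration (not tied to a specific model): numpy unsqueeze semantics + # via np.expand_dims with an axes tuple (supported since numpy 1.18), as + # used by _execute_node_python below: + # + #     np.expand_dims(np.zeros((128, 32)), axis=(0, 2)).shape == (1, 128, 1, 32)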
+ + # Infers the datatype of the node output + def infer_node_datatype(self, model: ModelWrapper): # noqa + # Get the node wrapped by this custom op # noqa Duplicate + node = self.onnx_node # noqa: Duplicate + # Test for changing input datatype + if model.get_tensor_datatype(node.input[0]) != self.inp_dtype: + # Get the new datatype + new_dtype = model.get_tensor_datatype(node.input[0]) + # Issue a warning message + warnings.warn( + f"{node.name}: inp_dtype changing from" + f" {self.inp_dtype} to {new_dtype}" + ) + # Set the new datatype attribute + self.set_nodeattr("inp_dtype", new_dtype.name) + # Though providing the axes to unsqueeze via a second input is supported + # by the implementation, the datatype of this input is ignored here + if len(node.input) > 1: + # Issue a warning to make the user aware of this potential issue + warnings.warn( + f"{node.name}: Providing dimensions to unsqueeze as an input" + f" will be ignored by datatype inference." + ) + # Make sure the output always has the same type as the input + if self.out_dtype != self.inp_dtype: + # Issue a warning message + warnings.warn( + f"{node.name}: out_dtype changing from" + f" {self.out_dtype} to {self.inp_dtype}" + ) + # Set the new datatype attribute + self.set_nodeattr("out_dtype", self.inp_dtype.name) + # Force the output data type stored as a node attribute + model.set_tensor_datatype(node.output[0], self.out_dtype) + + # Executes unsqueeze operation in python + def _execute_node_python(self, context, graph): # noqa: graph unused + # Get the node wrapped by this custom op + node = self.onnx_node # noqa: Duplicate + # Get the input from the execution context + inp = context[node.input[0]] + # Try with axes specified as attribute first + axes = self.get_nodeattr("axes") + # If no axes are specified via the attribute but there is a second + # input to the operator, this input specifies the axes to be unsqueezed + if axes is None and len(node.input) > 1: + # Get the axes list from the execution context + axes = context[node.input[1]] + # If axes are specified convert them to tuple as required by numpy + axes = tuple(axes) if axes is not None else None + # Unsqueeze the input along the optionally specified axes + out = np.expand_dims(inp, axis=axes) + # Make sure the output has the right type (always use float32 as the + # container type) and insert into the execution context + context[node.output[0]] = out.astype(np.float32) + + # Executes unsqueeze operation in C++ simulation + def _execute_node_cppsim(self, context, graph): # noqa: graph unused + # C++ Simulation needs to be implemented in HLS backend specialization + raise NotImplementedError( + f"exec_mode cppsim of {self.__class__.__name__} is not implemented!"
+ ) + + # Executes unsqueeze operation in RTL simulation + def _execute_node_rtlsim(self, context, graph): # noqa: graph unused + # Get the node wrapped by this custom op # noqa Duplicate + node = self.onnx_node + # Input data is stored in numpy files in the code generation dictionary + code_gen_dir = self.get_nodeattr("code_gen_dir_ipgen") + # Get the inputs out of the execution context + inp = context[node.input[0]] # noqa: Duplicate code prepare simulation + # Validate the shape of the inputs + assert list(inp.shape) == self.get_normal_input_shape(ind=0), \ + f"Input shape mismatch for {node.input[0]}" + # Reshape the input into folded form + inp = inp.reshape(self.get_folded_input_shape(ind=0)) + # Path to store the intermediate input in numpy format + inp_filename = os.path.join(code_gen_dir, "inp.npy") + # Save the folded input to file to be used by simulation + np.save(inp_filename, inp) + # Start collecting inputs/outputs to the RTL simulation in a dictionary + # Note: Prepare one output empty output list + io_dict = { + "inputs": {}, + "outputs": {"out": []} + } + # Type and width of the input tensors + inp_dtype = self.get_input_datatype(ind=0) + inp_width = self.get_instream_width(ind=0) + + # Convert input to RTL simulation format + io_dict["inputs"]["inp"] = npy_to_rtlsim_input( + inp_filename, inp_dtype, inp_width + ) + + # Setup PyVerilator simulation of the node + sim = self.get_rtlsim() # noqa: Duplicate code prepare simulation + # Reset the RTL simulation + super().reset_rtlsim(sim) + super().toggle_clk(sim) + # Run the RTL Simulation + self.rtlsim_multi_io(sim, io_dict) + + # Collect the output from RTL simulation + out = io_dict["outputs"]["out"] + # Type and sizes of the output tensor + dtype = self.get_output_datatype(ind=0) # noqa: Duplicate readout code + width = self.get_outstream_width(ind=0) + shape = self.get_folded_output_shape(ind=0) + # Path to store the intermediate numpy file + filename = os.path.join(code_gen_dir, "out.npy") + # Convert from RTL simulation format to numpy format + rtlsim_output_to_npy( + out, filename, dtype, shape, width, dtype.bitwidth() + ) + # Load the generated output numpy file + out = np.load(filename) + # Reshape the folded output and insert into the execution context + context[node.output[0]] = out.reshape( + self.get_normal_output_shape(ind=0) + ) + + # Executes unsqueeze operation in simulation (either python c++ or rtl sim) + def execute_node(self, context, graph): + # Get the configured execution mode + mode = self.get_nodeattr("exec_mode") + # Lookup table mapping execution modes to implementing methods + exec_fns = { + "python": self._execute_node_python, + "cppsim": self._execute_node_cppsim, + "rtlsim": self._execute_node_rtlsim, + } + # Select and execute the function by mode string + exec_fns[mode](context, graph) + + # Verifies the node attributes, inputs and outputs + def verify_node(self): + # TODO: Implement + return [] + + # Note: End of QONNX CustomOp region, below is FINN HWCustomOp stuff + + # Gets the datatype of input at index ind + def get_input_datatype(self, ind=0): + # There is only one proper input (we ignore the optional axes input + # here) + return self.inp_dtype + + # Gets the datatype of the output at index ind + def get_output_datatype(self, ind=0): + # There is only one output, the type is set as an attribute + return self.out_dtype + + # Gets the shape of the input at index ind without folding + def get_normal_input_shape(self, ind=0): + # There is only one proper input (we ignore the optional axes 
input + # here) + return self.inp_shape + + # Gets the shape of the output at index ind without folding + def get_normal_output_shape(self, ind=0): + # The output shape is stored as a node attribute + return self.out_shape + + # Gets the shape of the input at index ind with folding + def get_folded_input_shape(self, ind=0): + # Get the normal shape before applying folding + *num_inputs, num_elems = self.get_normal_input_shape(ind=ind) + # Valid folding requires the PE to divide the number of elements + assert num_elems % self.pe == 0, "PE must divide last axis" + # Folding along the last dimension + return *num_inputs, num_elems // self.pe, self.pe + + # Gets the shape of the output at index ind with folding + def get_folded_output_shape(self, ind=0): + # Get the normal shape before applying folding + *num_outputs, num_elems = self.get_normal_output_shape(ind=ind) + # Valid folding requires the PE to divide the number of elements + assert num_elems % self.pe == 0, "PE must divide last axis" + # Folding along the last dimension + return *num_outputs, num_elems // self.pe, self.pe + + # Widths of the input data stream of the input at index ind + def get_instream_width(self, ind=0): + # Get the number of bits used to represent the input + i_bits = self.get_input_datatype(ind).bitwidth() + # Parallelism is the number of elements in the last dimension of the + # folded input + *_, elems = self.get_folded_input_shape(ind) + # Width of a stream receiving input elements in parallel + return elems * i_bits + + # Widths of the output data stream of the output at index ind + def get_outstream_width(self, ind=0): + # Get the number of bits used to represent the output + o_bits = self.get_output_datatype(ind).bitwidth() + # Parallelism is the number of elements in the last dimension of the + # folded output + *_, elems = self.get_folded_output_shape(ind) + # Width of a stream producing output elements in parallel + return elems * o_bits + + # Gets the number of expected output values, i.e. how many times read() + # could/should be called on any output stream of this operator + def get_number_output_values(self): + # Elements over all but the last dimension of the output folded along + # the last (PE) dimension.
+ return np.prod(self.get_folded_output_shape()[:-1]) + + # Derives the expected cycles for the unsqueeze operation given the folding + # configuration + def get_exp_cycles(self): + # Number of iterations required to process the whole folded stream + # Note: This is all but the PE (last, parallelized) dimension + return np.prod(self.get_folded_output_shape()[:-1]) diff --git a/src/finn/transformation/fpgadataflow/convert_to_hw_layers.py b/src/finn/transformation/fpgadataflow/convert_to_hw_layers.py index b02bc89db8..bc88b7fe87 100644 --- a/src/finn/transformation/fpgadataflow/convert_to_hw_layers.py +++ b/src/finn/transformation/fpgadataflow/convert_to_hw_layers.py @@ -32,6 +32,7 @@ import warnings from onnx import TensorProto, helper from qonnx.core.datatype import DataType +from qonnx.core.modelwrapper import ModelWrapper from qonnx.custom_op.registry import getCustomOp from qonnx.transformation.base import Transformation from qonnx.transformation.general import SortGraph @@ -40,6 +41,9 @@ from qonnx.util.basic import get_by_name from qonnx.util.onnx import nchw_to_nhwc +# Base class for all FINN custom ops, here just used for type-hinting +from finn.custom_op.fpgadataflow.hwcustomop import HWCustomOp + class InferConvInpGen(Transformation): """Convert Im2Col layers to ConvolutionInputGenerator layers.""" @@ -1755,3 +1759,107 @@ def apply(self, model): model = model.transform(InferShapes()) model = model.transform(InferDataTypes()) return (model, graph_modified) + + +# Converts the Squeeze operation to the corresponding FINN custom operation +class InferSqueeze(Transformation): + # Applies the transform to a whole model graph + def apply(self, model: ModelWrapper): # noqa + # Get the model graph out of the model wrapper object + graph = model.graph + # Keep track of whether the graph has been modified + graph_modified = False + # Iterate all nodes in the graph keeping track of the index + for index, node in enumerate(graph.node): + # Handles Squeeze ONNX operations + if node.op_type == "Squeeze": + # Skip already converted nodes + if node.domain == "finn.custom_op.fpgadataflow": + # Skip without warning + continue + # Transplant this operator into our FINN domain + node.domain = "finn.custom_op.fpgadataflow" # noqa: Duplicate + # Now we can get the CustomOp wrapper instance providing easier + # attribute access + inst: HWCustomOp = getCustomOp(node) + # Set the backend attribute to mark this as an operation supported + # to be implemented on an FPGA by FINN + inst.set_nodeattr("backend", "fpgadataflow") + # Get the input and output tensor names + inp, out = node.input[0], node.output[0] + # Set input/output shape and datatype node attributes required + # by FINN custom op + inst.set_nodeattr( + "inp_dtype", str(model.get_tensor_datatype(inp)) + ) + inst.set_nodeattr("inp_shape", model.get_tensor_shape(inp)) + inst.set_nodeattr( + "out_dtype", str(model.get_tensor_datatype(out)) + ) + inst.set_nodeattr("out_shape", model.get_tensor_shape(out)) + # Consider the graph to be modified, triggering exhaustive + # re-application of this transformation + graph_modified = True + # Exiting here triggers type and shape inference and cleanup + # after each transformed node. This helps QONNX to behave + # better/more consistently in certain cases... + break + # Re-do shape and data type annotations after potential changes to the + # model graph + model = model.transform(InferShapes()) + model = model.transform(InferDataTypes()) + # Return the transformed model and indicate whether the graph actually + # has been transformed + return model, graph_modified
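+ + +# Minimal usage sketch (illustrative, hypothetical model variable): +# +#   model = model.transform(InferSqueeze()) +# +# converts all standard ONNX Squeeze nodes in place; a subsequent +# specialization step (e.g. SpecializeLayers) then selects the Squeeze_hls +# backend implementation.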
+                break
+        # Re-do shape and data type annotations after potential changes to the
+        # model graph
+        model = model.transform(InferShapes())
+        model = model.transform(InferDataTypes())
+        # Return the transformed model and indicate whether the graph actually
+        # has been transformed
+        return model, graph_modified
+
+
+# Converts the Unsqueeze operation to the corresponding FINN custom operation
+class InferUnsqueeze(Transformation):
+    # Applies the transform to a whole model graph
+    def apply(self, model: ModelWrapper):  # noqa
+        # Get the model graph out of the model wrapper object
+        graph = model.graph
+        # Keep track of whether the graph has been modified
+        graph_modified = False
+        # Iterate all nodes in the graph keeping track of the index
+        for index, node in enumerate(graph.node):
+            # Handles Unsqueeze ONNX operations
+            if node.op_type == "Unsqueeze":
+                # Skip already converted nodes  # noqa: Duplicate
+                if node.domain == "finn.custom_op.fpgadataflow":
+                    # Skip without warning
+                    continue
+                # Transplant this operator into our FINN domain
+                node.domain = "finn.custom_op.fpgadataflow"
+                # Now we can get the CustomOp wrapper instance providing easier
+                # attribute access
+                inst: HWCustomOp = getCustomOp(node)
+                # Set the backend attribute to mark this as an operation
+                # supported to be implemented on an FPGA by FINN
+                inst.set_nodeattr("backend", "fpgadataflow")
+                # Get the input and output tensor names
+                inp, out = node.input[0], node.output[0]
+                # Set input/output shape and datatype node attributes required
+                # by FINN custom op
+                inst.set_nodeattr(
+                    "inp_dtype", str(model.get_tensor_datatype(inp))
+                )
+                inst.set_nodeattr("inp_shape", model.get_tensor_shape(inp))
+                inst.set_nodeattr(
+                    "out_dtype", str(model.get_tensor_datatype(out))
+                )
+                inst.set_nodeattr("out_shape", model.get_tensor_shape(out))
+                # Consider the graph to be modified, triggering exhaustive
+                # re-application of this transformation
+                graph_modified = True
+                # Exiting here triggers type and shape inference and cleanup
+                # after each transformed node. This helps QONNX to behave
+                # better/more consistently in certain cases...
+ break + # Re-do shape and data type annotations after potential changes to the + # model graph + model = model.transform(InferShapes()) + model = model.transform(InferDataTypes()) + # Return the transformed model and indicate whether the graph actually + # has been transformed + return model, graph_modified diff --git a/src/finn/transformation/fpgadataflow/set_folding.py b/src/finn/transformation/fpgadataflow/set_folding.py index eaee499e6a..4532cda4fd 100644 --- a/src/finn/transformation/fpgadataflow/set_folding.py +++ b/src/finn/transformation/fpgadataflow/set_folding.py @@ -106,6 +106,8 @@ def apply(self, model): "GlobalAccPool_hls", "Thresholding_hls", "Thresholding_rtl", + "Squeeze_hls", + "Unsqueeze_hls" ] # these ops use SIMD parallelism, up to a max value of NumChannels # ConvolutionInputGenerator* has a special case when depthwise=1 diff --git a/src/finn/transformation/streamline/reorder.py b/src/finn/transformation/streamline/reorder.py index 9a7e9d0723..b331c35686 100644 --- a/src/finn/transformation/streamline/reorder.py +++ b/src/finn/transformation/streamline/reorder.py @@ -33,6 +33,7 @@ from onnx import TensorProto from onnx import helper as oh from qonnx.core.datatype import DataType +from qonnx.core.modelwrapper import ModelWrapper from qonnx.core.onnx_exec import execute_node from qonnx.custom_op.registry import getCustomOp from qonnx.transformation.base import Transformation @@ -1258,3 +1259,130 @@ def apply(self, model): class MoveTransposePastJoinAdd(MoveIdenticalOpPastJoinOp): def __init__(self): super().__init__(["Transpose"], ["Add"]) + + +# Moves a Squeeze operation past MultiThresholds +# TODO: extend to all operations invariant to or compatible with squeezing +class MoveSqueezePastMultiThreshold(Transformation): + # Applies the transform to a whole model graph + def apply(self, model: ModelWrapper): # noqa + # Get the model graph out of the model wrapper object + graph = model.graph + # Keep track of whether the graph has been modified + graph_modified = False + # Iterate all nodes in the graph keeping track of the index + for index, node in enumerate(graph.node): + # Applies to Squeeze operation types + if node.op_type == "Squeeze": + # Currently does not handle fork- or join-nodes + if model.is_fork_node(node) or model.is_join_node(node): + # Softly skip this node + continue + # As this is not a fork-node, there can be at most one successor + successor = model.find_direct_successors(node) + # If Squeeze is the final operation in the graph, there might + # be no successor + if successor is None: + # Softly skip this node + continue + # Now there is exactly one successor which needs to be extracted + # from the list + successor = successor[0] + # Applies to MultiThreshold + if successor.op_type in {"MultiThreshold"}: + # Get names of all tensors involved in connecting the nodes + inp = node.input[0] # noqa: Duplicate + mid = node.output[0] + out = successor.output[0] + # Rewire the graph to feed original into the MultiThreshold + # node first + successor.input[0] = inp + # Repurpose the middle tensor for the output of the + # MultiThreshold + successor.output[0] = mid + # The Squeeze operator now gets the middle tensor as its + # input + node.input[0] = mid + # Squeeze now produces the original output tensor + node.output[0] = out + # Delete the shape annotation of the connecting tensors + # to be re-done later + model.set_tensor_shape(mid, None) + model.set_tensor_shape(out, None) + # Track whether the graph has been modified, never + # resets to False + 
+                    graph_modified = True
+                    # Break the loop after deleting shape annotations to
+                    # immediately re-do these before changing the next
+                    # operator
+                    break
+        # Need to redo the shape inference after potentially deleting them
+        model = model.transform(InferShapes())  # noqa: Shadows model
+        # Return the transformed model and indicate whether the graph
+        # actually has been transformed
+        return model, graph_modified
+
+
+# Moves a Squeeze operation past MatMul
+# TODO: extend to all operations invariant to or compatible with squeezing
+class MoveSqueezePastMatMul(Transformation):
+    # Applies the transform to a whole model graph
+    def apply(self, model: ModelWrapper):  # noqa
+        # Get the model graph out of the model wrapper object
+        graph = model.graph
+        # Keep track of whether the graph has been modified
+        graph_modified = False
+        # Iterate all nodes in the graph keeping track of the index
+        for index, node in enumerate(graph.node):
+            # Applies to Squeeze operation types
+            if node.op_type == "Squeeze":
+                # Currently does not handle fork- or join-nodes
+                if model.is_fork_node(node) or model.is_join_node(node):
+                    # Softly skip this node
+                    continue
+                # As this is not a fork-node, there can be at most one successor
+                successor = model.find_direct_successors(node)
+                # If Squeeze is the final operation in the graph, there might
+                # be no successor
+                if successor is None:
+                    # Softly skip this node
+                    continue
+                # Now there is exactly one successor which needs to be extracted
+                # from the list
+                successor = successor[0]
+                # Applies to MatMul
+                # TODO: Check behavior for multi-dimensional and potentially
+                # broadcasting MatMuls...
+                if successor.op_type in {"MatMul"}:
+                    # Get names of all tensors involved in  # noqa: Duplicate
+                    # connecting the nodes
+                    inp = node.input[0]  # noqa: Duplicate
+                    mid = node.output[0]
+                    out = successor.output[0]
+                    # Rewire the graph to feed the original input into the
+                    # MatMul node first
+                    successor.input[0] = inp
+                    # Repurpose the middle tensor for the output of the MatMul
+                    successor.output[0] = mid
+                    # The Squeeze operator now gets the middle tensor as its
+                    # input
+                    node.input[0] = mid
+                    # Squeeze now produces the original output tensor
+                    node.output[0] = out
+                    # Delete the shape annotation of the connecting tensors
+                    # to be re-done later
+                    model.set_tensor_shape(mid, None)
+                    model.set_tensor_shape(out, None)
+                    # Track whether the graph has been modified, never
+                    # resets to False
+                    graph_modified = True
+                    # Break the loop after deleting shape annotations to
+                    # immediately re-do these before changing the next
+                    # operator
+                    break
+        # Need to redo the shape inference after potentially deleting them
+        model = model.transform(InferShapes())  # noqa: Shadows model
+        # Return the transformed model and indicate whether the graph
+        # actually has been transformed
+        return model, graph_modified
diff --git a/tests/fpgadataflow/test_squeeze.py b/tests/fpgadataflow/test_squeeze.py
new file mode 100644
index 0000000000..0ba7f61836
--- /dev/null
+++ b/tests/fpgadataflow/test_squeeze.py
@@ -0,0 +1,392 @@
+# fmt: off
+# Disable formatter. This file is deliberately formatted to stay within 80
+# characters per line; Black would otherwise reformat some lines beyond this
+# limit.
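+#
+# The tests below exercise the Squeeze hardware custom op in three execution
+# modes (python, cppsim and rtlsim) against np.squeeze as the reference, and
+# end with an integration test exporting a small PyTorch module via Brevitas.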
+ +# Testing framework +import pytest + +# Numpy math and arrays +import numpy as np + +# Create temporary files automatically deleted after integration test +import tempfile + +# PyTorch required for integration test +import torch + +# Export brevitas models to QONNX representation in integration test +from brevitas.export import export_qonnx + +# ONNX graph and tensor utility +from onnx import TensorProto +from onnx import helper as oh + +# QONNX/FINN datatypes +from qonnx.core.datatype import DataType + +# QONNX wrapper to ONNX model graphs +from qonnx.core.modelwrapper import ModelWrapper + +# Execute onnx model graphs +from qonnx.core.onnx_exec import execute_onnx + +# Registry of all QONNX CustomOps +from qonnx.custom_op.registry import getCustomOp + +# Cleanup transformations required after QONNX model import +from qonnx.transformation.general import ( + ApplyConfig, + GiveReadableTensorNames, + GiveUniqueNodeNames, + GiveUniqueParameterTensors, + RemoveUnusedTensors, +) + +# Adds data layout annotations to the model graph to correctly convert +# quantizers to multi-thresholds +from qonnx.transformation.infer_data_layouts import InferDataLayouts + +# QONNX graph transformations for inferring datatypes and shapes +from qonnx.transformation.infer_datatypes import InferDataTypes +from qonnx.transformation.infer_shapes import InferShapes + +# Utility for wrapping onnx graphs and generating tensor of FINN datatypes +from qonnx.util.basic import gen_finn_dt_tensor, qonnx_make_model + +# FINN graph transformations for preparing simulation (cppsim or rtlsim) +from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim + +# Mapping to hardware operators of the operations relevant for the +# integration test +# Note: The integration test serves as the test-case for InferSqueeze +from finn.transformation.fpgadataflow.convert_to_hw_layers import InferSqueeze + +# Synthesizes HLS code generated from an operator to IP block +from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP + +# Transformations preparing the operators for C++ and RTL simulation +from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim +from finn.transformation.fpgadataflow.prepare_ip import PrepareIP +from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim +from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode +from finn.transformation.fpgadataflow.specialize_layers import SpecializeLayers + +# Converts between QONNX and FINN dialect of ONNX representation +from finn.transformation.qonnx.convert_qonnx_to_finn import ConvertQONNXtoFINN + +# Standard set of streamlining transformations delivered with FINN +from finn.transformation.streamline import Streamline + +# Checks whether a node is a fpgadataflow backend node handled by FINN +from finn.util.fpgadataflow import is_fpgadataflow_node + + +# Specializes all nodes to be implemented as HLS backend +def specialize_hls(model: ModelWrapper): + # Mark all nodes to be specialized as HLS backend implementations + for node in model.graph.node: # noqa: Duplicate test setup code + # Skip non-fpgadataflow backend operators as these do not have the + # preferred_impl_style attribute + if is_fpgadataflow_node(node): + # Get the CustomOp instance of the node to get access to the node + # attributes + inst = getCustomOp(node) + # Note: only HLS-based layers execute C++ Simulation + inst.set_nodeattr("preferred_impl_style", "hls") + # Turn all HWCustomOp layers into HLS specializations + return 
model.transform(SpecializeLayers("xczu7ev-ffvc1156-2-e"))
+
+
+# Creates a dummy model for testing the Squeeze operation
+def mock_squeeze(axes, inp_dtype, out_dtype, inp_shape, out_shape, pe):
+    # Create a node representing the squeeze operation
+    node = oh.make_node(
+        # Operator type from the name of the fpgadataflow hlscustomop
+        op_type="Squeeze",
+        # Specify the domain, i.e., the package to look for the custom operator
+        # implementation
+        domain="finn.custom_op.fpgadataflow",
+        # Execution backend: Required attribute inherited from HLSCustomOp
+        backend="fpgadataflow",
+        # Just one input
+        inputs=["inp"],
+        # Enumerate the outputs
+        outputs=["out"],
+        # Axes to be squeezed
+        axes=axes,
+        # Data type of the input elements
+        inp_dtype=inp_dtype,
+        # Data type of the output elements
+        out_dtype=out_dtype,
+        # Shape of the input
+        inp_shape=inp_shape,
+        # Shape of the output
+        out_shape=out_shape,
+        # Number of elements to process in parallel
+        PE=pe,
+    )
+    # Construct the input tensor value infos
+    inp = oh.make_tensor_value_info("inp", TensorProto.FLOAT, inp_shape)
+    # Construct output tensor value infos
+    out = oh.make_tensor_value_info("out", TensorProto.FLOAT, out_shape)
+    # Create a graph connecting the node to the inputs and outputs
+    graph = oh.make_graph([node], inputs=[inp], outputs=[out], name="squeeze")
+    # Wrap the ONNX graph in QONNX model wrapper
+    model = ModelWrapper(
+        qonnx_make_model(graph, producer_name="squeeze")
+    )
+
+    # Add datatype annotation to the value info of input tensors
+    model.set_tensor_datatype("inp", DataType[inp_dtype])
+    model.set_tensor_datatype("out", DataType[out_dtype])
+
+    # Return the wrapped onnx model
+    return model
+
+
+# Axes to be squeezed
+@pytest.mark.parametrize(  # noqa: Duplicate test setup
+    "axes", [None, (1,), (1, 3), (-1,)]
+)
+# Data type of the input elements
+@pytest.mark.parametrize("inp_dtype", ["INT8"])
+@pytest.mark.parametrize("out_dtype", ["INT8"])
+# Shape of the input
+@pytest.mark.parametrize("inp_shape", [
+    [3, 1, 7, 1]
+])
+# Number of elements to process in parallel
+@pytest.mark.parametrize("pe", [1])
+def test_squeeze_python(axes, inp_dtype, out_dtype, inp_shape, pe):
+    # Derive the squeezed output shape
+    out_shape = np.squeeze(np.zeros(inp_shape), axis=axes).shape  # noqa
+    # Make dummy model for testing
+    model = mock_squeeze(  # noqa: Duplicate test setup
+        axes, inp_dtype, out_dtype, inp_shape, out_shape, pe
+    )
+    # Prepare the execution context
+    context = {
+        "inp": gen_finn_dt_tensor(DataType[inp_dtype], inp_shape),
+    }
+
+    # Test running shape and data type inference on the model graph
+    model = model.transform(InferDataTypes())
+    model = model.transform(InferShapes())
+
+    # Set model execution mode to python simulation
+    model = model.transform(SetExecMode("python"))
+    model = model.transform(GiveUniqueNodeNames())
+
+    # Compute ground-truth output in software
+    o_expected = np.squeeze(context["inp"], axes)
+    # Execute the onnx model to collect the result
+    o_produced = execute_onnx(model, context)["out"]
+
+    # Compare the expected to the produced for exact equality
+    assert np.all(o_produced == o_expected)
+    # Compare the produced shape to the expected squeezed shape
+    assert o_produced.shape == out_shape
+
+
+# Axes to be squeezed
+@pytest.mark.parametrize(  # noqa: Duplicate test setup
+    "axes", [None, (1,), (1, 3), (-1,)]
+)
+# Data type of the input elements
+@pytest.mark.parametrize("inp_dtype", ["INT8"])
+@pytest.mark.parametrize("out_dtype", ["INT8"])
+# Shape of the input
+@pytest.mark.parametrize("inp_shape", [ + [3, 1, 7, 1] +]) +# Number of elements to process in parallel +@pytest.mark.parametrize("pe", [1]) +def test_squeeze_cppsim(axes, inp_dtype, out_dtype, inp_shape, pe): + # Derive the squeezed output shape + out_shape = np.squeeze(np.zeros(inp_shape), axis=axes).shape # noqa + # Make dummy model for testing + model = mock_squeeze( # noqa: Duplicate test setup + axes, inp_dtype, out_dtype, inp_shape, out_shape, pe + ) + # Prepare the execution context + context = { + "inp": gen_finn_dt_tensor(DataType[inp_dtype], inp_shape), + } + + # Test running shape and data type inference on the model graph + model = model.transform(InferDataTypes()) + model = model.transform(InferShapes()) + # Specializes all nodes to be implemented as HLS backend + model = specialize_hls(model) + + # Set model execution mode to C++ simulation + model = model.transform(SetExecMode("cppsim")) + # Generates the C++ source and compiles the C++ simulation + model = model.transform(GiveUniqueNodeNames()) + model = model.transform(PrepareCppSim()) + model = model.transform(CompileCppSim()) + + # Compute ground-truth output in software + o_expected = np.squeeze(context["inp"], axes) + # Execute the onnx model to collect the result + o_produced = execute_onnx(model, context)["out"] + + # Compare the expected to the produced for exact equality + assert np.all(o_produced == o_expected) + # Compare the produced shape to the expected squeezed shape + assert o_produced.shape == out_shape + + +# Axes to be squeezed +@pytest.mark.parametrize( # noqa: Duplicate test setup + "axes", [None, (1,), (1, 3), (-1,)] +) +# Data type of the input elements +@pytest.mark.parametrize("inp_dtype", ["INT8"]) +@pytest.mark.parametrize("out_dtype", ["INT8"]) +# Shape of the input +@pytest.mark.parametrize("inp_shape", [ + [3, 1, 7, 1] +]) +# Number of elements to process in parallel +@pytest.mark.parametrize("pe", [1]) +def test_squeeze_rtlsim(axes, inp_dtype, out_dtype, inp_shape, pe): + # Derive the squeezed output shape + out_shape = np.squeeze(np.zeros(inp_shape), axis=axes).shape # noqa + # Make dummy model for testing + model = mock_squeeze( # noqa: Duplicate test setup + axes, inp_dtype, out_dtype, inp_shape, out_shape, pe + ) + # Prepare the execution context + context = { + "inp": gen_finn_dt_tensor(DataType[inp_dtype], inp_shape), + } + + # Test running shape and data type inference on the model graph + model = model.transform(InferDataTypes()) + model = model.transform(InferShapes()) + # Specializes all nodes to be implemented as HLS backend + model = specialize_hls(model) + + # Set model execution mode to RTL simulation + model = model.transform(SetExecMode("rtlsim")) + # Generates the C++ source and compiles the RTL simulation + model = model.transform(GiveUniqueNodeNames()) + model = model.transform(PrepareIP("xczu7ev-ffvc1156-2-e", 10)) # noqa + model = model.transform(HLSSynthIP()) + model = model.transform(PrepareRTLSim()) + + # Compute ground-truth output in software + o_expected = np.squeeze(context["inp"], axes) + # Execute the onnx model to collect the result + o_produced = execute_onnx(model, context)["out"] + + # Compare the expected to the produced for exact equality + assert np.all(o_produced == o_expected) + # Compare the produced shape to the expected squeezed shape + assert o_produced.shape == out_shape + + +# Shape of the input +@pytest.mark.parametrize("inp_shape", [ + [1, 2], [2, 1, 4], [3, 1, 4], +]) +# Number of elements to process in parallel +@pytest.mark.parametrize("pe", 
[1, 2])
+def test_integration_squeeze(inp_shape, pe):
+    # PyTorch model wrapping the component(s) to be tested
+    class Dummy(torch.nn.Module):
+        # Sets up the test model and initializes parameters
+        def __init__(self):
+            # Initialize the PyTorch Module superclass
+            super().__init__()
+
+        # Model forward squeezing the input
+        def forward(self, x):  # noqa: Forward may be static...
+            return torch.squeeze(x)
+
+    # Create the test instance of the dummy model
+    model = Dummy()
+    # Create dummy test inputs
+    inp = torch.randn(*inp_shape)
+    # Do a forward pass with the model in training mode to calibrate any
+    # quantizers (this dummy model contains none)
+    _ = model(inp)
+    # Switch model to evaluation mode to keep parameters fixed for export
+    model = model.eval()
+    # Do not accumulate gradients while generating test output
+    with torch.no_grad():
+        # Model forward pass generating the expected output for verification
+        out_expected = model(inp).numpy().astype(np.float32)
+    # Generate a temporary directory for running this test
+    with tempfile.TemporaryDirectory() as tmp:
+        # Export the model to ONNX format to be consumed by FINN
+        export_qonnx(model, (inp, ), tmp + "/model.onnx")  # noqa: Duplicate
+        # Wrap the model with QONNX wrapper for transformations
+        model = ModelWrapper(tmp + "/model.onnx")  # noqa: Duplicate
+        # Cleanup transformations preparing the model to be consumed by FINN
+        model = model.transform(InferDataTypes())
+        model = model.transform(InferShapes())
+        model = model.transform(InferDataLayouts())
+        model = model.transform(ConvertQONNXtoFINN())
+        model = model.transform(GiveUniqueNodeNames())
+        model = model.transform(GiveUniqueParameterTensors())
+        model = model.transform(GiveReadableTensorNames())
+        model = model.transform(RemoveUnusedTensors())
+        # Do a single round of standard streamlining of the model graph
+        model = model.transform(Streamline())
+        # Convert layers to hardware custom operations
+        model = model.transform(InferSqueeze())
+
+        # Apply folding config to set the PE parallelism for hardware layers
+        model = model.transform(ApplyConfig({  # noqa: Duplicate test code
+            "Defaults": {"PE": [pe, ["Squeeze"]]}
+        }))
+
+        # Prepare the execution context with dummy data from above and input
+        # node names extracted from the transformed model graph
+        context = {  # noqa: Duplicate
+            model.graph.input[0].name: inp.numpy().astype(np.float32)
+        }
+
+        # Set model execution mode to python simulation
+        model = model.transform(SetExecMode("python"))  # noqa: Duplicate
+        model = model.transform(GiveUniqueNodeNames())
+        # Execute the onnx model to collect the result
+        out_produced = execute_onnx(model, context)[model.graph.output[0].name]
+        # Compare the expected to the produced
+        # Note: Only test for closeness up to some tolerance as the model has
+        # been streamlined, which may involve rounding
+        assert np.allclose(out_produced, out_expected, atol=1e-3), \
+            "Python simulation verification failed"
+
+        # Specializes all nodes to their backend implementation
+        model = model.transform(SpecializeLayers("xczu7ev-ffvc1156-2-e"))
+
+        # Set model execution mode to C++ simulation
+        model = model.transform(SetExecMode("cppsim"))
+        model = model.transform(GiveUniqueNodeNames())
+        # Generates the C++ source and compiles the C++ simulation
+        model = model.transform(PrepareCppSim())
+        model = model.transform(CompileCppSim())
+        # Execute the onnx model to collect the result
+        out_produced = execute_onnx(model, context)[model.graph.output[0].name]
+        # Compare the expected to the produced
+        # Note: Only test for closeness up to some tolerance as the model has
+        # been streamlined, which may involve rounding
+        assert np.allclose(out_produced, out_expected, atol=1e-3), \
+            "C++ simulation verification failed"
+
+        # Set model execution mode to RTL simulation
+        model = model.transform(SetExecMode("rtlsim"))
+        model = model.transform(GiveUniqueNodeNames())
+        # Generates and synthesizes the IP blocks and compiles the RTL
+        # simulation
+        model = model.transform(PrepareIP("xczu7ev-ffvc1156-2-e", 10))  # noqa
+        model = model.transform(HLSSynthIP())
+        model = model.transform(PrepareRTLSim())
+        # Execute the onnx model to collect the result
+        out_produced = execute_onnx(model, context)[model.graph.output[0].name]
+        # Compare the expected to the produced
+        # Note: Only test for closeness up to some tolerance as the model has
+        # been streamlined, which may involve rounding
+        assert np.allclose(out_produced, out_expected, atol=1e-3), \
+            "RTL simulation verification failed"
diff --git a/tests/fpgadataflow/test_unsqueeze.py b/tests/fpgadataflow/test_unsqueeze.py
new file mode 100644
index 0000000000..7335b64a93
--- /dev/null
+++ b/tests/fpgadataflow/test_unsqueeze.py
@@ -0,0 +1,394 @@
+# fmt: off
+# Disable formatter. This file is deliberately formatted to stay within 80
+# characters per line; Black would otherwise reformat some lines beyond this
+# limit.
+
+# Testing framework
+import pytest
+
+# Numpy math and arrays
+import numpy as np
+
+# Create temporary files automatically deleted after integration test
+import tempfile
+
+# PyTorch required for integration test
+import torch
+
+# Export brevitas models to QONNX representation in integration test
+from brevitas.export import export_qonnx
+
+# ONNX graph and tensor utility
+from onnx import TensorProto
+from onnx import helper as oh
+
+# QONNX/FINN datatypes
+from qonnx.core.datatype import DataType
+
+# QONNX wrapper to ONNX model graphs
+from qonnx.core.modelwrapper import ModelWrapper
+
+# Execute onnx model graphs
+from qonnx.core.onnx_exec import execute_onnx
+
+# Registry of all QONNX CustomOps
+from qonnx.custom_op.registry import getCustomOp
+
+# Cleanup transformations required after QONNX model import
+from qonnx.transformation.general import (
+    ApplyConfig,
+    GiveReadableTensorNames,
+    GiveUniqueNodeNames,
+    GiveUniqueParameterTensors,
+    RemoveUnusedTensors,
+)
+
+# Adds data layout annotations to the model graph to correctly convert
+# quantizers to multi-thresholds
+from qonnx.transformation.infer_data_layouts import InferDataLayouts
+
+# QONNX graph transformations for inferring datatypes and shapes
+from qonnx.transformation.infer_datatypes import InferDataTypes
+from qonnx.transformation.infer_shapes import InferShapes
+
+# Utility for wrapping onnx graphs and generating tensors of FINN datatypes
+from qonnx.util.basic import gen_finn_dt_tensor, qonnx_make_model
+
+# FINN graph transformations for preparing simulation (cppsim or rtlsim)
+from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
+
+# Mapping to hardware operators of the operations relevant for the
+# integration test
+# Note: The integration test serves as the test-case for InferUnsqueeze
+from finn.transformation.fpgadataflow.convert_to_hw_layers import InferUnsqueeze
+
+# Synthesizes HLS code generated from an operator to IP block
+from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
+
+# Transformations preparing the operators for C++ and RTL simulation
+from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
+from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
+from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
+from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
+from finn.transformation.fpgadataflow.specialize_layers import SpecializeLayers
+
+# Converts between QONNX and FINN dialect of ONNX representation
+from finn.transformation.qonnx.convert_qonnx_to_finn import ConvertQONNXtoFINN
+
+# Standard set of streamlining transformations delivered with FINN
+from finn.transformation.streamline import Streamline
+
+# Checks whether a node is a fpgadataflow backend node handled by FINN
+from finn.util.fpgadataflow import is_fpgadataflow_node
+
+
+# Specializes all nodes to be implemented as HLS backend
+def specialize_hls(model: ModelWrapper):
+    # Mark all nodes to be specialized as HLS backend implementations
+    for node in model.graph.node:  # noqa: Duplicate test setup code
+        # Skip non-fpgadataflow backend operators as these do not have the
+        # preferred_impl_style attribute
+        if is_fpgadataflow_node(node):
+            # Get the CustomOp instance of the node to get access to the node
+            # attributes
+            inst = getCustomOp(node)
+            # Note: only HLS-based layers execute C++ Simulation
+            inst.set_nodeattr("preferred_impl_style", "hls")
+    # Turn all HWCustomOp layers into HLS specializations
+    return model.transform(SpecializeLayers("xczu7ev-ffvc1156-2-e"))
+
+
+# Creates a dummy model for testing the Unsqueeze operation
+def mock_unsqueeze(axes, inp_dtype, out_dtype, inp_shape, out_shape, pe):
+    # Create a node representing the unsqueeze operation
+    node = oh.make_node(
+        # Operator type from the name of the fpgadataflow hlscustomop
+        op_type="Unsqueeze",
+        # Specify the domain, i.e., the package to look for the custom operator
+        # implementation
+        domain="finn.custom_op.fpgadataflow",
+        # Execution backend: Required attribute inherited from HLSCustomOp
+        backend="fpgadataflow",
+        # Just one input
+        inputs=["inp"],
+        # Enumerate the outputs
+        outputs=["out"],
+        # Axes to be unsqueezed
+        axes=axes,
+        # Data type of the input elements
+        inp_dtype=inp_dtype,
+        # Data type of the output elements
+        out_dtype=out_dtype,
+        # Shape of the input
+        inp_shape=inp_shape,
+        # Shape of the output
+        out_shape=out_shape,
+        # Number of elements to process in parallel
+        PE=pe,
+    )
+    # Construct the input tensor value infos
+    inp = oh.make_tensor_value_info("inp", TensorProto.FLOAT, inp_shape)
+    # Construct output tensor value infos
+    out = oh.make_tensor_value_info("out", TensorProto.FLOAT, out_shape)
+    # Create a graph connecting the node to the inputs and outputs
+    graph = oh.make_graph([node], inputs=[inp], outputs=[out], name="unsqueeze")
+    # Wrap the ONNX graph in QONNX model wrapper
+    model = ModelWrapper(
+        qonnx_make_model(graph, producer_name="unsqueeze")
+    )
+
+    # Add datatype annotation to the value info of input tensors
+    model.set_tensor_datatype("inp", DataType[inp_dtype])
+    model.set_tensor_datatype("out", DataType[out_dtype])
+
+    # Return the wrapped onnx model
+    return model
+
+
+# Axes to be unsqueezed
+@pytest.mark.parametrize(  # noqa: Duplicate test setup
+    "axes", [(1,), (1, 3), (-1,)]
+)
+# Data type of the input elements
+@pytest.mark.parametrize("inp_dtype", ["INT8"])
+@pytest.mark.parametrize("out_dtype", ["INT8"])
+# Shape of the input
+@pytest.mark.parametrize("inp_shape", [
+    [3, 7]
+])
+# Number of elements to process in parallel
+@pytest.mark.parametrize("pe", [1])
+def test_unsqueeze_python(axes, inp_dtype, out_dtype, inp_shape, pe):
+    # Derive the unsqueezed output shape
+    out_shape = np.expand_dims(np.zeros(inp_shape), axis=axes).shape  # noqa
+    # Make dummy model for testing
+    model = mock_unsqueeze(  # noqa: Duplicate test setup
+        axes, inp_dtype, out_dtype, inp_shape, out_shape, pe
+    )
+    # Prepare the execution context
+    context = {  # noqa: Duplicate test setup
+        "inp": gen_finn_dt_tensor(DataType[inp_dtype], inp_shape),
+    }
+
+    # Test running shape and data type inference on the model graph
+    model = model.transform(InferDataTypes())
+    model = model.transform(InferShapes())
+
+    # Set model execution mode to python simulation
+    model = model.transform(SetExecMode("python"))
+    model = model.transform(GiveUniqueNodeNames())
+
+    # Compute ground-truth output in software
+    o_expected = np.expand_dims(context["inp"], axes)
+    # Execute the onnx model to collect the result
+    o_produced = execute_onnx(model, context)["out"]
+
+    # Compare the expected to the produced for exact equality
+    assert np.all(o_produced == o_expected)
+    # Compare the produced shape to the expected unsqueezed shape
+    assert o_produced.shape == out_shape
+
+
+# Axes to be unsqueezed
+@pytest.mark.parametrize(  # noqa: Duplicate test setup
+    "axes", [(1,), (1, 3), (-1,)]
+)
+# Data type of the input elements
+@pytest.mark.parametrize("inp_dtype", ["INT8"])
+@pytest.mark.parametrize("out_dtype", ["INT8"])
+# Shape of the input
+@pytest.mark.parametrize("inp_shape", [
+    [3, 7]
+])
+# Number of elements to process in parallel
+@pytest.mark.parametrize("pe", [1])
+def test_unsqueeze_cppsim(axes, inp_dtype, out_dtype, inp_shape, pe):
+    # Derive the unsqueezed output shape
+    out_shape = np.expand_dims(np.zeros(inp_shape), axis=axes).shape  # noqa
+    # Make dummy model for testing
+    model = mock_unsqueeze(  # noqa: Duplicate test setup
+        axes, inp_dtype, out_dtype, inp_shape, out_shape, pe
+    )
+    # Prepare the execution context
+    context = {  # noqa: Duplicate test setup
+        "inp": gen_finn_dt_tensor(DataType[inp_dtype], inp_shape),
+    }
+
+    # Test running shape and data type inference on the model graph
+    model = model.transform(InferDataTypes())
+    model = model.transform(InferShapes())
+    # Specializes all nodes to be implemented as HLS backend
+    model = specialize_hls(model)
+
+    # Set model execution mode to C++ simulation
+    model = model.transform(SetExecMode("cppsim"))
+    # Generates the C++ source and compiles the C++ simulation
+    model = model.transform(GiveUniqueNodeNames())
+    model = model.transform(PrepareCppSim())
+    model = model.transform(CompileCppSim())
+
+    # Compute ground-truth output in software
+    o_expected = np.expand_dims(context["inp"], axes)
+    # Execute the onnx model to collect the result
+    o_produced = execute_onnx(model, context)["out"]
+
+    # Compare the expected to the produced for exact equality
+    assert np.all(o_produced == o_expected)
+    # Compare the produced shape to the expected unsqueezed shape
+    assert o_produced.shape == out_shape
+
+
+# Axes to be unsqueezed
+@pytest.mark.parametrize(  # noqa: Duplicate test setup
+    "axes", [(1,), (1, 3), (-1,)]
+)
+# Data type of the input elements
+@pytest.mark.parametrize("inp_dtype", ["INT8"])
+@pytest.mark.parametrize("out_dtype", ["INT8"])
+# Shape of the input
+@pytest.mark.parametrize("inp_shape", [
+    [3, 1, 7, 1]
+])
+# Number of elements to process in parallel
+@pytest.mark.parametrize("pe", [1])
+def test_unsqueeze_rtlsim(axes, inp_dtype, out_dtype, inp_shape, pe):
+    # Derive the unsqueezed output shape
+    out_shape = np.expand_dims(np.zeros(inp_shape), axis=axes).shape  # noqa
+    # Make dummy model for testing
+    model = mock_unsqueeze(  # noqa: Duplicate test setup
+        axes, inp_dtype, out_dtype, inp_shape, out_shape, pe
+    )
+    # Prepare the execution context
+    context = {  # noqa: Duplicate test setup
+        "inp": gen_finn_dt_tensor(DataType[inp_dtype], inp_shape),
+    }
+
+    # Test running shape and data type inference on the model graph
+    model = model.transform(InferDataTypes())
+    model = model.transform(InferShapes())
+    # Specializes all nodes to be implemented as HLS backend
+    model = specialize_hls(model)
+
+    # Set model execution mode to RTL simulation
+    model = model.transform(SetExecMode("rtlsim"))
+    # Generates and synthesizes the IP blocks and compiles the RTL simulation
+    model = model.transform(GiveUniqueNodeNames())
+    model = model.transform(PrepareIP("xczu7ev-ffvc1156-2-e", 10))  # noqa
+    model = model.transform(HLSSynthIP())
+    model = model.transform(PrepareRTLSim())
+
+    # Compute ground-truth output in software
+    o_expected = np.expand_dims(context["inp"], axes)
+    # Execute the onnx model to collect the result
+    o_produced = execute_onnx(model, context)["out"]
+
+    # Compare the expected to the produced for exact equality
+    assert np.all(o_produced == o_expected)
+    # Compare the produced shape to the expected unsqueezed shape
+    assert o_produced.shape == out_shape
+
+
+# Axis to unsqueeze
+@pytest.mark.parametrize("axis", [0, 1])
+# Shape of the input
+@pytest.mark.parametrize("inp_shape", [
+    [1, 2], [2, 1, 4], [3, 1, 4],
+])
+# Number of elements to process in parallel
+@pytest.mark.parametrize("pe", [1, 2])
+def test_integration_unsqueeze(axis, inp_shape, pe):
+    # PyTorch model wrapping the component(s) to be tested
+    class Dummy(torch.nn.Module):
+        # Sets up the test model and initializes parameters
+        def __init__(self):
+            # Initialize the PyTorch Module superclass
+            super().__init__()
+
+        # Model forward unsqueezing the input
+        def forward(self, x):  # noqa: Forward may be static...
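+            # Insert a singleton dimension at the axis provided by the test
+            # parametrization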
+            return torch.unsqueeze(x, dim=axis)
+
+    # Create the test instance of the dummy model
+    model = Dummy()
+    # Create dummy test inputs
+    inp = torch.randn(*inp_shape)
+    # Do a forward pass with the model in training mode to calibrate any
+    # quantizers (this dummy model contains none)
+    _ = model(inp)
+    # Switch model to evaluation mode to keep parameters fixed for export
+    model = model.eval()
+    # Do not accumulate gradients while generating test output
+    with torch.no_grad():
+        # Model forward pass generating the expected output for verification
+        out_expected = model(inp).numpy().astype(np.float32)
+    # Generate a temporary directory for running this test
+    with tempfile.TemporaryDirectory() as tmp:
+        # Export the model to ONNX format to be consumed by FINN
+        export_qonnx(model, (inp,), tmp + "/model.onnx")  # noqa: Duplicate
+        # Wrap the model with QONNX wrapper for transformations
+        model = ModelWrapper(tmp + "/model.onnx")
+        # Cleanup transformations preparing the model to be consumed by FINN
+        model = model.transform(InferDataTypes())
+        model = model.transform(InferShapes())
+        model = model.transform(InferDataLayouts())
+        model = model.transform(ConvertQONNXtoFINN())
+        model = model.transform(GiveUniqueNodeNames())
+        model = model.transform(GiveUniqueParameterTensors())
+        model = model.transform(GiveReadableTensorNames())
+        model = model.transform(RemoveUnusedTensors())
+        # Do a single round of standard streamlining of the model graph
+        model = model.transform(Streamline())
+        # Convert layers to hardware custom operations
+        model = model.transform(InferUnsqueeze())
+
+        # Apply folding config to set the PE parallelism for hardware layers
+        model = model.transform(ApplyConfig({  # noqa: Duplicate test code
+            "Defaults": {"PE": [pe, ["Unsqueeze"]]}
+        }))
+
+        # Prepare the execution context with dummy data from above and input
+        # node names extracted from the transformed model graph
+        context = {  # noqa: Duplicate
+            model.graph.input[0].name: inp.numpy().astype(np.float32)
+        }
+
+        # Set model execution mode to python simulation
+        model = model.transform(SetExecMode("python"))  # noqa: Duplicate
+        model = model.transform(GiveUniqueNodeNames())
+        # Execute the onnx model to collect the result
+        out_produced = execute_onnx(model, context)[model.graph.output[0].name]
+        # Compare the expected to the produced
+        # Note: Only test for closeness up to some tolerance as the model has
+        # been streamlined, which may involve rounding
+        assert np.allclose(out_produced, out_expected, atol=1e-3), \
+            "Python simulation verification failed"
+
+        # Specializes all nodes to their backend implementation
+        model = model.transform(SpecializeLayers("xczu7ev-ffvc1156-2-e"))
+
+        # Set model execution mode to C++ simulation
+        model = model.transform(SetExecMode("cppsim"))
+        model = model.transform(GiveUniqueNodeNames())
+        # Generates the C++ source and compiles the C++ simulation
+        model = model.transform(PrepareCppSim())
+        model = model.transform(CompileCppSim())
+        # Execute the onnx model to collect the result
+        out_produced = execute_onnx(model, context)[model.graph.output[0].name]
+        # Compare the expected to the produced
+        # Note: Only test for closeness up to some tolerance as the model has
+        # been streamlined, which may involve rounding
+        assert np.allclose(out_produced, out_expected, atol=1e-3), \
+            "C++ simulation verification failed"
+
+        # Set model execution mode to RTL simulation
+        model = model.transform(SetExecMode("rtlsim"))
+        model = model.transform(GiveUniqueNodeNames())
+        # Generates and synthesizes the IP blocks and compiles the RTL
+        # simulation
+        model = model.transform(PrepareIP("xczu7ev-ffvc1156-2-e", 10))  # noqa
+        model = model.transform(HLSSynthIP())
+        model = model.transform(PrepareRTLSim())
+        # Execute the onnx model to collect the result
+        out_produced = execute_onnx(model, context)[model.graph.output[0].name]
+        # Compare the expected to the produced
+        # Note: Only test for closeness up to some tolerance as the model has
+        # been streamlined, which may involve rounding
+        assert np.allclose(out_produced, out_expected, atol=1e-3), \
+            "RTL simulation verification failed"