diff --git a/CMakeLists.txt b/CMakeLists.txt index bd72356f..6d1c7e77 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -101,6 +101,7 @@ kompute_option(KOMPUTE_OPT_ANDROID_BUILD "Enable android compilation flags requi kompute_option(KOMPUTE_OPT_DISABLE_VK_DEBUG_LAYERS "Explicitly disable debug layers even on debug." OFF) kompute_option(KOMPUTE_OPT_DISABLE_VULKAN_VERSION_CHECK "Whether to check if your driver supports the Vulkan Header version you are linking against. This might be useful in case you build shared on a different system than you run later." OFF) kompute_option(KOMPUTE_OPT_BUILD_SHADERS "Rebuilds all compute shaders during compilation and does not use the already precompiled versions. Requires glslangValidator to be installed on your system." OFF) +kompute_option(KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE "Enable manager-owned mutex protection around sequence queue submissions." OFF) # External components kompute_option(KOMPUTE_OPT_USE_BUILT_IN_SPDLOG "Use the built-in version of Spdlog. Requires 'KOMPUTE_OPT_USE_SPDLOG' to be set to ON in order to have any effect." ON) diff --git a/src/Manager.cpp b/src/Manager.cpp index b5facd0c..e5a56d0e 100644 --- a/src/Manager.cpp +++ b/src/Manager.cpp @@ -55,6 +55,10 @@ Manager::Manager(uint32_t physicalDeviceIndex, { this->mManageResources = true; +#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE + this->mSequenceSubmitMutex = std::make_shared(); +#endif + // Make sure the logger is setup #if !KOMPUTE_OPT_LOG_LEVEL_DISABLED logger::setupLogger(); @@ -71,6 +75,10 @@ Manager::Manager(std::shared_ptr instance, { this->mManageResources = false; +#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE + this->mSequenceSubmitMutex = std::make_shared(); +#endif + this->mInstance = instance; this->mPhysicalDevice = physicalDevice; this->mDevice = device; @@ -495,12 +503,18 @@ Manager::sequence(uint32_t queueIndex, uint32_t totalTimestamps) { KP_LOG_DEBUG("Kompute Manager sequence() with queueIndex: {}", queueIndex); + std::shared_ptr submitMutex = nullptr; +#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE + submitMutex = this->mSequenceSubmitMutex; +#endif + std::shared_ptr sq{ new kp::Sequence( - this->mPhysicalDevice, - this->mDevice, - this->mComputeQueues[queueIndex], - this->mComputeQueueFamilyIndices[queueIndex], - totalTimestamps) }; + this->mPhysicalDevice, + this->mDevice, + this->mComputeQueues[queueIndex], + this->mComputeQueueFamilyIndices[queueIndex], + totalTimestamps, + submitMutex) }; if (this->mManageResources) { this->mManagedSequences.push_back(sq); diff --git a/src/Sequence.cpp b/src/Sequence.cpp index d8063e37..120f9c99 100644 --- a/src/Sequence.cpp +++ b/src/Sequence.cpp @@ -8,7 +8,8 @@ Sequence::Sequence(std::shared_ptr physicalDevice, std::shared_ptr device, std::shared_ptr computeQueue, uint32_t queueIndex, - uint32_t totalTimestamps) noexcept + uint32_t totalTimestamps, + std::shared_ptr submitMutex) noexcept { KP_LOG_DEBUG("Kompute Sequence Constructor with existing device & queue"); @@ -17,6 +18,7 @@ Sequence::Sequence(std::shared_ptr physicalDevice, this->mComputeQueue = computeQueue; this->mQueueIndex = queueIndex; this->mFence = this->mDevice->createFence(vk::FenceCreateInfo()); + this->mSubmitMutex = submitMutex; this->createCommandPool(); this->createCommandBuffer(); @@ -133,11 +135,22 @@ Sequence::evalAsync() this->mDevice->resetFences({ this->mFence }); - this->mComputeQueue->submit(1, &submitInfo, this->mFence); + this->submitCommandBuffer(submitInfo); return shared_from_this(); } +void +Sequence::submitCommandBuffer(const vk::SubmitInfo& submitInfo) +{ + if (this->mSubmitMutex) { + std::lock_guard submitLock(*this->mSubmitMutex); + this->mComputeQueue->submit(1, &submitInfo, this->mFence); + } else { + this->mComputeQueue->submit(1, &submitInfo, this->mFence); + } +} + std::shared_ptr Sequence::evalAsync(std::shared_ptr op) { diff --git a/src/include/kompute/Manager.hpp b/src/include/kompute/Manager.hpp index cc02df80..d76b10d0 100644 --- a/src/include/kompute/Manager.hpp +++ b/src/include/kompute/Manager.hpp @@ -7,6 +7,10 @@ #include "kompute/Sequence.hpp" #include "logger/Logger.hpp" +#ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE +#include +#endif + #define KP_DEFAULT_SESSION "DEFAULT" namespace kp { @@ -546,6 +550,10 @@ class Manager std::vector mComputeQueueFamilyIndices; std::vector> mComputeQueues; + #ifdef KOMPUTE_OPT_THREAD_SAFE_COMPUTE_QUEUE + std::shared_ptr mSequenceSubmitMutex = nullptr; + #endif + bool mManageResources = false; #ifndef KOMPUTE_DISABLE_VK_DEBUG_LAYERS diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp index 169321c7..bea23fc2 100644 --- a/src/include/kompute/Sequence.hpp +++ b/src/include/kompute/Sequence.hpp @@ -5,6 +5,8 @@ #include "kompute/operations/OpAlgoDispatch.hpp" #include "kompute/operations/OpBase.hpp" +#include +#include namespace kp { @@ -28,7 +30,8 @@ class Sequence : public std::enable_shared_from_this std::shared_ptr device, std::shared_ptr computeQueue, uint32_t queueIndex, - uint32_t totalTimestamps = 0) noexcept; + uint32_t totalTimestamps = 0, + std::shared_ptr submitMutex = nullptr) noexcept; /** * @brief Make Sequence uncopyable @@ -293,11 +296,20 @@ class Sequence : public std::enable_shared_from_this vk::Fence mFence; std::vector> mOperations{}; std::shared_ptr timestampQueryPool = nullptr; + std::shared_ptr mSubmitMutex = nullptr; // State bool mRecording = false; bool mIsRunning = false; + /** + * Submits the command buffer to the compute queue, acquiring the submit + * mutex beforehand if one was provided. + * + * @param submitInfo The Vulkan submit info to pass to the queue. + */ + void submitCommandBuffer(const vk::SubmitInfo& submitInfo); + private: // Create functions void createCommandPool();