Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions docs/overview/advanced-examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ Async/Await Example

A simple example of asynchronous submission can be found below.

You can also use async submissions with Vulkan semaphores to synchronize
Kompute-generated submits with user-managed queue submits.

First we are able to create the manager as we normally would.

.. code-block:: cpp
Expand Down Expand Up @@ -233,15 +236,25 @@ The parameter provided is the maximum amount of time to wait in nanoseconds. Whe

auto sq = mgr.sequence();

// Run Async Kompute operation on the parameters provided
sq->evalAsync<kp::OpAlgoDispatch>(algo);
// Optional: pass submit-level synchronization primitives so this submit
// waits/signals alongside user-managed queue work
std::vector<vk::Semaphore> waitSemaphores = { externalWaitSemaphore };
std::vector<vk::PipelineStageFlags> waitDstStageMasks = {
vk::PipelineStageFlagBits::eComputeShader
};
std::vector<vk::Semaphore> signalSemaphores = { externalSignalSemaphore };
auto opAlgo = std::make_shared<kp::OpAlgoDispatch>(algo);
sq->evalAsync(opAlgo, waitSemaphores, waitDstStageMasks, signalSemaphores);

// Here we can do other work

// When we're ready we can wait
// When we're ready we can wait
// The default wait time is UINT64_MAX
sq->evalAwait();

``evalAwait()`` must be called before invoking ``evalAsync()`` again on the
same ``Sequence``.


Finally, below you can see that we can also run synchronous commands without having to change anything.

Expand Down
2 changes: 1 addition & 1 deletion docs/overview/custom-operations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Below you
* - preEval()
- When the Sequence is Evaluated this preEval is called across all operations before dispatching the batch of recorded commands to the GPU. This is useful for example if you need to copy data from local to host memory.
* - postEval()
- After the sequence is Evaluated this postEval is called across all operations. When running asynchronously the postEval is called when you call `evalAwait()`, which is why it's important to always run evalAwait() to ensure the process doesn't go into inconsistent state.
- After the sequence is Evaluated this postEval is called across all operations. In asynchronous flows postEval is called when you run `evalAwait()`, and `evalAwait()` must be called before triggering `evalAsync()` again on the same sequence to avoid inconsistent state.


Simple Operation Extending OpAlgoBase
Expand Down
50 changes: 47 additions & 3 deletions src/Sequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ Sequence::eval(std::shared_ptr<OpBase> op)

// Asynchronously submits all currently recorded operations with no
// submit-level wait/signal semaphores; delegates to the semaphore-aware
// overload with empty synchronization lists. evalAwait() must be called
// before invoking evalAsync() again on this Sequence.
std::shared_ptr<Sequence>
Sequence::evalAsync()
{
return this->evalAsync({}, {}, {});
}

std::shared_ptr<Sequence>
Sequence::evalAsync(const std::vector<vk::Semaphore>& waitSemaphores,
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
const std::vector<vk::Semaphore>& signalSemaphores)
{
if (this->isRecording()) {
this->end();
Expand All @@ -125,8 +133,35 @@ Sequence::evalAsync()
this->mOperations[i]->preEval(*this->mCommandBuffer);
}

if (!waitDstStageMasks.empty() &&
waitSemaphores.size() != waitDstStageMasks.size()) {
throw std::runtime_error("Kompute Sequence evalAsync wait semaphore "
"count must match wait dst stage mask count");
}

std::vector<vk::PipelineStageFlags> resolvedWaitDstStageMasks =
waitDstStageMasks;
if (resolvedWaitDstStageMasks.empty() && !waitSemaphores.empty()) {
resolvedWaitDstStageMasks.resize(waitSemaphores.size(),
vk::PipelineStageFlagBits::eAllCommands);
}

const vk::Semaphore* waitSemaphoresPtr =
waitSemaphores.empty() ? nullptr : waitSemaphores.data();
const vk::PipelineStageFlags* waitDstStageMasksPtr =
resolvedWaitDstStageMasks.empty() ? nullptr
: resolvedWaitDstStageMasks.data();
const vk::Semaphore* signalSemaphoresPtr =
signalSemaphores.empty() ? nullptr : signalSemaphores.data();

vk::SubmitInfo submitInfo(
0, nullptr, nullptr, 1, this->mCommandBuffer.get());
static_cast<uint32_t>(waitSemaphores.size()),
waitSemaphoresPtr,
waitDstStageMasksPtr,
1,
this->mCommandBuffer.get(),
static_cast<uint32_t>(signalSemaphores.size()),
signalSemaphoresPtr);

KP_LOG_DEBUG(
"Kompute sequence submitting command buffer into compute queue");
Expand All @@ -140,11 +175,20 @@ Sequence::evalAsync()

std::shared_ptr<Sequence>
Sequence::evalAsync(std::shared_ptr<OpBase> op)
{
return this->evalAsync(op, {}, {}, {});
}

std::shared_ptr<Sequence>
Sequence::evalAsync(std::shared_ptr<OpBase> op,
const std::vector<vk::Semaphore>& waitSemaphores,
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
const std::vector<vk::Semaphore>& signalSemaphores)
{
this->clear();
this->record(op);
this->evalAsync();
return shared_from_this();
return this->evalAsync(
waitSemaphores, waitDstStageMasks, signalSemaphores);
}

std::shared_ptr<Sequence>
Expand Down
60 changes: 54 additions & 6 deletions src/include/kompute/Sequence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,70 @@ class Sequence : public std::enable_shared_from_this<Sequence>

/**
* Eval Async sends all the recorded and stored operations in the vector of
* operations into the gpu as a submit job without a barrier. EvalAwait()
* must ALWAYS be called after to ensure the sequence is terminated
* correctly.
* operations into the gpu as a submit job without a barrier.
*
* evalAwait() must be called before invoking evalAsync() again on this same
* Sequence to complete the previous async run and reset internal state.
*
* @return shared_ptr<Sequence> of the Sequence class itself
*/
std::shared_ptr<Sequence> evalAsync();
/**
* Eval Async sends all recorded operations as a submit job and allows
* submit-level GPU synchronization by providing wait and signal semaphores.
*
* This overload is useful for synchronizing Kompute submissions with
* user-managed queue submissions without forcing CPU-side synchronization.
* evalAwait() must be called before invoking evalAsync() again on this same
* Sequence to complete the previous async run and reset internal state.
*
* @param waitSemaphores Semaphores that must be signaled before this submit
* starts executing.
* @param waitDstStageMasks Pipeline stages at which to wait for each
* semaphore. If empty and waitSemaphores is not empty, defaults to
* vk::PipelineStageFlagBits::eAllCommands for each wait semaphore.
* @param signalSemaphores Semaphores that this submit will signal when it
* completes.
* @return shared_ptr<Sequence> of the Sequence class itself
*/
std::shared_ptr<Sequence> evalAsync(
const std::vector<vk::Semaphore>& waitSemaphores,
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
const std::vector<vk::Semaphore>& signalSemaphores);
/**
* Clears current operations to record the provided one in the vector of
* operations into the gpu as a submit job without a barrier. EvalAwait()
* must ALWAYS be called after to ensure the sequence is terminated
* correctly.
* operations into the gpu as a submit job without a barrier.
*
* evalAwait() must be called before invoking evalAsync() again on this same
* Sequence to complete the previous async run and reset internal state.
*
* @return shared_ptr<Sequence> of the Sequence class itself
*/
std::shared_ptr<Sequence> evalAsync(std::shared_ptr<OpBase> op);
/**
* Clears current operations, records the provided one and submits with
* optional wait/signal semaphores for submit-level GPU synchronization.
*
* This overload is useful for synchronizing Kompute submissions with
* user-managed queue submissions without forcing CPU-side synchronization.
* evalAwait() must be called before invoking evalAsync() again on this same
* Sequence to complete the previous async run and reset internal state.
*
* @param op Operation to record prior to submit.
* @param waitSemaphores Semaphores that must be signaled before this submit
* starts executing.
* @param waitDstStageMasks Pipeline stages at which to wait for each
* semaphore. If empty and waitSemaphores is not empty, defaults to
* vk::PipelineStageFlagBits::eAllCommands for each wait semaphore.
* @param signalSemaphores Semaphores that this submit will signal when it
* completes.
* @return shared_ptr<Sequence> of the Sequence class itself
*/
std::shared_ptr<Sequence> evalAsync(
std::shared_ptr<OpBase> op,
const std::vector<vk::Semaphore>& waitSemaphores,
const std::vector<vk::PipelineStageFlags>& waitDstStageMasks,
const std::vector<vk::Semaphore>& signalSemaphores);
/**
* Eval sends all the recorded and stored operations in the vector of
* operations into the gpu as a submit job with a barrier.
Expand Down
60 changes: 60 additions & 0 deletions test/TestSequence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,63 @@ TEST(TestSequence, CorrectSequenceRunningError)

EXPECT_EQ(tensorOut->vector(), std::vector<float>({ 2, 4, 6 }));
}

TEST(TestSequence, EvalAsyncSemaphoreOverloadSupportsEmptySyncLists)
{
    // The semaphore-aware evalAsync overload must accept empty wait and
    // signal lists and behave exactly like the plain asynchronous submit.
    kp::Manager mgr;

    auto sequence = mgr.sequence();

    auto lhs = mgr.tensor({ 1, 2, 3 });
    auto rhs = mgr.tensor({ 2, 2, 2 });
    auto result = mgr.tensor({ 0, 0, 0 });

    // Push host data to the device before dispatching.
    sequence->eval<kp::OpSyncDevice>({ lhs, rhs, result });

    const std::vector<uint32_t> spirv = compileSource(R"(
#version 450

layout (local_size_x = 1) in;

layout(set = 0, binding = 0) buffer bina { float tina[]; };
layout(set = 0, binding = 1) buffer binb { float tinb[]; };
layout(set = 0, binding = 2) buffer bout { float tout[]; };

void main() {
uint index = gl_GlobalInvocationID.x;
tout[index] = tina[index] * tinb[index];
}
)");

    auto algo = mgr.algorithm({ lhs, rhs, result }, spirv);

    // Record the dispatch and the device-to-host sync separately.
    sequence->record<kp::OpAlgoDispatch>(algo);
    sequence->record<kp::OpSyncLocal>({ lhs, rhs, result });

    // Empty synchronization lists must not throw, and the async run must
    // complete cleanly through evalAwait().
    EXPECT_NO_THROW(sequence->evalAsync({}, {}, {}));
    EXPECT_NO_THROW(sequence->evalAwait());

    EXPECT_EQ(result->vector(), std::vector<float>({ 2, 4, 6 }));
}

TEST(TestSequence, EvalAsyncSemaphoreOverloadValidatesWaitMaskCount)
{
kp::Manager mgr;

std::shared_ptr<kp::Sequence> sq = mgr.sequence();

std::shared_ptr<kp::TensorT<float>> tensorA = mgr.tensor({ 1, 2, 3 });

sq->record<kp::OpSyncDevice>({ tensorA });

std::vector<vk::Semaphore> waitSemaphores = { vk::Semaphore{} };
std::vector<vk::PipelineStageFlags> waitDstStageMasks = {
vk::PipelineStageFlagBits::eComputeShader,
vk::PipelineStageFlagBits::eTransfer
};
std::vector<vk::Semaphore> signalSemaphores = {};

EXPECT_ANY_THROW(
sq->evalAsync(waitSemaphores, waitDstStageMasks, signalSemaphores));
}