Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions Src/Particle/AMReX_ParticleCommunication.H
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
#include <AMReX_Scan.H>
#include <AMReX_TypeTraits.H>
#include <AMReX_MakeParticle.H>
#include <AMReX_ParmParse.H>

#include <map>

namespace amrex {

class ParticleContainerBase;

struct NeighborUnpackPolicy
{
template <class PTile>
Expand Down Expand Up @@ -137,6 +140,8 @@ struct ParticleCopyPlan
BL_PROFILE("ParticleCopyPlan::build");

m_local = local;
ParmParse pp("particles");
pp.query("do_one_sided_comms", m_do_one_sided_comms);

const int ngrow = 1; // note - fix

Expand Down Expand Up @@ -265,7 +270,7 @@ struct ParticleCopyPlan
m_superparticle_size += num_real_comm_comp * sizeof(typename PC::ParticleType::RealType)
+ num_int_comm_comp * sizeof(int);

buildMPIStart(pc.BufferMap(), m_superparticle_size);
buildMPIStart(pc, pc.BufferMap(), m_superparticle_size);
}

void clear ();
Expand All @@ -274,14 +279,14 @@ struct ParticleCopyPlan

private:

void buildMPIStart (const ParticleBufferMap& map, Long psize);
void buildMPIStart (const ParticleContainerBase& pc, const ParticleBufferMap& map, Long psize);

//
// Snds - a Vector with the number of bytes that is process will send to each proc.
// Rcvs - a Vector that, after calling this method, will contain the
// number of bytes this process will receive from each proc.
//
void doHandShake (const Vector<Long>& Snds, Vector<Long>& Rcvs) const;
void doHandShake (const ParticleContainerBase& pc, const Vector<Long>& Snds, Vector<Long>& Rcvs) const;

//
// In the local version of this method, each proc knows which other
Expand All @@ -294,14 +299,22 @@ private:
// In the global version, we don't know who we'll receive from, so we
// need to do some collective communication first.
//
static void doHandShakeGlobal (const Vector<Long>& Snds, Vector<Long>& Rcvs);
static void doHandShakeReduceScatter (const Vector<Long>& Snds, Vector<Long>& Rcvs);

//
// Another version of the global handshake implemented with MPI-3
// one-sided communication.
//
static void doHandShakeOneSided (const ParticleContainerBase& pc,
const Vector<Long>& Snds, Vector<Long>& Rcvs);

//
// Another version of the above that is implemented using MPI All-to-All
//
static void doHandShakeAllToAll (const Vector<Long>& Snds, Vector<Long>& Rcvs);

bool m_local;
bool m_local = false;
int m_do_one_sided_comms = 0;
};

struct GetSendBufferOffset
Expand Down
66 changes: 60 additions & 6 deletions Src/Particle/AMReX_ParticleCommunication.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <AMReX_ParticleCommunication.H>
#include <AMReX_ParticleContainerBase.H>
#include <AMReX_ParallelDescriptor.H>

namespace amrex {
Expand Down Expand Up @@ -45,7 +46,9 @@ void ParticleCopyPlan::clear ()
m_rcv_box_levs.clear();
}

void ParticleCopyPlan::buildMPIStart (const ParticleBufferMap& map, Long psize) // NOLINT(readability-convert-member-functions-to-static)
void ParticleCopyPlan::buildMPIStart (const ParticleContainerBase& pc,
const ParticleBufferMap& map,
Long psize) // NOLINT(readability-convert-member-functions-to-static)
{
BL_PROFILE("ParticleCopyPlan::buildMPIStart");

Expand Down Expand Up @@ -94,7 +97,7 @@ void ParticleCopyPlan::buildMPIStart (const ParticleBufferMap& map, Long psize)
m_NumSnds += nbytes;
}

doHandShake(m_Snds, m_Rcvs);
doHandShake(pc, m_Snds, m_Rcvs);

const int SeqNum = ParallelDescriptor::SeqNum();
Long tot_snds_this_proc = 0;
Expand Down Expand Up @@ -206,7 +209,7 @@ void ParticleCopyPlan::buildMPIStart (const ParticleBufferMap& map, Long psize)
snd_stats.resize(snd_reqs.size());
ParallelDescriptor::Waitall(snd_reqs, snd_stats);
#else
amrex::ignore_unused(map,psize);
amrex::ignore_unused(pc,map,psize);
#endif
}

Expand Down Expand Up @@ -259,11 +262,20 @@ void ParticleCopyPlan::buildMPIFinish (const ParticleBufferMap& map) // NOLINT(r
#endif // MPI
}

void ParticleCopyPlan::doHandShake (const Vector<Long>& Snds, Vector<Long>& Rcvs) const // NOLINT(readability-convert-member-functions-to-static)
void ParticleCopyPlan::doHandShake (const ParticleContainerBase& pc,
const Vector<Long>& Snds,
Vector<Long>& Rcvs) const // NOLINT(readability-convert-member-functions-to-static)
{
BL_PROFILE("ParticleCopyPlan::doHandShake");
if (m_local) { doHandShakeLocal(Snds, Rcvs); }
else { doHandShakeGlobal(Snds, Rcvs); }
else if (m_do_one_sided_comms) {
#if defined(BL_USE_MPI3)
doHandShakeOneSided(pc, Snds, Rcvs);
#else
amrex::Abort("ParticleCopyPlan::doHandShake: particles.do_one_sided_comms=1 requires MPI-3");
#endif
}
else { doHandShakeReduceScatter(Snds, Rcvs); }
}

void ParticleCopyPlan::doHandShakeLocal (const Vector<Long>& Snds, Vector<Long>& Rcvs) const // NOLINT(readability-convert-member-functions-to-static)
Expand Down Expand Up @@ -333,7 +345,7 @@ void ParticleCopyPlan::doHandShakeAllToAll (const Vector<Long>& Snds, Vector<Lon
#endif
}

void ParticleCopyPlan::doHandShakeGlobal (const Vector<Long>& Snds, Vector<Long>& Rcvs)
void ParticleCopyPlan::doHandShakeReduceScatter (const Vector<Long>& Snds, Vector<Long>& Rcvs)
{
#ifdef AMREX_USE_MPI
const int SeqNum = ParallelDescriptor::SeqNum();
Expand Down Expand Up @@ -381,6 +393,48 @@ void ParticleCopyPlan::doHandShakeGlobal (const Vector<Long>& Snds, Vector<Long>
#endif
}

void ParticleCopyPlan::doHandShakeOneSided (const ParticleContainerBase& pc,
const Vector<Long>& Snds,
Vector<Long>& Rcvs)
{
#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
const int MyProc = ParallelContext::MyProcSub();
const int NProcs = ParallelContext::NProcsSub();

AMREX_ALWAYS_ASSERT(static_cast<int>(Snds.size()) == NProcs);
AMREX_ALWAYS_ASSERT(static_cast<int>(Rcvs.size()) == NProcs);

pc.ensureParticleHandshakeWindow();
auto* handshake_buffer = pc.particleHandshakeBuffer();
AMREX_ALWAYS_ASSERT(handshake_buffer != nullptr);
std::fill_n(handshake_buffer, NProcs, Long(0));

MPI_Win win = pc.particleHandshakeWindow();
BL_MPI_REQUIRE(MPI_Win_fence(0, win));

for (int i = 0; i < NProcs; ++i)
{
if (i == MyProc || Snds[i] == 0) { continue; }

BL_MPI_REQUIRE(MPI_Put(&Snds[i],
1,
ParallelDescriptor::Mpi_typemap<Long>::type(),
i,
MyProc,
1,
ParallelDescriptor::Mpi_typemap<Long>::type(),
win));
}

BL_MPI_REQUIRE(MPI_Win_fence(0, win));
std::copy_n(handshake_buffer, NProcs, Rcvs.begin());

AMREX_ASSERT(Rcvs[MyProc] == 0);
#else
amrex::ignore_unused(pc,Snds,Rcvs);
#endif
}

void communicateParticlesFinish (const ParticleCopyPlan& plan)
{
BL_PROFILE("amrex::communicateParticlesFinish");
Expand Down
21 changes: 18 additions & 3 deletions Src/Particle/AMReX_ParticleContainerBase.H
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <AMReX_Vector.H>
#include <AMReX_ParticleUtil.H>
#include <AMReX_MultiFab.H>
#include <AMReX_iMultiFab.H>
#include <AMReX_ParticleLocator.H>
#include <AMReX_DenseBins.H>

Expand Down Expand Up @@ -72,13 +73,13 @@ public:
{
}

virtual ~ParticleContainerBase () = default;
virtual ~ParticleContainerBase ();

ParticleContainerBase ( const ParticleContainerBase &) = delete;
ParticleContainerBase& operator= ( const ParticleContainerBase & ) = delete;

ParticleContainerBase ( ParticleContainerBase && ) = default;
ParticleContainerBase& operator= ( ParticleContainerBase && ) = default;
ParticleContainerBase ( ParticleContainerBase && other ) noexcept;
ParticleContainerBase& operator= ( ParticleContainerBase && other ) noexcept;

void Define (ParGDBBase* gdb) { m_gdb = gdb;}

Expand Down Expand Up @@ -237,6 +238,13 @@ public:

const ParticleBufferMap& BufferMap () const {return m_buffer_map;}

#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
void ensureParticleHandshakeWindow () const;
void releaseParticleHandshakeWindow ();
[[nodiscard]] Long* particleHandshakeBuffer () const { return m_particle_handshake_ptr; }
[[nodiscard]] MPI_Win particleHandshakeWindow () const { return m_particle_handshake_win; }
#endif

Vector<int> NeighborProcs(int ngrow) const
{
return computeNeighborProcs(this->GetParGDB(), ngrow);
Expand Down Expand Up @@ -284,6 +292,13 @@ protected:
mutable amrex::Vector<int> neighbor_procs;
mutable ParticleBufferMap m_buffer_map;

#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
mutable MPI_Win m_particle_handshake_win = MPI_WIN_NULL;
mutable Long* m_particle_handshake_ptr = nullptr;
mutable int m_particle_handshake_nprocs = 0;
mutable MPI_Comm m_particle_handshake_comm = MPI_COMM_NULL;
#endif

};

} // namespace amrex
Expand Down
125 changes: 125 additions & 0 deletions Src/Particle/AMReX_ParticleContainerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,78 @@ IntVect ParticleContainerBase::tile_size { AMREX_D_DECL(1024000,8,8) };
bool ParticleContainerBase::memEfficientSort = true;
bool ParticleContainerBase::use_comms_arena = false;

ParticleContainerBase::~ParticleContainerBase ()
{
#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
releaseParticleHandshakeWindow();
#endif
}

ParticleContainerBase::ParticleContainerBase (ParticleContainerBase&& other) noexcept
: m_particle_locator(std::move(other.m_particle_locator)),
m_verbose(other.m_verbose),
m_stable_redistribute(other.m_stable_redistribute),
m_gdb_object(std::move(other.m_gdb_object)),
m_gdb(other.m_gdb),
m_dummy_mf(std::move(other.m_dummy_mf)),
m_arena(other.m_arena),
redistribute_mask_ptr(std::move(other.redistribute_mask_ptr)),
redistribute_mask_nghost(other.redistribute_mask_nghost),
neighbor_procs(std::move(other.neighbor_procs)),
m_buffer_map(std::move(other.m_buffer_map))
#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
, m_particle_handshake_win(other.m_particle_handshake_win),
m_particle_handshake_ptr(other.m_particle_handshake_ptr),
m_particle_handshake_nprocs(other.m_particle_handshake_nprocs),
m_particle_handshake_comm(other.m_particle_handshake_comm)
#endif
{
other.m_gdb = nullptr;
#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
other.m_particle_handshake_win = MPI_WIN_NULL;
other.m_particle_handshake_ptr = nullptr;
other.m_particle_handshake_nprocs = 0;
other.m_particle_handshake_comm = MPI_COMM_NULL;
#endif
}

ParticleContainerBase&
ParticleContainerBase::operator= (ParticleContainerBase&& other) noexcept
{
if (this != &other)
{
#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
releaseParticleHandshakeWindow();
#endif

m_particle_locator = std::move(other.m_particle_locator);
m_verbose = other.m_verbose;
m_stable_redistribute = other.m_stable_redistribute;
m_gdb_object = std::move(other.m_gdb_object);
m_gdb = other.m_gdb;
m_dummy_mf = std::move(other.m_dummy_mf);
m_arena = other.m_arena;
redistribute_mask_ptr = std::move(other.redistribute_mask_ptr);
redistribute_mask_nghost = other.redistribute_mask_nghost;
neighbor_procs = std::move(other.neighbor_procs);
m_buffer_map = std::move(other.m_buffer_map);
#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
m_particle_handshake_win = other.m_particle_handshake_win;
m_particle_handshake_ptr = other.m_particle_handshake_ptr;
m_particle_handshake_nprocs = other.m_particle_handshake_nprocs;
m_particle_handshake_comm = other.m_particle_handshake_comm;

other.m_particle_handshake_win = MPI_WIN_NULL;
other.m_particle_handshake_ptr = nullptr;
other.m_particle_handshake_nprocs = 0;
other.m_particle_handshake_comm = MPI_COMM_NULL;
#endif
other.m_gdb = nullptr;
}

return *this;
}

void ParticleContainerBase::Define (const Geometry & geom,
const DistributionMapping & dmap,
const BoxArray & ba)
Expand Down Expand Up @@ -80,6 +152,59 @@ ParticleContainerBase::defineBufferMap () const
}
}

#if defined(AMREX_USE_MPI) && defined(BL_USE_MPI3)
void ParticleContainerBase::releaseParticleHandshakeWindow ()
{
if (m_particle_handshake_win != MPI_WIN_NULL) {
BL_MPI_REQUIRE(MPI_Win_free(&m_particle_handshake_win));
}
if (m_particle_handshake_comm != MPI_COMM_NULL) {
BL_MPI_REQUIRE(MPI_Comm_free(&m_particle_handshake_comm));
}
m_particle_handshake_ptr = nullptr;
m_particle_handshake_nprocs = 0;
}

void ParticleContainerBase::ensureParticleHandshakeWindow () const
{
const int nprocs = ParallelContext::NProcsSub();
MPI_Comm comm = ParallelContext::CommunicatorSub();

bool needs_rebuild = (m_particle_handshake_win == MPI_WIN_NULL)
|| (m_particle_handshake_nprocs != nprocs)
|| (m_particle_handshake_comm == MPI_COMM_NULL);

if (!needs_rebuild)
{
int cmp = MPI_UNEQUAL;
BL_MPI_REQUIRE(MPI_Comm_compare(comm, m_particle_handshake_comm, &cmp));
needs_rebuild = (cmp != MPI_IDENT && cmp != MPI_CONGRUENT);
}

if (needs_rebuild)
{
const_cast<ParticleContainerBase*>(this)->releaseParticleHandshakeWindow();

Long* baseptr = nullptr;
MPI_Win win = MPI_WIN_NULL;
BL_MPI_REQUIRE(MPI_Win_allocate(static_cast<MPI_Aint>(nprocs*sizeof(Long)),
sizeof(Long),
MPI_INFO_NULL,
comm,
&baseptr,
&win));

MPI_Comm dup_comm = MPI_COMM_NULL;
BL_MPI_REQUIRE(MPI_Comm_dup(comm, &dup_comm));

m_particle_handshake_ptr = baseptr;
m_particle_handshake_win = win;
m_particle_handshake_nprocs = nprocs;
m_particle_handshake_comm = dup_comm;
}
}
#endif

void ParticleContainerBase::SetParGDB (const Geometry & geom,
const DistributionMapping & dmap,
const BoxArray & ba)
Expand Down
13 changes: 13 additions & 0 deletions Tests/Particles/RedistributeGlobal/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
foreach(D IN LISTS AMReX_SPACEDIM)
set(_sources main.cpp)
if (NOT AMReX_GPU_BACKEND STREQUAL NONE)
set(_input_files inputs.rt.cuda)
else ()
set(_input_files inputs.rt)
endif ()

setup_test(${D} _sources _input_files)

unset(_sources)
unset(_input_files)
endforeach()
Loading
Loading