Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Migrate for avoiding MPI comms buffer reallocation #570

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 204 additions & 5 deletions core/src/Cabana_CommunicationPlan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,44 @@ struct CommunicationDataAoSoA
std::size_t _num_comp = 0;
};

/*!
\brief Store AoSoA send/receive buffers with separate destination.
*/
template <class AoSoAType>
struct CommunicationDataAoSoASeparate : public CommunicationDataAoSoA<AoSoAType>
{
using base_type = CommunicationDataAoSoA<AoSoAType>;
//! Particle data type.
using particle_data_type = typename base_type::particle_data_type;
//! Kokkos memory space.
using memory_space = typename base_type::memory_space;
//! Communication data type.
using data_type = typename base_type::data_type;
//! Communication buffer type.
using buffer_type = typename base_type::buffer_type;

/*!
Constructor
\param particles The particle data (either AoSoA or slice).
*/
CommunicationDataAoSoASeparate( const AoSoAType& src, const AoSoAType& dst )
: base_type( src )
, _dst_particles( dst )
{
}

//! Send buffer.
using base_type::_send_buffer;
//! Receive buffer.
using base_type::_recv_buffer;
//! Particle slice.
AoSoAType _src_particles = base_type::_particles;
//! Destination AoSoA.
AoSoAType _dst_particles;
//! Slice components.
using base_type::_num_comp;
};

/*!
\brief Store slice send/receive buffers.
*/
Expand Down Expand Up @@ -1057,8 +1095,47 @@ struct CommunicationDataSlice
//! Slice components.
std::size_t _num_comp;
};
//---------------------------------------------------------------------------//

/*!
\brief Store slice send/receive buffers with separate destination.
*/
template <class SliceType>
struct CommunicationDataSliceSeparate : public CommunicationDataSlice<SliceType>
{
using base_type = CommunicationDataSlice<SliceType>;
//! Particle data type.
using particle_data_type = typename base_type::particle_data_type;
//! Kokkos memory space.
using memory_space = typename base_type::memory_space;
//! Communication data type.
using data_type = typename base_type::data_type;
//! Communication buffer type.
using buffer_type = typename base_type::buffer_type;

/*!
Constructor
\param particles The particle data.
*/
CommunicationDataSliceSeparate( const particle_data_type& src,
const particle_data_type& dst )
: base_type( src )
, _dst_particles( dst )
{
}

//! Send buffer.
using base_type::_send_buffer;
//! Receive buffer.
using base_type::_recv_buffer;
//! Particle slice.
particle_data_type _src_particles = base_type::_particles;
//! Destination slice.
particle_data_type _dst_particles;
//! Slice components.
using base_type::_num_comp;
};

//---------------------------------------------------------------------------//
/*!
\brief Store communication plan and communication buffers.
*/
Expand Down Expand Up @@ -1096,6 +1173,16 @@ class CommunicationData
, _comm_data( CommDataType( particles ) )
, _overallocation( overallocation )
{
updateRangePolicy();
}
CommunicationData( const CommPlanType& comm_plan,
const CommDataType& comm_data,
const double overallocation = 1.0 )
: _comm_plan( comm_plan )
, _comm_data( comm_data )
, _overallocation( overallocation )
{
updateRangePolicy();
}

//! Get the communication send buffer.
Expand All @@ -1104,7 +1191,7 @@ class CommunicationData
buffer_type getReceiveBuffer() const { return _comm_data._recv_buffer; }

//! Get the particles to communicate.
particle_data_type getData() const { return _comm_data._particles; }
particle_data_type getParticles() const { return _comm_data._particles; }
//! Update particles to communicate.
void setData( const particle_data_type& particles )
{
Expand Down Expand Up @@ -1145,6 +1232,11 @@ class CommunicationData
_comm_data.reallocateReceive( shrunk_recv_size );
}

void resizeParticles( const std::size_t new_size )
{
_comm_data._particles.resize( new_size );
}

//! Perform the communication (migrate, gather, scatter).
virtual void apply() = 0;

Expand All @@ -1163,32 +1255,42 @@ class CommunicationData
reserveImpl( comm_plan, particles, total_send, total_recv );
}
void reserveImpl( const CommPlanType& comm_plan,
const particle_data_type particles,
const particle_data_type& particles,
const std::size_t total_send,
const std::size_t total_recv )
{
_comm_plan = comm_plan;
setData( particles );

updateBuffers( total_send, total_recv );
}
//! \endcond

protected:
void updateBuffers( const std::size_t total_send,
const std::size_t total_recv )
{
auto send_capacity = sendCapacity();
std::size_t new_send_size = total_send * _overallocation;
if ( new_send_size > send_capacity )
_comm_data.reallocateSend( new_send_size );
std::cout << "send cap " << new_send_size << " " << sendCapacity()
<< "\n";

auto recv_capacity = receiveCapacity();
std::size_t new_recv_size = total_recv * _overallocation;
if ( new_recv_size > recv_capacity )
_comm_data.reallocateReceive( new_recv_size );
std::cout << "recv cap " << new_recv_size << " " << receiveCapacity()
<< "\n";

_send_size = total_send;
_recv_size = total_recv;

// Update policies with new sizes.
updateRangePolicy();
}
//! \endcond

protected:
//! Update range policy based on new communication plan.
void updateRangePolicy()
{
Expand All @@ -1215,6 +1317,103 @@ class CommunicationData
std::size_t _recv_size;
};

/*!
\brief Store communication plan and communication buffers.
*/
template <class CommPlanType, class CommDataType>
class CommunicationDataSeparate
: public CommunicationData<CommPlanType, CommDataType>
{
public:
using base_type = CommunicationData<CommPlanType, CommDataType>;
//! Communication plan type (Halo, Distributor)
using plan_type = typename base_type::plan_type;
//! Kokkos execution space.
using execution_space = typename base_type::execution_space;
//! Kokkos execution policy.
using policy_type = typename base_type::policy_type;
//! Communication data type.
using comm_data_type = typename base_type::comm_data_type;
//! Particle data type.
using particle_data_type = typename base_type::particle_data_type;
//! Kokkos memory space.
using memory_space = typename base_type::memory_space;
//! Communication data type.
using data_type = typename base_type::data_type;
//! Communication buffer type.
using buffer_type = typename base_type::buffer_type;

/*!
\param comm_plan The communication plan.
\param particles The particle data (either AoSoA or slice).
\param overallocation An optional factor to keep extra space in the
buffers to avoid frequent resizing.
*/
CommunicationDataSeparate( const CommPlanType& comm_plan,
const particle_data_type& src,
const particle_data_type& dst,
const double overallocation = 1.0 )
: base_type( comm_plan, CommDataType( src, dst ), overallocation )
{
}

//! Get the destination particles.
particle_data_type getDestinationParticles() const
{
return _comm_data._dst_particles;
}
//! Update particles to communicate.
void setParticles( const particle_data_type& src,
const particle_data_type& dst )
{
_comm_data._src_particles = src;
_comm_data._dst_particles = dst;
}

//! \cond Impl
void
reserveImpl( const CommPlanType& comm_plan, const particle_data_type& src,
const particle_data_type& dst, const std::size_t total_send,
const std::size_t total_recv, const double overallocation )
{
if ( overallocation < 1.0 )
throw std::runtime_error( "Cannot allocate buffers with less space "
"than data to communicate!" );
_overallocation = overallocation;

reserveImpl( comm_plan, src, dst, total_send, total_recv );
}
void reserveImpl( const CommPlanType& comm_plan,
const particle_data_type& src,
const particle_data_type& dst,
const std::size_t total_send,
const std::size_t total_recv )
{
_comm_plan = comm_plan;
setParticles( src, dst );
this->updateRangePolicy();

this->updateBuffers( total_send, total_recv );
}
//! \endcond

protected:
//! Communication plan.
using base_type::_comm_plan;
//! Send range policy.
using base_type::_send_policy;
//! Receive range policy.
using base_type::_recv_policy;
//! Communication plan.
using base_type::_comm_data;
//! Overallocation factor.
using base_type::_overallocation;
//! Send sizes.
using base_type::_send_size;
//! Receive sizes.
using base_type::_recv_size;
};

} // end namespace Cabana

#endif // end CABANA_COMMUNICATIONPLAN_HPP
Loading