diff --git a/src/AMSlib/wf/validator.hpp b/src/AMSlib/wf/validator.hpp
new file mode 100644
index 00000000..f6a9716e
--- /dev/null
+++ b/src/AMSlib/wf/validator.hpp
@@ -0,0 +1,718 @@
+#ifndef __AMS_VALIDATOR_HPP__
+#define __AMS_VALIDATOR_HPP__
+
+#include <algorithm>
+#include <array>
+#include <cstring>  // memcpy
+#include <cmath>    // abs
+#include <iostream>
+#include <random>
+#include <set>
+#include <string>
+#include <vector>
+#include <type_traits>  // is_same
+
+#if __STANDALONE_TEST__  // minimal stand-ins for off-line testing
+#include <cstdlib>  // malloc/free
+namespace ams
+{
+using AMSResourceType = int;
+struct ResourceManager
+{
+  template <typename T>
+  static T* allocate(size_t n, AMSResourceType appDataLoc)
+  {
+    return (T*)malloc(sizeof(T) * n);
+  }
+  static void deallocate(void* ptr, AMSResourceType appDataLoc) { free(ptr); }
+};
+}  // namespace ams
+#else
+#include "wf/resource_manager.hpp"
+#endif
+
+namespace ams
+{
+
+/// A surrogate output sampled for validation: its position in the local
+/// buffer, the surrogate output, and the corresponding physics output.
+template <typename FPTypeValue>
+struct ValPoint
+{
+public:
+  ValPoint() : m_pos(0ul), m_o_sur(0.0), m_o_phy(0.0) {}
+  ValPoint(size_t pos, FPTypeValue o_sur)
+    : m_pos(pos), m_o_sur(o_sur), m_o_phy(0.0) {}
+  ValPoint(size_t pos, FPTypeValue o_sur, FPTypeValue o_phy)
+    : m_pos(pos), m_o_sur(o_sur), m_o_phy(o_phy) {}
+
+  std::string to_string() const
+  {
+    return "[ " + std::to_string(m_pos) + ", " + std::to_string(m_o_sur) +
+           ", " + std::to_string(m_o_phy) + " ]";
+  }
+
+public:
+  size_t m_pos;         ///< position in the local data
+  FPTypeValue m_o_sur;  ///< surrogate output
+  FPTypeValue m_o_phy;  ///< physics output
+};
+
+/// Error statistics for one output dimension: counts, averages, and
+/// variances, each split into (non-validation points, validation points).
+template <typename FPTypeValue>
+struct ValStats
+{
+  ValStats()
+    : m_cnt({0u, 0u}),
+      m_avg({static_cast<FPTypeValue>(0), static_cast<FPTypeValue>(0)}),
+      m_var({static_cast<FPTypeValue>(0), static_cast<FPTypeValue>(0)})
+  {}
+
+  ValStats(const std::array<unsigned, 2>& cnt,
+           const std::array<FPTypeValue, 2>& avg,
+           const std::array<FPTypeValue, 2>& var)
+    : m_cnt(cnt), m_avg(avg), m_var(var)
+  {}
+
+  std::array<unsigned, 2> m_cnt;
+  std::array<FPTypeValue, 2> m_avg;
+  std::array<FPTypeValue, 2> m_var;
+};
+
+/// The validation points collected for one output dimension during one step.
+template <typename FPTypeValue>
+class StepValPoints
+{
+public:
+  StepValPoints() : m_step(0ul) {}
+  void set_step(const size_t i) { m_step = i; }
+  void inc_step() { m_step++; }
+  size_t get_iter() const { return m_step; }
+  void add_pt(const ValPoint<FPTypeValue>& pt) { m_val_pts.emplace_back(pt); }
+  void add_pt(ValPoint<FPTypeValue>&& pt) { m_val_pts.emplace_back(pt); }
+
+  void sort_points()
+  {
+    auto less_pt = [](const ValPoint<FPTypeValue>& a,
+                      const ValPoint<FPTypeValue>& b) {
+      return (a.m_pos < b.m_pos);
+    };
+
+    std::sort(m_val_pts.begin(), m_val_pts.end(), less_pt);
+  }
+
+  size_t get_num_points() const { return m_val_pts.size(); }
+
+  size_t get_last_pos() const
+  {
+    return ((m_val_pts.size() == 0ul) ? 0ul : m_val_pts.back().m_pos);
+  }
+
+  void print() const
+  {
+    std::cout << "step " << m_step << std::endl << "validation points : ";
+    for (const auto& pt : m_val_pts) {
+      std::cout << ' ' << pt.to_string();
+    }
+    std::cout << std::endl;
+  }
+
+  const std::vector<ValPoint<FPTypeValue>>& val_pts() const { return m_val_pts; }
+  std::vector<ValPoint<FPTypeValue>>& val_pts() { return m_val_pts; }
+
+protected:
+  size_t m_step;
+  std::vector<ValPoint<FPTypeValue>> m_val_pts;
+};
+
+/**
+ *  Samples points at which the surrogate output has been accepted and forces
+ *  the physics model to run on them as well, so that the error of the
+ *  surrogate can be measured on those validation points across MPI ranks.
+ */
+template <typename FPTypeValue>
+class VPCollector
+{
+public:
+  VPCollector(unsigned seed, MPI_Comm comm = MPI_COMM_WORLD);
+  ~VPCollector();
+
+  bool set_validation(const bool* pred_loc,
+                      size_t num_pred_loc,
+                      AMSResourceType appDataLoc,
+                      unsigned k_v);
+#if __STANDALONE_TEST__
+  void backup_surrogate_outs(const FPTypeValue* sur_out,
+                             StepValPoints<FPTypeValue>& step_valpoints);
+  std::array<FPTypeValue, 2> backup_validation_outs(
+      const FPTypeValue* phy_out,
+      StepValPoints<FPTypeValue>& step_valpoints);
+  ValStats<FPTypeValue> get_error_stats(
+      const FPTypeValue* phy_out,
+      StepValPoints<FPTypeValue>& step_valpoints);
+#endif
+  void backup_surrogate_outs(
+      const std::vector<FPTypeValue*> sur_out,
+      std::vector<StepValPoints<FPTypeValue>>& step_valpoints);
+
+  const bool* predicate() const { return m_pred_loc_new; }
+
+  std::vector<ValStats<FPTypeValue>> get_error_stats(
+      const std::vector<FPTypeValue*> phy_out,
+      std::vector<StepValPoints<FPTypeValue>>& step_valpoints);
+
+protected:
+  int gather_predicate(const bool* pred_loc, size_t num_pred_loc);
+  size_t pick_num_val_pts(const size_t n_T, const size_t n_F, unsigned k) const;
+  std::vector<size_t> pick_val_pts(unsigned k = 0u);
+  bool count_local_vals(const std::vector<size_t>& val_pts,
+                        std::vector<unsigned>& num_chosen_per_rank);
+  bool turn_predicate_on(const unsigned my_val_pt_cnt);
+  bool* distribute_predicate(const std::vector<unsigned>& local_val_pt_cnts,
+                             const AMSResourceType appDataLoc);
+  /// Clear intermediate data used by the root rank to update the predicate
+  void clear_intermediate_info();
+  std::array<FPTypeValue, 2> sum_sqdev(
+      const FPTypeValue* phy_out,
+      const FPTypeValue* sur_out,
+      const StepValPoints<FPTypeValue>& step_valpoints,
+      const std::array<FPTypeValue, 2>& avg);
+  std::vector<std::array<FPTypeValue, 2>> backup_validation_outs(
+      const std::vector<FPTypeValue*> phy_out,
+      std::vector<StepValPoints<FPTypeValue>>& step_valpoints);
+
+protected:
+  MPI_Comm m_comm;
+  int m_rank;
+  int m_num_ranks;
+  size_t m_num_pred_loc;
+  /// Local predicate array
+  const bool* m_pred_loc;
+  /// Indices of positive local predicates
+  std::vector<size_t> m_pred_loc_pos;
+  /// Updated local predicate array with the chosen validation points unset
+  bool* m_pred_loc_new;
+  AMSResourceType m_appDataLoc;
+  std::vector<FPTypeValue*> m_sur;  ///< surrogate output backup
+
+  // For root rank
+  /// The total number of predicates
+  size_t m_tot_num_preds;
+  /// The total number of positive predicates
+  size_t m_tot_num_preds_pos;
+  /// Number of predicates and of positive predicates of each rank
+  std::vector<unsigned> m_num_pred_all;
+  /// Displacement of positive predicates across ranks
+  std::vector<size_t> m_displs_pos;
+  std::default_random_engine m_rng;
+};
+
+template <typename FPTypeValue>
+void VPCollector<FPTypeValue>::clear_intermediate_info()
+{
+  m_tot_num_preds = 0u;
+  m_tot_num_preds_pos = 0u;
+  m_num_pred_all.clear();
+  m_displs_pos.clear();
+}
+
+template <typename FPTypeValue>
+VPCollector<FPTypeValue>::VPCollector(unsigned seed, MPI_Comm comm)
+  : m_comm(comm),
+    m_num_pred_loc(0u),
+    m_pred_loc(nullptr),
+    m_pred_loc_new(nullptr),
+    m_tot_num_preds(0u),
+    m_tot_num_preds_pos(0u)
+{
+  if (comm == MPI_COMM_NULL) {
+    m_rank = 0;
+    m_num_ranks = 1;
+  } else {
+    MPI_Comm_rank(comm, &m_rank);
+    MPI_Comm_size(comm, &m_num_ranks);
+  }
+  m_rng.seed(seed + 1357u + m_rank);
+  m_rng();
+}
+
+template <typename FPTypeValue>
+VPCollector<FPTypeValue>::~VPCollector()
+{
+  clear_intermediate_info();
+  if (m_pred_loc_new) {
+#if __STANDALONE_TEST__
+    ams::ResourceManager::deallocate(m_pred_loc_new, m_appDataLoc);
+    for (size_t i = 0u; i < m_sur.size(); ++i) {
+      ams::ResourceManager::deallocate(m_sur[i], m_appDataLoc);
+    }
+#else
+    auto& rm_d = ams::ResourceManager::getInstance();
+    rm_d.deallocate(m_pred_loc_new, m_appDataLoc);
+    for (size_t i = 0u; i < m_sur.size(); ++i) {
+      rm_d.deallocate(m_sur[i], m_appDataLoc);
+    }
+#endif  // __STANDALONE_TEST__
+    m_pred_loc_new = nullptr;
+    m_sur.clear();
+  }
+  m_pred_loc = nullptr;
+  m_num_pred_loc = 0ul;
+}
+
+/// Gather predicates to the root rank
+template <typename FPTypeValue>
+int VPCollector<FPTypeValue>::gather_predicate(const bool* pred_loc,
+                                               size_t num_pred_loc)
+{
+  m_pred_loc = pred_loc;
+  m_num_pred_loc = num_pred_loc;
+  int rc = MPI_SUCCESS;
+
+  m_pred_loc_pos.clear();
+  m_pred_loc_pos.reserve(m_num_pred_loc);
+
+  for (size_t i = 0u; i < m_num_pred_loc; ++i) {
+    if (m_pred_loc[i]) m_pred_loc_pos.push_back(i);
+  }
+
+  std::vector<unsigned> cnt_loc = {static_cast<unsigned>(m_num_pred_loc),
+                                   static_cast<unsigned>(m_pred_loc_pos.size())};
+
+  m_num_pred_all.clear();
+  if (m_rank == 0) {
+    m_num_pred_all.resize(m_num_ranks * 2);
+  }
+  // Gather the data sizes (i.e., the number of items) from each rank
+  rc = MPI_Gather(reinterpret_cast<void*>(cnt_loc.data()),
+                  2,
+                  MPI_UNSIGNED,
+                  reinterpret_cast<void*>(m_num_pred_all.data()),
+                  2,
+                  MPI_UNSIGNED,
+                  0,
+                  m_comm);
+
+  if (rc != MPI_SUCCESS) {
+    if (m_rank == 0) {
+      std::cerr << "MPI_Gather() in gather_predicate() failed with code ("
+                << rc << ")" << std::endl;
+    }
+    return rc;
+  }
+
+  m_displs_pos.clear();
+
+  if (m_rank == 0) {
+    m_displs_pos.resize(m_num_ranks);
+
+    size_t offset = 0u;
+    size_t offset_pos = 0u;
+    auto it = m_num_pred_all.cbegin();
+    for (int i = 0; i < m_num_ranks; ++i) {
+      m_displs_pos[i] = offset_pos;
+      offset += static_cast<size_t>(*it++);
+      offset_pos += static_cast<size_t>(*it++);
+    }
+    m_tot_num_preds = offset;
+    m_tot_num_preds_pos = offset_pos;
+  }
+  return MPI_SUCCESS;
+}
+
+/// Determine the number of points on which to evaluate the physics model,
+/// leveraging workers left idle by load imbalance
+template <typename FPTypeValue>
+size_t VPCollector<FPTypeValue>::pick_num_val_pts(const size_t n_T,
+                                                  const size_t n_F,
+                                                  unsigned k) const
+{
+  const size_t imbalance = n_F % m_num_ranks;
+  const size_t n_idle =
+      (imbalance == static_cast<size_t>(0ul)) ? 0ul : (m_num_ranks - imbalance);
+  const size_t n_val = std::min(n_idle + k * m_num_ranks, n_T);
+
+  return n_val;
+}
+
+/** Randomly choose the points to run physics on, out of those accepted with
+    the surrogate.
+    `k_v`: the minimum number of new physics evaluation points per rank.
+    By default, it is 0. Each rank will have k_v or k_v+1 extra physics
+    evaluation points. */
+template <typename FPTypeValue>
+std::vector<size_t> VPCollector<FPTypeValue>::pick_val_pts(unsigned k_v)
+{
+  if (m_tot_num_preds_pos < 1u) {
+    return std::vector<size_t>{};
+  }
+
+  const size_t num_vals = pick_num_val_pts(m_tot_num_preds_pos,
+                                           m_tot_num_preds - m_tot_num_preds_pos,
+                                           k_v);
+
+  std::uniform_int_distribution<size_t> dist(0u, m_tot_num_preds_pos - 1);
+
+  std::set<size_t> chosen;
+  while (chosen.size() < num_vals) {
+    chosen.insert(dist(m_rng));
+  }
+
+  return std::vector<size_t>(chosen.cbegin(), chosen.cend());
+}
+
+/** The root rank could have translated the indices of chosen predicates to the
+ *  local indices of the owner ranks and then sent the indices back to each
+ *  rank. However, as each rank can own a varying number of predicates chosen
+ *  to flip, that would require sending the number of indices each rank is to
+ *  receive in advance. To save MPI communication, we simply send each rank the
+ *  count of predicates to flip and let each rank locally determine which of
+ *  its predicates to flip, as many as the count it receives. Here, we prepare
+ *  the count for each rank.
+ */
+template <typename FPTypeValue>
+bool VPCollector<FPTypeValue>::count_local_vals(
+    const std::vector<size_t>& val_pts,
+    std::vector<unsigned>& num_chosen_per_rank)
+{
+  num_chosen_per_rank.assign(m_num_ranks, 0u);
+  int cur_rank = 0;
+
+  for (auto idx : val_pts) {
+    if (idx >= m_tot_num_preds_pos) {
+      std::cerr << "Invalid predicate index generated for error analysis!"
+                << std::endl;
+      return false;
+    }
+    // Advance to the rank owning this positive predicate. The last rank owns
+    // everything from m_displs_pos[m_num_ranks - 1] onwards.
+    while ((cur_rank + 1 < m_num_ranks) && (idx >= m_displs_pos[cur_rank + 1])) {
+      ++cur_rank;
+    }
+    num_chosen_per_rank[cur_rank]++;
+  }
+  return true;
+}
+
+template <typename FPTypeValue>
+bool VPCollector<FPTypeValue>::turn_predicate_on(const unsigned my_val_pt_cnt)
+{
+  std::memcpy(m_pred_loc_new, m_pred_loc, m_num_pred_loc * sizeof(bool));
+
+  if (static_cast<size_t>(my_val_pt_cnt) > m_pred_loc_pos.size()) {
+    std::cerr << "Invalid number of positive predicates computed for error analysis!"
+              << std::endl;
+    return false;
+  }
+  if (my_val_pt_cnt == 0u) {
+    return true;
+  }
+
+  auto pred_idx = m_pred_loc_pos;
+
+  std::shuffle(pred_idx.begin(), pred_idx.end(), m_rng);
+
+  for (unsigned i = 0u; i < my_val_pt_cnt; ++i) {
+#if DEBUG
+    if (!m_pred_loc_new[pred_idx[i]]) {
+      std::cerr << "Incorrect predicate chosen to flip for error analysis!"
+                << std::endl;
+    }
+#endif
+    m_pred_loc_new[pred_idx[i]] = false;
+  }
+
+  return true;
+}
+
+template <typename FPTypeValue>
+bool* VPCollector<FPTypeValue>::distribute_predicate(
+    const std::vector<unsigned>& local_val_pt_cnts,
+    const AMSResourceType appDataLoc)
+{
+  int rc = 0;
+  m_appDataLoc = appDataLoc;
+#if __STANDALONE_TEST__
+  m_pred_loc_new = ams::ResourceManager::allocate<bool>(m_num_pred_loc, appDataLoc);
+#else
+  auto& rm_a = ams::ResourceManager::getInstance();
+  m_pred_loc_new = rm_a.allocate<bool>(m_num_pred_loc, appDataLoc);
+#endif  // __STANDALONE_TEST__
+
+  if (!m_pred_loc_new) {
+    if (m_rank == 0) {
+      std::cerr << "allocate() in distribute_predicate() failed!" << std::endl;
+      clear_intermediate_info();
+    }
+    return nullptr;
+  }
+
+  unsigned my_val_pt_cnt = 0u;
+  rc = MPI_Scatter(reinterpret_cast<const void*>(local_val_pt_cnts.data()),
+                   1,
+                   MPI_UNSIGNED,
+                   reinterpret_cast<void*>(&my_val_pt_cnt),
+                   1,
+                   MPI_UNSIGNED,
+                   0,
+                   m_comm);
+
+  if (rc != MPI_SUCCESS) {
+    if (m_rank == 0) {
+      std::cerr << "MPI_Scatter() in distribute_predicate() failed with code ("
+                << rc << ")" << std::endl;
+      clear_intermediate_info();
+    }
+#if __STANDALONE_TEST__
+    ams::ResourceManager::deallocate(m_pred_loc_new, appDataLoc);
+#else
+    auto& rm_d = ams::ResourceManager::getInstance();
+    rm_d.deallocate(m_pred_loc_new, appDataLoc);
+#endif  // __STANDALONE_TEST__
+    m_pred_loc_new = nullptr;
+
+    return nullptr;
+  }
+  clear_intermediate_info();
+
+  // Unset the predicate for those selected as validation points
+  if (!turn_predicate_on(my_val_pt_cnt)) {
+    std::cerr << "Failed to turn predicates on for extra validation!"
+              << std::endl;
+#if __STANDALONE_TEST__
+    ams::ResourceManager::deallocate(m_pred_loc_new, appDataLoc);
+#else
+    auto& rm_d = ams::ResourceManager::getInstance();
+    rm_d.deallocate(m_pred_loc_new, appDataLoc);
+#endif  // __STANDALONE_TEST__
+    m_pred_loc_new = nullptr;
+  }
+
+  return m_pred_loc_new;
+}
+
+template <typename FPTypeValue>
+bool VPCollector<FPTypeValue>::set_validation(const bool* pred_loc,
+                                              const size_t num_pred_loc,
+                                              const AMSResourceType appDataLoc,
+                                              const unsigned k_v)
+{
+  int rc = 0;
+  rc = gather_predicate(pred_loc, num_pred_loc);
+  if (rc != MPI_SUCCESS) {
+    return false;
+  }
+
+  std::vector<unsigned> local_val_pt_cnts;
+  if (m_rank == 0) {
+    // indices of predicates to flip for enabling validation
+    std::vector<size_t> pred_idx = pick_val_pts(k_v);
+    if (!count_local_vals(pred_idx, local_val_pt_cnts)) {
+      return false;
+    }
+  }
+
+  // distribute the updated predicate
+  if (!distribute_predicate(local_val_pt_cnts, appDataLoc)) {
+    return false;
+  }
+
+  return true;
+}
+
+template <typename FPTypeValue>
+std::array<FPTypeValue, 2> VPCollector<FPTypeValue>::sum_sqdev(
+    const FPTypeValue* phy_out,
+    const FPTypeValue* sur_out,
+    const StepValPoints<FPTypeValue>& step_valpoints,
+    const std::array<FPTypeValue, 2>& avg)
+{
+  std::array<FPTypeValue, 2> sum{static_cast<FPTypeValue>(0),
+                                 static_cast<FPTypeValue>(0)};
+
+  size_t k = 0ul;
+  for (auto& pt : step_valpoints.val_pts()) {
+    for (; k < pt.m_pos; ++k) {
+      sum[0] += std::abs(phy_out[k] - sur_out[k]);
+    }
+    k++;
+    auto diff = std::abs(pt.m_o_phy - pt.m_o_sur) - avg[1];
+    sum[1] += diff * diff;
+  }
+  for (; k < m_num_pred_loc; ++k) {
+    sum[0] += std::abs(phy_out[k] - sur_out[k]);
+  }
+
+  return sum;
+}
+
+#if __STANDALONE_TEST__
+template <typename FPTypeValue>
+void VPCollector<FPTypeValue>::backup_surrogate_outs(
+    const FPTypeValue* sur_out, StepValPoints<FPTypeValue>& step_valpoints)
+{
+  m_sur.resize(1u, nullptr);
+#if __STANDALONE_TEST__
+  m_sur[0] = ams::ResourceManager::allocate<FPTypeValue>(m_num_pred_loc, m_appDataLoc);
+#endif  // __STANDALONE_TEST__
+  std::memcpy(m_sur[0], sur_out, m_num_pred_loc * sizeof(FPTypeValue));
+
+  for (size_t i = 0ul; i < m_num_pred_loc; ++i) {
+    if (m_pred_loc_new[i] != m_pred_loc[i]) {
+      step_valpoints.add_pt(ValPoint<FPTypeValue>(i, sur_out[i]));
+    }
+  }
+  //step_valpoints.sort_points();
+  step_valpoints.inc_step();
+}
+
+template <typename FPTypeValue>
+std::array<FPTypeValue, 2> VPCollector<FPTypeValue>::backup_validation_outs(
+    const FPTypeValue* phy_out, StepValPoints<FPTypeValue>& step_valpoints)
+{
+  std::array<FPTypeValue, 2> sum{static_cast<FPTypeValue>(0),
+                                 static_cast<FPTypeValue>(0)};
+
+  size_t k = 0ul;
+  for (auto& pt : step_valpoints.val_pts()) {
+    for (; k < pt.m_pos; ++k) {
+      sum[0] += std::abs(phy_out[k] - m_sur[0][k]);
+    }
+    k++;
+    pt.m_o_phy = phy_out[pt.m_pos];
+    sum[1] += std::abs(pt.m_o_phy - pt.m_o_sur);
+  }
+
+  for (; k < m_num_pred_loc; ++k) {
+    sum[0] += std::abs(phy_out[k] - m_sur[0][k]);
+  }
+
+  return sum;
+}
+
+template <typename FPTypeValue>
+ValStats<FPTypeValue> VPCollector<FPTypeValue>::get_error_stats(
+    const FPTypeValue* phy_out, StepValPoints<FPTypeValue>& step_valpoints)
+{
+  const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints);
+  std::array<FPTypeValue, 2> err_sum_glo{static_cast<FPTypeValue>(0),
+                                         static_cast<FPTypeValue>(0)};
+  std::array<FPTypeValue, 2> err_avg_glo{static_cast<FPTypeValue>(0),
+                                         static_cast<FPTypeValue>(0)};
+
+  std::array<unsigned, 2> err_cnt_loc{
+      static_cast<unsigned>(m_num_pred_loc - step_valpoints.get_num_points()),
+      static_cast<unsigned>(step_valpoints.get_num_points())};
+  std::array<unsigned, 2> err_cnt_glo{0u, 0u};
+
+  MPI_Allreduce(&err_cnt_loc[0], &err_cnt_glo[0], 2, MPI_UNSIGNED, MPI_SUM, m_comm);
+
+  MPI_Datatype mpi_dtype = MPI_FLOAT;
+  if (std::is_same<FPTypeValue, double>::value) {
+    mpi_dtype = MPI_DOUBLE;
+  } else {  // if (std::is_same<FPTypeValue, float>::value)
+    mpi_dtype = MPI_FLOAT;
+  }
+
+  MPI_Allreduce(&err_sum_loc[0], &err_sum_glo[0], 2, mpi_dtype, MPI_SUM, m_comm);
+  err_avg_glo[0] = err_sum_glo[0] / static_cast<FPTypeValue>(err_cnt_glo[0]);
+  err_avg_glo[1] = err_sum_glo[1] / static_cast<FPTypeValue>(err_cnt_glo[1]);
+
+  const auto err_var_loc = sum_sqdev(phy_out, m_sur[0], step_valpoints, err_avg_glo);
+  //err_var_loc[0] /= static_cast<FPTypeValue>(err_cnt_glo[0]);
+  //err_var_loc[1] /= static_cast<FPTypeValue>(err_cnt_glo[1]);
+
+  std::array<FPTypeValue, 2> err_var_glo{0, 0};
+  MPI_Allreduce(&err_var_loc[0], &err_var_glo[0], 2, mpi_dtype, MPI_SUM, m_comm);
+  err_var_glo[0] /= static_cast<FPTypeValue>(err_cnt_glo[0]);
+  err_var_glo[1] /= static_cast<FPTypeValue>(err_cnt_glo[1]);
+
+  return ValStats<FPTypeValue>(err_cnt_glo, err_avg_glo, err_var_glo);
+}
+#endif  // __STANDALONE_TEST__
+
+
+template <typename FPTypeValue>
+void VPCollector<FPTypeValue>::backup_surrogate_outs(
+    const std::vector<FPTypeValue*> sur_out,
+    std::vector<StepValPoints<FPTypeValue>>& step_valpoints)
+{
+  const size_t dim = sur_out.size();
+  step_valpoints.clear();
+  step_valpoints.resize(dim);
+
+  m_sur.resize(dim);
+  for (size_t j = 0ul; j < dim; ++j) {
+#if __STANDALONE_TEST__
+    m_sur[j] = ams::ResourceManager::allocate<FPTypeValue>(m_num_pred_loc, m_appDataLoc);
+#else
+    auto& rm_a = ams::ResourceManager::getInstance();
+    m_sur[j] = rm_a.allocate<FPTypeValue>(m_num_pred_loc, m_appDataLoc);
+#endif  // __STANDALONE_TEST__
+    std::memcpy(m_sur[j], sur_out[j], m_num_pred_loc * sizeof(FPTypeValue));
+  }
+
+  for (size_t i = 0ul; i < m_num_pred_loc; ++i) {
+    if (m_pred_loc_new[i] != m_pred_loc[i]) {
+      for (size_t j = 0ul; j < dim; ++j) {
+        step_valpoints[j].add_pt(ValPoint<FPTypeValue>(i, sur_out[j][i]));
+      }
+    }
+  }
+  for (size_t j = 0ul; j < dim; ++j) {
+    //step_valpoints[j].sort_points();
+    step_valpoints[j].inc_step();
+  }
+}
+
+template <typename FPTypeValue>
+std::vector<std::array<FPTypeValue, 2>> VPCollector<FPTypeValue>::backup_validation_outs(
+    const std::vector<FPTypeValue*> phy_out,
+    std::vector<StepValPoints<FPTypeValue>>& step_valpoints)
+{
+  const size_t dim = step_valpoints.size();
+
+  std::vector<std::array<FPTypeValue, 2>> sum(
+      dim, {static_cast<FPTypeValue>(0), static_cast<FPTypeValue>(0)});
+
+  if (phy_out.size() != dim) {
+    // exception
+    std::cerr << "Invalid data dimension!" << std::endl;
+    return sum;
+  }
+
+  for (size_t j = 0ul; j < dim; ++j) {
+    size_t k = 0ul;
+    for (auto& pt : step_valpoints[j].val_pts()) {
+      for (; k < pt.m_pos; ++k) {
+        sum[j][0] += std::abs(phy_out[j][k] - m_sur[j][k]);
+      }
+      k++;
+      pt.m_o_phy = phy_out[j][pt.m_pos];
+      sum[j][1] += std::abs(pt.m_o_phy - pt.m_o_sur);
+    }
+    for (; k < m_num_pred_loc; ++k) {
+      sum[j][0] += std::abs(phy_out[j][k] - m_sur[j][k]);
+    }
+  }
+  return sum;
+}
+
+
+template <typename FPTypeValue>
+std::vector<ValStats<FPTypeValue>> VPCollector<FPTypeValue>::get_error_stats(
+    const std::vector<FPTypeValue*> phy_out,
+    std::vector<StepValPoints<FPTypeValue>>& step_valpoints)
+{
+  const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints);
+  const size_t dim = err_sum_loc.size();
+  std::vector<std::array<FPTypeValue, 2>> err_sum_glo(
+      dim, {static_cast<FPTypeValue>(0), static_cast<FPTypeValue>(0)});
+  std::vector<std::array<FPTypeValue, 2>> err_avg_glo(
+      dim, {static_cast<FPTypeValue>(0), static_cast<FPTypeValue>(0)});
+  std::vector<std::array<FPTypeValue, 2>> err_var_glo(
+      dim, {static_cast<FPTypeValue>(0), static_cast<FPTypeValue>(0)});
+  std::vector<ValStats<FPTypeValue>> stats(dim);
+
+#if DEBUG
+  if (m_num_pred_loc <= step_valpoints.at(0).get_num_points()) {
+    std::cerr << "Incorrect number of validation points!" << std::endl;
+  }
+#endif
+
+  std::array<unsigned, 2> err_cnt_loc{
+      static_cast<unsigned>(m_num_pred_loc - step_valpoints.at(0).get_num_points()),
+      static_cast<unsigned>(step_valpoints.at(0).get_num_points())};
+  std::array<unsigned, 2> err_cnt_glo{0u, 0u};
+
+  MPI_Allreduce(&err_cnt_loc[0], &err_cnt_glo[0], 2, MPI_UNSIGNED, MPI_SUM, m_comm);
+
+  MPI_Datatype mpi_dtype = MPI_FLOAT;
+  if (std::is_same<FPTypeValue, double>::value) {
+    mpi_dtype = MPI_DOUBLE;
+  } else {  // if (std::is_same<FPTypeValue, float>::value)
+    mpi_dtype = MPI_FLOAT;
+  }
+
+  for (size_t j = 0ul; j < dim; ++j) {
+    MPI_Allreduce(&err_sum_loc[j][0], &err_sum_glo[j][0], 2, mpi_dtype, MPI_SUM, m_comm);
+    err_avg_glo[j][0] = err_sum_glo[j][0] / static_cast<FPTypeValue>(err_cnt_glo[0]);
+    err_avg_glo[j][1] = err_sum_glo[j][1] / static_cast<FPTypeValue>(err_cnt_glo[1]);
+    std::array<FPTypeValue, 2> err_var_loc =
+        sum_sqdev(phy_out[j], m_sur[j], step_valpoints[j], err_avg_glo[j]);
+    //err_var_loc[0] /= static_cast<FPTypeValue>(err_cnt_glo[0]);
+    //err_var_loc[1] /= static_cast<FPTypeValue>(err_cnt_glo[1]);
+    MPI_Allreduce(&err_var_loc[0], &err_var_glo[j][0], 2, mpi_dtype, MPI_SUM, m_comm);
+    err_var_glo[j][0] /= static_cast<FPTypeValue>(err_cnt_glo[0]);
+    err_var_glo[j][1] /= static_cast<FPTypeValue>(err_cnt_glo[1]);
+    stats[j] = ValStats<FPTypeValue>(err_cnt_glo, err_avg_glo[j], err_var_glo[j]);
+  }
+
+  return stats;
+}
+
+
+}  // namespace ams
+
+#endif // __AMS_VALIDATOR_HPP__
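For context on how the new header is meant to be driven, here is a minimal per-iteration sketch written against the API declared above. It is illustrative only: the names predicate, surrogate_outs, phys_outs, totalElements, and appDataLoc stand for the caller's own buffers and settings (they are not defined by this patch), and the physics evaluation itself is elided.

// Sketch (not part of the patch): one validation round with VPCollector.
#include <mpi.h>
#include <vector>
#include <iostream>
#include "wf/validator.hpp"

template <typename FPTypeValue>
void validate_iteration(MPI_Comm comm,
                        const bool* predicate,        // true where the surrogate was accepted
                        std::vector<FPTypeValue*>& surrogate_outs,  // one buffer per output dimension
                        std::vector<FPTypeValue*>& phys_outs,       // filled by the physics model below
                        size_t totalElements,
                        AMSResourceType appDataLoc)
{
  ams::VPCollector<FPTypeValue> vpcol(/*seed=*/1234u, comm);
  std::vector<ams::StepValPoints<FPTypeValue>> valstep;

  // Flip a few accepted points back to "needs physics" so they can be validated.
  vpcol.set_validation(predicate, totalElements, appDataLoc, /*k_v=*/1u);
  vpcol.backup_surrogate_outs(surrogate_outs, valstep);

  // ... evaluate the physics model wherever vpcol.predicate()[i] is false,
  //     writing the results into phys_outs[j][i] ...

  // Collective call: average and variance of |physics - surrogate| per dimension.
  auto stats = vpcol.get_error_stats(phys_outs, valstep);
  for (size_t j = 0; j < stats.size(); ++j) {
    std::cout << "dim " << j << ": samples " << stats[j].m_cnt[1]
              << ", avg " << stats[j].m_avg[1]
              << ", var " << stats[j].m_var[1] << std::endl;
  }
}

Index [1] of each statistics array covers the sampled validation points; index [0] covers the remaining points.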
diff --git a/src/AMSlib/wf/workflow.hpp b/src/AMSlib/wf/workflow.hpp
index 543ecb80..52957755 100644
--- a/src/AMSlib/wf/workflow.hpp
+++ b/src/AMSlib/wf/workflow.hpp
@@ -29,6 +29,7 @@
 #include <mpi.h>
 #include "wf/redist_load.hpp"
+#include "wf/validator.hpp"
 #endif
 
 #include "wf/debug.h"
@@ -72,7 +73,7 @@ class AMSWorkflow
   std::shared_ptr<ams::db::BaseDB> DB;
 
   /** @brief The process id. For MPI runs this is the rank */
-  const int rId;
+  int rId;
 
   /** @brief The total number of processes participating in the simulation
    *  (world_size for MPI) */
@@ -92,6 +93,18 @@
   /** @brief Is the evaluate a distributed execution **/
   bool isDistributed;
 
+#ifdef __ENABLE_MPI__
+  /** @brief outputs from surrogate and physics models sampled for validation
+   *  over iterations and output dimensions **/
+  std::vector<std::vector<StepValPoints<FPTypeValue>>> valsteps;
+#endif
+
+  /** Seed for randomly choosing validation points **/
+  unsigned int val_seed;
+
+  /** minimum number of validation points per rank **/
+  unsigned int min_val_pts;
+
   /** \brief Store the data in the database and copies
    * data from the GPU to the CPU and then to the database.
    * To store GPU resident data we use a 1MB of "pinned"
@@ -219,7 +232,9 @@ class AMSWorkflow
 #ifdef __ENABLE_MPI__
         comm(MPI_COMM_NULL),
 #endif
-        ePolicy(AMSExecPolicy::AMS_UBALANCED)
+        ePolicy(AMSExecPolicy::AMS_UBALANCED),
+        val_seed(1234u),
+        min_val_pts(0u)
   {
     DB = nullptr;
     auto &dbm = ams::db::DBManager::getInstance();
@@ -232,7 +247,17 @@
   void set_physics(AMSPhysicFn _AppCall) { AppCall = _AppCall; }
 
 #ifdef __ENABLE_MPI__
-  void set_communicator(MPI_Comm communicator) { comm = communicator; }
+  void set_communicator(MPI_Comm communicator) {
+    comm = communicator;
+
+    if (comm == MPI_COMM_NULL) {
+      rId = 0;
+      wSize = 1;
+    } else {
+      MPI_Comm_rank(comm, &rId);
+      MPI_Comm_size(comm, &wSize);
+    }
+  }
 #endif
 
   void set_exec_policy(AMSExecPolicy policy) { ePolicy = policy; }
@@ -240,12 +265,23 @@
   bool should_load_balance() const
   {
 #ifdef __ENABLE_MPI__
-    return (comm != MPI_COMM_NULL && ePolicy == AMSExecPolicy::AMS_BALANCED);
+    return (comm != MPI_COMM_NULL && ePolicy == AMSExecPolicy::AMS_BALANCED && (wSize > 1));
 #else
     return false;
 #endif
   }
 
+#ifdef __ENABLE_MPI__
+  const std::vector<std::vector<StepValPoints<FPTypeValue>>>& get_validation_samples() const
+  { return valsteps; }
+#endif
+
+  void setup_validation(unsigned int s, unsigned int m)
+  {
+    val_seed = s;
+    min_val_pts = m;
+  }
+
   ~AMSWorkflow() { DBG(Workflow, "Destroying Workflow Handler"); }
 
   /** @brief This is the main entry point of AMSLib and replaces the original
@@ -362,6 +398,24 @@
 
     DBG(Workflow, "Computed Predicates")
 
+#ifdef __ENABLE_MPI__
+    //------------------------------------------------------------------
+    // Prepare validation of the surrogate model with extra physics outputs
+    //------------------------------------------------------------------
+    // TODO: if the max number of iterations is known, reserve the vector
+    valsteps.push_back({});
+    auto& valstep = valsteps.back();
+
+    set_exec_policy(AMSExecPolicy::AMS_BALANCED);
+    VPCollector<FPTypeValue> vpcol(val_seed, comm);
+    if (should_load_balance()) {
+      vpcol.set_validation(predicate, totalElements, appDataLoc, min_val_pts);
+
+      // Back up the surrogate outputs for those selected to compute physics
+      vpcol.backup_surrogate_outs(origOutputs, valstep);
+    }
+#endif  // __ENABLE_MPI__
+
     // Pointer values which store input data values
     // to be computed using the eos function.
     std::vector<FPTypeValue *> packedInputs;
@@ -437,6 +491,25 @@
 
     DBG(Workflow, "Finished physics evaluation")
 
+#ifdef __ENABLE_MPI__
+    //------------------------------------------------------------------
+    // Process validation of the surrogate model with extra physics outputs:
+    // compute the error statistics between surrogate and physics model outputs
+    //------------------------------------------------------------------
+    if (should_load_balance()) {
+      auto stats = vpcol.get_error_stats(origOutputs, valstep);
+      for (int i = 0; i < outputDim; i++) {
+        CINFO(ErrorStats,
+              rId == 0,
+              "dim, %d, num_samples, %u, avg, %f, var, %f\n",
+              i,
+              stats[i].m_cnt[1],    // report stats over the sampled validation points
+              stats[i].m_avg[1],
+              stats[i].m_var[1]);
+      }
+    }
+#endif  // __ENABLE_MPI__
+
     if (DB) {
       CALIPER(CALI_MARK_BEGIN("DBSTORE");)
       if (!DB->storePredicate()) {
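The seed and the per-rank minimum added above are only reachable through AMSWorkflow::setup_validation(), and this diff does not show a call site for it. The snippet below is therefore a hypothetical wiring sketch: the pointer wf and the chosen values are placeholders, and the assumption that AMSWorkflow lives in namespace ams and is templated on the floating-point type follows the usage in this file.

// Hypothetical wiring (no call site is shown in this patch).
void configure_validation(ams::AMSWorkflow<double>* wf)
{
  // Defaults are val_seed = 1234u and min_val_pts = 0, i.e., only points
  // freed up by load imbalance get validated; override before evaluate().
  wf->setup_validation(/*seed=*/2024u, /*min_val_pts=*/2u);
}

// After one or more evaluate() calls, the sampled surrogate/physics pairs
// remain accessible, indexed as [iteration][output dimension]:
//   const auto& samples = wf->get_validation_samples();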
diff --git a/tests/AMSlib/ams_ete.cpp b/tests/AMSlib/ams_ete.cpp
index 729f3752..4004529b 100644
--- a/tests/AMSlib/ams_ete.cpp
+++ b/tests/AMSlib/ams_ete.cpp
@@ -130,14 +130,15 @@ void callBackSingle(void *cls, long elements, void **inputs, void **outputs)
 
 int main(int argc, char **argv)
 {
-  if (argc != 12) {
+  if ((argc != 12) && (argc != 13)) {
     std::cout << "Wrong cli\n";
     std::cout << argv[0]
               << " use_device(0|1) num_inputs num_outputs model_path "
                  "data_type(float|double) uq_policy(random|deltaUQ "
                  "(mean)|deltaUQ (max)) threshold(0) "
                  "num_iterations avg_num_values db_type(none|csv|hdf5) "
-                 "db_path(path to existing path to store data)";
+                 "db_path(path to existing path to store data) "
+                 "use_mpi(0|1)";
     return -1;
   }
 
@@ -154,6 +155,12 @@ int main(int argc, char **argv)
   int avg_elements = std::atoi(argv[9]);
   std::string db_type_str = std::string(argv[10]);
   std::string fs_path = std::string(argv[11]);
+#ifdef __ENABLE_MPI__
+  const bool use_mpi = (argc == 13) ? static_cast<bool>(std::atoi(argv[12])) : false;
+  MPI_Init(&argc, &argv);
+#else
+  const bool use_mpi = false;
+#endif  // __ENABLE_MPI__
   AMSDBType db_type = ams::db::getDBType(db_type_str);
   AMSResourceType resource = AMSResourceType::AMS_HOST;
   srand(time(NULL));
@@ -174,24 +181,52 @@ int main(int argc, char **argv)
 
   if (data_type == AMSDType::AMS_SINGLE) {
     Problem<float> prob(num_inputs, num_outputs);
-    AMSExecutor wf = AMSCreateExecutor(model_descr,
-                                       AMSDType::AMS_SINGLE,
-                                       resource,
-                                       (AMSPhysicFn)callBackSingle,
-                                       0,
-                                       1);
+// TODO: I do not think we should pass the process id and world size here.
+// They should be obtained from the communicator passed in.
+    AMSExecutor wf =
+#ifdef __ENABLE_MPI__
+        use_mpi ?
+          AMSCreateDistributedExecutor(model_descr,
+                                       AMSDType::AMS_SINGLE,
+                                       resource,
+                                       (AMSPhysicFn)callBackSingle,
+                                       MPI_COMM_WORLD,
+                                       0,
+                                       1) :
+#endif  // __ENABLE_MPI__
+          AMSCreateExecutor(model_descr,
+                            AMSDType::AMS_SINGLE,
+                            resource,
+                            (AMSPhysicFn)callBackSingle,
+                            0,
+                            1);
     prob.ams_run(wf, resource, num_iterations, avg_elements);
   } else {
     Problem<double> prob(num_inputs, num_outputs);
-    AMSExecutor wf = AMSCreateExecutor(model_descr,
-                                       AMSDType::AMS_DOUBLE,
-                                       resource,
-                                       (AMSPhysicFn)callBackDouble,
-                                       0,
-                                       1);
+    AMSExecutor wf =
+#ifdef __ENABLE_MPI__
+        use_mpi ?
+          AMSCreateDistributedExecutor(model_descr,
+                                       AMSDType::AMS_DOUBLE,
+                                       resource,
+                                       (AMSPhysicFn)callBackDouble,
+                                       MPI_COMM_WORLD,
+                                       0,
+                                       1) :
+#endif  // __ENABLE_MPI__
+          AMSCreateExecutor(model_descr,
+                            AMSDType::AMS_DOUBLE,
+                            resource,
+                            (AMSPhysicFn)callBackDouble,
+                            0,
+                            1);
     prob.ams_run(wf, resource, num_iterations, avg_elements);
   }
+#ifdef __ENABLE_MPI__
+  MPI_Finalize();
+#endif  // __ENABLE_MPI__
+
+  return 0;
 }
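Finally, since validator.hpp carries a __STANDALONE_TEST__ path with its own minimal ResourceManager, the collector can also be exercised outside AMSlib. The driver below is an illustrative sketch only: the file name, problem size, and data are made up, and it assumes compilation with an MPI compiler and -D__STANDALONE_TEST__=1 (the include path for validator.hpp depends on the build setup).

// Illustrative standalone driver, e.g.:
//   mpicxx -D__STANDALONE_TEST__=1 -I src/AMSlib standalone_validator.cpp
#include <mpi.h>
#include <vector>
#include <iostream>
#include "wf/validator.hpp"

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  constexpr size_t n = 1000;

  std::vector<double> sur(n), phy(n);
  bool* pred = new bool[n];
  for (size_t i = 0; i < n; ++i) {
    sur[i] = 0.01 * i;
    phy[i] = 0.01 * i + 1.0e-3;  // pretend the physics output differs slightly
    pred[i] = (i % 4 != 0);      // surrogate accepted for 75% of the points
  }

  ams::StepValPoints<double> valstep;
  ams::VPCollector<double> vpcol(42u, MPI_COMM_WORLD);

  // Unset a few accepted predicates so they become validation points.
  vpcol.set_validation(pred, n, /*appDataLoc=*/0, /*k_v=*/2u);
  vpcol.backup_surrogate_outs(sur.data(), valstep);

  // A real driver would evaluate physics wherever vpcol.predicate()[i] is
  // false; here phy[] already holds those values.
  auto stats = vpcol.get_error_stats(phy.data(), valstep);
  if (stats.m_cnt[1] > 0u)
    std::cout << "validation points: " << stats.m_cnt[1]
              << "  avg |phy-sur|: " << stats.m_avg[1]
              << "  var: " << stats.m_var[1] << std::endl;

  delete[] pred;
  MPI_Finalize();
  return 0;
}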