Commit 0a821d56 authored by Chris Cantwell

Added code to support enrolment of spares to replace dead processes.

Generalised Comm::GetComm to remove the dependency on CommMpi in the GsLib header.
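
The intended usage pattern is roughly the sketch below: the ULFM error handler converts a detected process failure into a C++ exception, the solver loop catches it and calls EnrolSpare() to rebuild the communicator, and IsRecovering()/MarkRecoveryComplete() bracket the state restoration. RunStep() and RestoreCheckpoint() are hypothetical placeholders, not functions added by this commit.

    // Hedged sketch only: RunStep() and RestoreCheckpoint() stand in for the
    // real solver step and checkpoint logic; the Comm calls (EnrolSpare,
    // IsRecovering, MarkRecoveryComplete) are the API added in this commit.
    #include <iostream>
    #include <LibUtilities/Communication/Comm.h>

    using Nektar::LibUtilities::CommSharedPtr;

    static void RunStep(unsigned int /*step*/)           { /* hypothetical solver step   */ }
    static void RestoreCheckpoint(unsigned int /*step*/) { /* hypothetical state restore  */ }

    void TimeIntegrate(CommSharedPtr comm, unsigned int nSteps)
    {
        for (unsigned int n = 0; n < nSteps; ++n)
        {
            try
            {
                RunStep(n);
            }
            catch (...)  // the ULFM error handler throws when a peer process dies
            {
                // Shrink the world, absorb a spare and rebuild the communicator
                comm->EnrolSpare();
            }

            if (comm->IsRecovering())
            {
                // Roll back to the last consistent state before continuing
                RestoreCheckpoint(n);
                comm->MarkRecoveryComplete();
            }
        }
    }
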
parent 263ed281
......@@ -42,6 +42,7 @@ namespace LibUtilities
{
Comm::Comm(int narg, char *arg[])
{
m_isRecovering = false;
}
Comm::Comm()
......
......@@ -80,6 +80,8 @@ public:
LIB_UTILITIES_EXPORT inline void Finalise();
LIB_UTILITIES_EXPORT inline void* GetComm();
/// Returns number of processes
LIB_UTILITIES_EXPORT inline int GetSize();
LIB_UTILITIES_EXPORT inline int GetRank();
......@@ -126,15 +128,21 @@ public:
LIB_UTILITIES_EXPORT inline bool TreatAsRankZero(void);
LIB_UTILITIES_EXPORT inline bool RemoveExistingFiles(void);
LIB_UTILITIES_EXPORT inline int EnrolSpare();
LIB_UTILITIES_EXPORT inline bool IsRecovering();
LIB_UTILITIES_EXPORT inline void MarkRecoveryComplete();
protected:
int m_size; ///< Number of processes
std::string m_type; ///< Type of communication
CommSharedPtr m_commRow; ///< Row communicator
CommSharedPtr m_commColumn; ///< Column communicator
bool m_isRecovering; ///< True if we are undergoing recovery from failed process
Comm();
virtual void v_Finalise() = 0;
virtual void* v_GetComm() = 0;
virtual int v_GetRank() = 0;
virtual void v_Block() = 0;
virtual NekDouble v_Wtime() = 0;
......@@ -172,6 +180,8 @@ protected:
virtual void v_SplitComm(int pRows, int pColumns) = 0;
virtual bool v_TreatAsRankZero(void) = 0;
LIB_UTILITIES_EXPORT virtual bool v_RemoveExistingFiles(void);
virtual int v_EnrolSpare() = 0;
};
/**
......@@ -182,6 +192,14 @@ inline void Comm::Finalise()
v_Finalise();
}
/**
*
*/
inline void* Comm::GetComm()
{
return v_GetComm();
}
/**
*
*/
......@@ -431,6 +449,21 @@ inline bool Comm::RemoveExistingFiles(void)
{
return v_RemoveExistingFiles();
}
inline int Comm::EnrolSpare()
{
return v_EnrolSpare();
}
inline bool Comm::IsRecovering()
{
return m_isRecovering;
}
inline void Comm::MarkRecoveryComplete()
{
m_isRecovering = false;
}
}
}
......
......@@ -46,6 +46,7 @@ namespace LibUtilities
{
std::string CommMpi::className = GetCommFactory().RegisterCreatorFunction(
"ParallelMPI", CommMpi::create, "Parallel communication using MPI.");
int CommMpi::nSpares = 1;
/**
*
......@@ -62,9 +63,54 @@ CommMpi::CommMpi(int narg, char *arg[]) : Comm(narg, arg)
ASSERTL0(false, "Failed to initialise MPI");
}
m_comm = MPI_COMM_WORLD;
MPI_Comm_size(m_comm, &m_size);
MPI_Comm_rank(m_comm, &m_rank);
int worldSize;
int worldRank;
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
// Set MPI to call our error handler on failure
MPI_Errhandler errh;
MPI_Comm_create_errhandler(HandleMpiError, &errh);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, errh);
MPI_Comm_dup(MPI_COMM_WORLD, &m_agreecomm);
MPI_Comm_set_errhandler(m_agreecomm, MPI_ERRORS_RETURN);
// Decide if we are a spare
int spare = (worldRank > worldSize - nSpares - 1)? MPI_UNDEFINED : 1;
// Create a communicator without the spares
MPI_Comm_split( MPI_COMM_WORLD, spare, worldRank, &m_comm );
// If we are a spare, sit and wait
if ( MPI_COMM_NULL == m_comm )
{
std::cout << "Im a spare...rank " << worldRank << std::endl;
do
{
// Always ready to complete
int completed = 1;
int x = MPIX_Comm_agree( m_agreecomm, &completed );
std::cout << "Return value from comm agree is " << x << std::endl;
if( completed )
{
std::cout << "Spare process invoking Finalize" << std::endl;
MPI_Finalize();
exit(0);
}
std::cout << "Spare process about to enroll" << std::endl;
EnrolSpare();
std::cout << "Completed enroll" << std::endl;
} while ( MPI_COMM_NULL == m_comm );
MPI_Comm_size(m_comm, &m_size);
MPI_Comm_rank(m_comm, &m_rank);
}
else
{
std::cout << "Active process: rank " << worldRank << std::endl;
MPI_Comm_size(m_comm, &m_size);
MPI_Comm_rank(m_comm, &m_rank);
}
#ifdef NEKTAR_USING_PETSC
PetscInitializeNoArguments();
......@@ -92,13 +138,6 @@ CommMpi::~CommMpi()
{
}
/**
*
*/
MPI_Comm CommMpi::GetComm()
{
return m_comm;
}
/**
*
......@@ -112,10 +151,25 @@ void CommMpi::v_Finalise()
MPI_Finalized(&flag);
if (!flag)
{
if ( MPI_COMM_NULL != m_comm )
{
std::cout << "Non-spare process invoked Finalize" << std::endl;
// Release spares still waiting in the constructor loop; they agree on
// m_agreecomm, so the shutdown handshake must use the same communicator
int completed = 1;
MPIX_Comm_agree(m_agreecomm, &completed);
}
MPI_Comm_free(&m_comm);
MPI_Finalize();
}
}
/**
*
*/
void* CommMpi::v_GetComm()
{
return (void*)(m_comm);
}
/**
*
*/
......@@ -145,7 +199,11 @@ bool CommMpi::v_TreatAsRankZero(void)
*/
void CommMpi::v_Block()
{
MPI_Barrier(m_comm);
int rc = MPI_Barrier(m_comm);
if (rc != MPI_SUCCESS)
{
throw 1;
}
}
/**
......@@ -370,5 +428,127 @@ CommSharedPtr CommMpi::v_CommCreateIf(int flag)
return boost::shared_ptr<Comm>(new CommMpi(newComm));
}
}
int CommMpi::v_EnrolSpare()
{
// Unlock spares so they join us
if (MPI_COMM_NULL != m_comm)
{
int completed = 0;
int x = MPIX_Comm_agree(m_agreecomm, &completed);
std::cout << "Return value from comm agree is " << x << std::endl;
}
MPI_Comm scomm, newcomm;
int rc, flag, ssize, srank, oldsize, oldrank, dsize, drank;
// Keep trying until we succeed without further failures
do
{
// First remove the dead processes by shrinking the world communicator
// (re-shrink on every retry in case further failures have occurred)
MPIX_Comm_shrink(MPI_COMM_WORLD, &scomm);
MPI_Comm_set_errhandler( scomm, MPI_ERRORS_RETURN );
MPI_Comm_size(scomm, &ssize);
MPI_Comm_rank(scomm, &srank);
// I am a surviving rank, work out which ranks failed
if (MPI_COMM_NULL != m_comm)
{
// Get our old rank and size
MPI_Comm_size(m_comm, &oldsize);
MPI_Comm_rank(m_comm, &oldrank);
// First check we have enough spares left to replace all those
// which have failed
if ( oldsize > ssize )
{
MPI_Abort(MPI_COMM_WORLD, MPI_ERR_PROC_FAILED);
}
// Let rank 0 in the shrunk comm determine new assignments
if ( 0 == srank )
{
MPI_Group cgrp, sgrp, dgrp;
// Get the group of dead processes
MPI_Comm_group(m_comm, &cgrp);
MPI_Comm_group(scomm, &sgrp);
MPI_Group_difference(cgrp, sgrp, &dgrp);
MPI_Group_size(dgrp, &dsize);
for(int i = 0; i < ssize - (oldsize - dsize); i++) {
if( i < dsize )
{
MPI_Group_translate_ranks(dgrp, 1, &i, cgrp, &drank);
}
else
{
drank=-1; /* still a spare */
}
// send their new assignment to all spares
MPI_Send(&drank, 1, MPI_INT, i + oldsize - dsize, 1, scomm);
}
MPI_Group_free(&cgrp);
MPI_Group_free(&sgrp);
MPI_Group_free(&dgrp);
}
}
// I am a spare waiting for my assignment
else
{
MPI_Recv(&oldrank, 1, MPI_INT, 0, 1, scomm, MPI_STATUS_IGNORE);
std::cout << "Spare received assignment: " << oldrank << std::endl;
}
// Remove dead process and reassign spare processes to these ranks
rc = MPI_Comm_split(scomm, oldrank < 0 ? MPI_UNDEFINED : 1, oldrank, &newcomm);
// The split may fail on some ranks if further failures occur, so agree
// on the collective outcome before committing to the new communicator
flag = (MPI_SUCCESS == rc);
MPIX_Comm_agree(scomm, &flag);
MPI_Comm_free(&scomm);
if( !flag ) {
if( MPI_SUCCESS == rc && MPI_COMM_NULL != newcomm )
{
MPI_Comm_free(&newcomm);
}
}
} while ( !flag );
// Replace the original comm
if (MPI_COMM_NULL != m_comm)
{
MPI_Comm_free(&m_comm);
}
m_comm = newcomm;
m_isRecovering = true;
return MPI_SUCCESS;
}
void CommMpi::HandleMpiError(MPI_Comm* pcomm, int* perr, ...)
{
MPI_Comm comm = *pcomm;
int err = *perr;
// Get type of error and check if it is a proc failed.
int eclass;
MPI_Error_class(err, &eclass);
if (MPIX_ERR_PROC_FAILED != eclass)
{
std::cout << "An non-proc-failed MPI error occured..." << std::endl;
//MPI_Abort(comm, err);
}
int len;
char errstr[MPI_MAX_ERROR_STRING];
MPI_Error_string(err, errstr, &len);
std::cout << "An MPI error occured: " << errstr << std::endl;
throw 1; //UlfmFailureDetected(std::string(errstr));
}
}
}
......@@ -36,6 +36,7 @@
#define NEKTAR_LIB_UTILITIES_COMMMPI_H
#include <mpi.h>
#include <mpi-ext.h>
#include <stdexcept>
#include <string>
#include <LibUtilities/Communication/Comm.h>
......@@ -57,7 +58,26 @@ class CommMpi;
/// Pointer to a Communicator object.
typedef boost::shared_ptr<CommMpi> CommMpiSharedPtr;
/**
 * Exception class for ULFM-detected failures.
 */
class UlfmFailureDetected : public std::runtime_error
{
public:
UlfmFailureDetected(const std::string& pError)
: std::runtime_error(pError), m_error(pError) {}
virtual const char* what() const throw()
{
return m_error.c_str();
}
private:
std::string m_error;
};
/// A communicator which uses MPI
class CommMpi : public Comm
{
public:
......@@ -70,13 +90,14 @@ public:
/// Name of class
static std::string className;
static int nSpares;
CommMpi(int narg, char *arg[]);
virtual ~CommMpi();
MPI_Comm GetComm();
protected:
virtual void v_Finalise();
virtual void* v_GetComm();
virtual int v_GetRank();
virtual void v_Block();
virtual double v_Wtime();
......@@ -112,10 +133,15 @@ protected:
virtual void v_SplitComm(int pRows, int pColumns);
virtual CommSharedPtr v_CommCreateIf(int flag);
virtual int v_EnrolSpare();
private:
MPI_Comm m_comm;
MPI_Comm m_agreecomm;
int m_rank;
static void HandleMpiError(MPI_Comm* pcomm, int* perr, ...);
CommMpi(MPI_Comm pComm);
};
}
......
......@@ -69,6 +69,15 @@ void CommSerial::v_Finalise()
#endif
}
/**
*
*/
void* CommSerial::v_GetComm()
{
ASSERTL0(false, "Error: Do not call GetComm when in serial. Check with NEKTAR_USE_MPI def.");
return 0;
}
/**
*
*/
......
......@@ -69,6 +69,7 @@ public:
protected:
LIB_UTILITIES_EXPORT virtual void v_Finalise();
LIB_UTILITIES_EXPORT virtual void* v_GetComm();
LIB_UTILITIES_EXPORT virtual int v_GetRank();
LIB_UTILITIES_EXPORT virtual bool v_TreatAsRankZero(void);
......@@ -113,6 +114,8 @@ protected:
LIB_UTILITIES_EXPORT virtual void v_SplitComm(int pRows, int pColumns);
LIB_UTILITIES_EXPORT virtual CommSharedPtr v_CommCreateIf(int flag);
LIB_UTILITIES_EXPORT virtual int v_EnrolSpare() {return 0;}
};
}
}
......
......@@ -41,7 +41,7 @@
#include <LibUtilities/BasicConst/NektarUnivTypeDefs.hpp>
#include <LibUtilities/BasicUtils/SharedArray.hpp>
#ifdef NEKTAR_USE_MPI
#include <LibUtilities/Communication/CommMpi.h>
#include <LibUtilities/Communication/Comm.h>
#endif
namespace Gs
......@@ -172,13 +172,13 @@ static inline gs_data *Init(const Nektar::Array<OneD, long> pId,
{
return 0;
}
LibUtilities::CommMpiSharedPtr vCommMpi =
boost::dynamic_pointer_cast<LibUtilities::CommMpi>(pComm);
ASSERTL1(vCommMpi, "Failed to cast MPI Comm object.");
//LibUtilities::CommMpiSharedPtr vCommMpi =
// boost::dynamic_pointer_cast<LibUtilities::CommMpi>(pComm);
//ASSERTL1(vCommMpi, "Failed to cast MPI Comm object.");
comm vComm;
MPI_Comm_dup(vCommMpi->GetComm(), &vComm.c);
vComm.id = vCommMpi->GetRank();
vComm.np = vCommMpi->GetSize();
MPI_Comm_dup((MPI_Comm)(pComm->GetComm()), &vComm.c);
vComm.id = pComm->GetRank();
vComm.np = pComm->GetSize();
gs_data *result = nektar_gs_setup(pId.get(), pId.num_elements(), &vComm, 0,
gs_auto, (int)verbose);
MPI_Comm_free(&vComm.c);
......@@ -205,13 +205,13 @@ static inline void Unique(const Nektar::Array<OneD, long> pId,
{
return;
}
LibUtilities::CommMpiSharedPtr vCommMpi =
boost::dynamic_pointer_cast<LibUtilities::CommMpi>(pComm);
ASSERTL1(vCommMpi, "Failed to cast MPI Comm object.");
// LibUtilities::CommMpiSharedPtr vCommMpi =
// boost::dynamic_pointer_cast<LibUtilities::CommMpi>(pComm);
// ASSERTL1(vCommMpi, "Failed to cast MPI Comm object.");
comm vComm;
vComm.c = vCommMpi->GetComm();
vComm.id = vCommMpi->GetRank();
vComm.np = vCommMpi->GetSize();
vComm.c = (MPI_Comm)(pComm->GetComm());
vComm.id = pComm->GetRank();
vComm.np = pComm->GetSize();
nektar_gs_unique(pId.get(), pId.num_elements(), &vComm);
#endif
}
......
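
A sketch of the pattern the GsLib changes rely on (the function name InitialiseExternalLibrary is hypothetical): callers holding a generic CommSharedPtr cast the void* returned by GetComm() back to MPI_Comm, so CommMpi.h no longer needs to be included.

    // Sketch under assumptions: InitialiseExternalLibrary is a hypothetical
    // consumer of the raw communicator, obtained via the generic Comm API.
    #include <mpi.h>
    #include <LibUtilities/Communication/Comm.h>

    void InitialiseExternalLibrary(Nektar::LibUtilities::CommSharedPtr pComm)
    {
        // GetComm() now returns the underlying MPI_Comm as a void*
        MPI_Comm c = (MPI_Comm)(pComm->GetComm());

        int rank, size;
        MPI_Comm_rank(c, &rank);
        MPI_Comm_size(c, &size);

        // ... hand c, rank and size to the third-party library ...
    }
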
......@@ -33,6 +33,8 @@
//
///////////////////////////////////////////////////////////////////////////////
#include <signal.h>
#include <LibUtilities/BasicUtils/SessionReader.h>
#include <LibUtilities/BasicUtils/FieldIO.h>
#include <SpatialDomains/MeshGraph.h>
......@@ -87,16 +89,39 @@ int main(int argc, char *argv[])
// Zero field coefficients for initial guess for linear solver.
Vmath::Zero(field->GetNcoeffs(), field->UpdateCoeffs(), 1);
//BackupStaticState();
// Time integrate using backward Euler
for (unsigned int n = 0; n < nSteps; ++n)
{
try {
//BackupDynamicState();
// Fault injection for testing: kill rank 1 at step 5 to exercise recovery
if (session->GetComm()->GetRank() == 1 && n == 5) {
raise( SIGKILL );
}
Vmath::Smul(nq, -1.0/delta_t/epsilon, field->GetPhys(), 1,
field->UpdatePhys(), 1);
field->HelmSolve(field->GetPhys(), field->UpdateCoeffs(),
NullFlagList, factors);
//field->HelmSolve(field->GetPhys(), field->UpdateCoeffs(),
// NullFlagList, factors);
session->GetComm()->Block();
field->BwdTrans(field->GetCoeffs(), field->UpdatePhys());
} catch (...) {
try {
cout << "Caught an error - trying to invoke a spare." << endl;
int x = session->GetComm()->EnrolSpare();
cout << "Enroled spare, result: " << x << endl;
} catch (...) {
cout << "ERROR WHEN PERFORMING ENROLSPARE!!!" << endl;
}
//RestoreStaticState();
//RestoreDynamicState();
}
}
// Write solution to file
......