Commit d48920a5 authored by Chris Cantwell

Added an option to specify the number of spares on the command line.

Added a bit more debugging code.
parent 1bc367cb
......@@ -365,6 +365,7 @@ namespace Nektar
po::options_description desc("Allowed options");
desc.add_options()
("verbose,v", "be verbose")
("spares", po::value<int>(), "number of spares")
("version,V", "print version information")
("help,h", "print this help message")
("solverinfo,I", po::value<vector<std::string> >(),
......
......@@ -50,6 +50,9 @@ using namespace std;
#include <boost/serialization/map.hpp>
namespace mpi = boost::mpi;
#include <boost/program_options.hpp>
namespace po = boost::program_options;
#include <LibUtilities/BasicUtils/SharedArray.hpp>
#include <LibUtilities/Communication/CommMpi.h>
......@@ -60,15 +63,26 @@ namespace LibUtilities
{
std::string CommMpi::className = GetCommFactory().RegisterCreatorFunction(
"ParallelMPI", CommMpi::create, "Parallel communication using MPI.");
int CommMpi::nSpares = 1;
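/**
 * Constructs the MPI communicator from the program arguments.
 *
 * The arguments are scanned for a "spares" option (an integer giving the
 * number of spares); options not registered here are tolerated rather than
 * rejected. The parsed value is stored in m_nSpares.
 *
 * NOTE(review): the option is declared without a default value, so reading
 * it with as<int>() is expected to throw when "--spares" is not supplied --
 * confirm and consider adding a default.
 */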
CommMpi::CommMpi(int narg, char *arg[]) : Comm(narg, arg)
{
po::options_description desc("Allowed options");
desc.add_options()("spares", po::value<int>(), "number of spares");
po::parsed_options parsed = po::command_line_parser(narg, arg).
options(desc).
allow_unregistered().
run();
po::variables_map opts;
po::store(parsed, opts);
po::notify(opts);
m_isLogging = false;
m_isRecovering = false;
m_nSpares = opts["spares"].as<int>();
int init = 0;
MPI_Initialized(&init);
......@@ -94,7 +108,7 @@ CommMpi::CommMpi(int narg, char *arg[]) : Comm(narg, arg)
MPI_Comm_set_errhandler(m_agreecomm, MPI_ERRORS_RETURN);
// Decide if we are a spare
int spare = (worldRank > worldSize - nSpares - 1)? MPI_UNDEFINED : 1;
int spare = (worldRank > worldSize - m_nSpares - 1)? MPI_UNDEFINED : 1;
// Create a communicator without the spares
MPI_Comm_split( MPI_COMM_WORLD, spare, worldRank, &m_comm );
......@@ -268,9 +282,18 @@ void CommMpi::v_Send(void *buf, int count, CommDataType dt, int dest)
*/
void CommMpi::v_Recv(void *buf, int count, CommDataType dt, int source)
{
if (m_isRecovering)
int dtsize;
MPI_Type_size(dt, &dtsize);
if (m_isRecovering)
{
cout << "IMPLEMENTATION NEEDED" << endl;
ASSERTL0(!m_data.empty(), "QUEUE IS EMPTY!!");
std::vector<char> x = m_data.front();
m_data.pop();
memcpy(buf, &x[0], count*dtsize);
return;
}
cout << "Recv" << endl;
MPI_Recv(buf, count, dt, source, 0, m_comm, MPI_STATUS_IGNORE);
......@@ -473,12 +496,13 @@ GsHandle CommMpi::v_GsInit(const Nektar::Array<OneD, long> pId,
int count = pId.num_elements();
int dtsize;
MPI_Type_size(dt, &dtsize);
cout << "GsInit: There are " << m_gsHandles.size() << " handles." << endl;
if (m_isRecovering)
{
cout << "GsInit: Recovering handle " << m_gsHandlesRestoreIt - m_gsHandles.begin() << endl;
GsHandle x;
x.comm = shared_from_this();
x.idx = (m_gsHandlesRestoreIt++) - m_gsHandles.begin();
x.idx = (m_gsHandlesRestoreIt++) - m_gsHandles.begin();
return x;
}
else
......@@ -493,6 +517,7 @@ GsHandle CommMpi::v_GsInit(const Nektar::Array<OneD, long> pId,
m_gsInitData.push(x);
cout << "GsInit: Appended " << count*dtsize << " bytes of data, array size: " << count << endl;
}
cout << "GsInit: Creating GS handle." << endl;
Gs::gs_data * handle = Gs::Init(pId, m_comm, verbose);
......@@ -571,6 +596,7 @@ cout << "GsGather: Recover " << count << " values." << endl;
}
else
{
cout << "Performing GS Gather operation." << endl;
Gs::Gather(pU, pOp, m_gsHandles[pGsh], pBuffer);
if (m_isLogging)
......@@ -800,11 +826,14 @@ int CommMpi::v_EnrolSpare()
}
m_comm = newcomm;
// Update rank and size
MPI_Comm_rank(m_comm, &m_rank);
MPI_Comm_size(m_comm, &m_size);
if (m_comm != MPI_COMM_NULL)
{
// Update rank and size
MPI_Comm_rank(m_comm, &m_rank);
MPI_Comm_size(m_comm, &m_size);
RestoreState();
RestoreState();
}
return MPI_SUCCESS;
}
......@@ -979,19 +1008,21 @@ cout << "Colour is " << colour << endl;
int n = m_gsInitData.size();
StorageType vInitData = m_gsInitData;
StorageType vBackupData = m_gsInitDataBackup;
cout << "Restoring " << n << " GS handles" << endl;
// All processes get fresh handles to GS data structure
m_gsHandles.clear();
// Update handles to GS data structure
for (int i = 0; i < n; ++i)
{
std::vector<char> x = vInitData.front();
vInitData.pop();
std::vector<char> y = vBackupData.front();
vBackupData.pop();
int count = x.size() / dtsize;
Array<OneD, long> pId(count);
cout << "Data size was " << x.size() << ", Array size: " << count << endl;
cout << "Backup size was " << y.size() << endl;
if (count > 0)
{
memcpy(&pId[0], &x[0], x.size());
......@@ -999,7 +1030,14 @@ cout << "Data size was " << x.size() << ", Array size: " << count << endl;
Gs::gs_data * handle = Gs::Init(pId, m_comm, false);
m_gsHandles.push_back(handle);
if (m_isRecovering) {
cout << "New handle" << endl;
m_gsHandles.push_back(handle);
}
else {
cout << "Replaced handle" << endl;
m_gsHandles[i] = handle;
}
}
m_gsHandlesRestoreIt = m_gsHandles.begin();
}
......@@ -1011,7 +1049,8 @@ static void CommMpi::HandleMpiError(MPI_Comm* pcomm, int* perr, ...)
// Get type of error and check if it is a proc failed.
int eclass;
MPI_Error_class(err, &eclass);
if (MPIX_ERR_PROC_FAILED != eclass)
if (MPIX_ERR_PROC_FAILED != eclass &&
MPIX_ERR_REVOKED != eclass)
{
std::cout << "An non-proc-failed MPI error occured..." << std::endl;
//MPI_Abort(comm, err);
......@@ -1023,6 +1062,8 @@ static void CommMpi::HandleMpiError(MPI_Comm* pcomm, int* perr, ...)
std::cout << "An MPI error occured: " << errstr << std::endl;
MPIX_Comm_revoke(*pcomm);
throw 1; //UlfmFailureDetected(std::string(errstr));
}
......
......@@ -94,8 +94,6 @@ public:
/// Name of class
static std::string className;
static int nSpares;
CommMpi(int narg, char *arg[]);
virtual ~CommMpi();
......@@ -169,6 +167,7 @@ private:
MPI_Comm m_comm;
MPI_Comm m_agreecomm;
int m_rank;
int m_nSpares;
bool m_isRecovering; ///< True if we are undergoing recovery from failed process
bool m_isLogging; ///< True if logging MPI output
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment