Commit 33eaba6e authored by Chris Cantwell

Recovery of sub-communicators (in comms with more than one rank) works.

Temporarily disabled SplitComm for strips.
parent e7987788
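The changes below centre on a transaction-log workflow: while logging is active, derived communicators are recorded so that they can be replayed on ranks recovering from a failure. A minimal usage sketch, assuming a hypothetical solver set-up phase (vComm, vSub, nProcRows, nProcCols, colour and the call ordering are placeholders, not taken from this commit):

    CommSharedPtr vComm = ...;                        // top-level CommMpi
    vComm->BeginTransactionLog();                     // start recording derived comms
    vComm->SplitComm(nProcRows, nProcCols);           // row/column comms are logged
    CommSharedPtr vSub = vComm->CommCreateIf(colour); // derived comm is logged too
    vComm->EndTransactionLog();                       // clears recovery flags, backs up state
    // On a detected failure, EnrolSpare() shrinks the communicator, promotes
    // spares to the failed ranks and rebuilds the logged derived comms.
    vComm->EnrolSpare();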
@@ -1910,11 +1910,11 @@ namespace Nektar
int nProcSem = m_comm->GetSize() / nProcSm;
m_comm->SplitComm(nProcSm,nProcSem);
m_comm->GetColumnComm()->SplitComm(nProcZ/nStripZ,nStripZ);
m_comm->GetColumnComm()->GetColumnComm()->SplitComm(
(nProcY*nProcX),nProcZ/nStripZ);
m_comm->GetColumnComm()->GetColumnComm()->GetColumnComm()
->SplitComm(nProcX,nProcY);
// m_comm->GetColumnComm()->SplitComm(nProcZ/nStripZ,nStripZ);
// m_comm->GetColumnComm()->GetColumnComm()->SplitComm(
// (nProcY*nProcX),nProcZ/nStripZ);
// m_comm->GetColumnComm()->GetColumnComm()->GetColumnComm()
// ->SplitComm(nProcX,nProcY);
}
}
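Each SplitComm(pRows, pColumns) call leaves GetRowComm() with pColumns ranks and GetColumnComm() with pRows ranks (the sizes checked by the ASSERTL1s in the recovering branch of v_SplitComm further down), so the chain above peels the global communicator apart one level at a time. A sketch of the first level only, assuming the divisions are exact (the deeper levels follow the same pattern with sizes depending on the session):

    m_comm                   (nProcSm * nProcSem ranks)
     |- GetRowComm()         (nProcSem ranks)
     '- GetColumnComm()      (nProcSm ranks; split again by the nested calls above)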
......
@@ -42,7 +42,6 @@ namespace LibUtilities
{
Comm::Comm(int narg, char *arg[])
{
m_isRecovering = false;
}
Comm::Comm()
......
@@ -119,7 +119,7 @@ public:
template <class T> T Gather(const int rootProc, T &val);
template <class T> T Scatter(const int rootProc, T &pData);
LIB_UTILITIES_EXPORT inline CommSharedPtr CommCreateIf(int flag);
LIB_UTILITIES_EXPORT inline CommSharedPtr CommCreateIf(int colour);
LIB_UTILITIES_EXPORT inline void SplitComm(int pRows, int pColumns);
LIB_UTILITIES_EXPORT inline CommSharedPtr GetRowComm();
@@ -129,39 +129,14 @@ public:
LIB_UTILITIES_EXPORT inline bool RemoveExistingFiles(void);
LIB_UTILITIES_EXPORT inline int EnrolSpare();
LIB_UTILITIES_EXPORT inline bool IsRecovering();
LIB_UTILITIES_EXPORT inline void MarkRecoveryComplete();
LIB_UTILITIES_EXPORT inline void BeginTransactionLog()
{
m_isLogging = true;
}
LIB_UTILITIES_EXPORT inline void EndTransactionLog()
{
m_isLogging = false;
if (m_isRecovering)
{
m_isRecovering = false;
for (int i = 0; i < m_derivedComm.size(); ++i)
{
m_derivedComm[i]->m_isRecovering = false;
}
}
v_BackupState();
}
LIB_UTILITIES_EXPORT inline void BeginTransactionLog();
LIB_UTILITIES_EXPORT inline void EndTransactionLog();
protected:
typedef std::vector<CommSharedPtr> DerivedCommType;
typedef std::vector<int> DerivedCommFlagType;
int m_size; ///< Number of processes
std::string m_type; ///< Type of communication
CommSharedPtr m_commRow; ///< Row communicator
CommSharedPtr m_commColumn; ///< Column communicator
bool m_isRecovering; ///< True if we are undergoing recovery from failed process
bool m_isLogging; ///< True if logging MPI output
DerivedCommType m_derivedComm;
DerivedCommFlagType m_derivedCommFlag;
int m_derivedRecoverIndex;
Comm();
@@ -200,17 +175,15 @@ protected:
void *recvbuf, int recvcount, CommDataType recvtype,
int root) = 0;
virtual CommSharedPtr v_CommCreateIf(int flag) = 0;
virtual CommSharedPtr v_CommCreateIf(int colour) = 0;
virtual void v_SplitComm(int pRows, int pColumns) = 0;
virtual bool v_TreatAsRankZero(void) = 0;
LIB_UTILITIES_EXPORT virtual bool v_RemoveExistingFiles(void);
virtual int v_EnrolSpare() = 0;
virtual bool v_IsRecovering() {return m_isRecovering;}
virtual void v_BackupState() = 0;
virtual void v_BeginTransactionLog() {}
virtual void v_EndTransactionLog() {}
public:
virtual void v_ReplaceComm(void* commptr) {}
};
/**
@@ -423,14 +396,9 @@ template <class T> T Comm::Scatter(const int rootProc, T &pData)
/**
* @brief If the flag is non-zero create a new communicator.
*/
inline CommSharedPtr Comm::CommCreateIf(int flag)
inline CommSharedPtr Comm::CommCreateIf(int colour)
{
CommSharedPtr c = v_CommCreateIf(flag);
if (m_isRecovering)
{
c->m_isRecovering = true;
}
return c;
return v_CommCreateIf(colour);
}
/**
@@ -489,15 +457,16 @@ inline int Comm::EnrolSpare()
return v_EnrolSpare();
}
inline bool Comm::IsRecovering()
inline void Comm::BeginTransactionLog()
{
return m_isRecovering;
v_BeginTransactionLog();
}
inline void Comm::MarkRecoveryComplete()
inline void Comm::EndTransactionLog()
{
m_isRecovering = false;
v_EndTransactionLog();
}
}
}
......
@@ -42,6 +42,7 @@ using namespace std;
#include <boost/mpi/environment.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/nonblocking.hpp>
#include <boost/serialization/string.hpp>
#include <boost/serialization/queue.hpp>
#include <boost/serialization/deque.hpp>
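// Illustrative note: the serialization headers above are what allow the
// Boost.MPI calls later in this file to ship the queue-based state containers
// (e.g. m_data, a std::queue<std::vector<char>>) directly, as in
//
//     boost::mpi::communicator c(m_comm, boost::mpi::comm_attach);
//     c.isend((m_rank + 1) % m_size, 0, m_data);  // serialized via queue.hpp/deque.hpp
//
// without hand-packing the buffers into raw byte arrays.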
@@ -65,6 +66,7 @@ int CommMpi::nSpares = 1;
*/
CommMpi::CommMpi(int narg, char *arg[]) : Comm(narg, arg)
{
m_isLogging = false;
m_isRecovering = false;
int init = 0;
@@ -143,6 +145,8 @@ CommMpi::CommMpi(MPI_Comm pComm) : Comm()
MPI_Comm_rank(m_comm, &m_rank);
m_type = "Parallel MPI";
m_isRecovering = false;
m_isLogging = false;
}
/**
@@ -214,7 +218,7 @@ bool CommMpi::v_TreatAsRankZero(void)
*/
void CommMpi::v_Block()
{
if (IsRecovering())
if (m_isRecovering)
{
return;
}
@@ -239,7 +243,7 @@ double CommMpi::v_Wtime()
*/
void CommMpi::v_Send(void *buf, int count, CommDataType dt, int dest)
{
if (IsRecovering())
if (m_isRecovering)
{
return;
}
@@ -259,7 +263,7 @@ void CommMpi::v_Send(void *buf, int count, CommDataType dt, int dest)
*/
void CommMpi::v_Recv(void *buf, int count, CommDataType dt, int source)
{
if (IsRecovering())
if (m_isRecovering)
{
cout << "IMPLEMENTATION NEEDED" << endl;
}
@@ -464,9 +468,18 @@ void CommMpi::v_Scatter(void *sendbuf, int sendcount, CommDataType sendtype,
*/
void CommMpi::v_SplitComm(int pRows, int pColumns)
{
cout << "START SPLITCOMM" << endl;
if (m_isRecovering)
{
cout << "Assuming split comm already sorted by enrolspare." << endl;
cout << "Recovering row and column comm" << endl;
m_commRow = m_derivedComm.front();
m_derivedComm.pop_front();
m_commColumn = m_derivedComm.front();
m_derivedComm.pop_front();
ASSERTL1(m_commRow->GetSize() == pColumns, "Row size does not match.");
ASSERTL1(m_commColumn->GetSize() == pRows, "Column size does not match.");
cout << "END SPLITCOMM (Recovering)" << endl;
return;
}
@@ -483,13 +496,27 @@ void CommMpi::v_SplitComm(int pRows, int pColumns)
// the same communicator. The rank within this communicator is the
// column index.
MPI_Comm_split(m_comm, myRow, myCol, &newComm);
m_commRow = boost::shared_ptr<Comm>(new CommMpi(newComm));
CommMpiSharedPtr commRowMpi = boost::shared_ptr<CommMpi>(new CommMpi(newComm));
m_commRow = commRowMpi;
cout << "Original colour is: " << myRow << endl;
// Split Comm into columns - all processes with same myCol are put
// in the same communicator. The rank within this communicator is
// the row index.
MPI_Comm_split(m_comm, myCol, myRow, &newComm);
m_commColumn = boost::shared_ptr<Comm>(new CommMpi(newComm));
CommMpiSharedPtr commColumnMpi = boost::shared_ptr<CommMpi>(new CommMpi(newComm));
m_commColumn = commColumnMpi;
cout << "Original colour is: " << myCol << endl;
if (m_isLogging)
{
cout << "Logging split comm" << endl;
m_derivedCommFlag.push(myRow);
m_derivedComm.push_back(commRowMpi);
m_commRow->BeginTransactionLog();
m_derivedCommFlag.push(myCol);
m_derivedComm.push_back(commColumnMpi);
m_commColumn->BeginTransactionLog();
}
cout << "END SPLITCOMM" << endl;
}
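// Worked example (illustrative, not part of the commit): with m_size == 6 and
// SplitComm(pRows = 2, pColumns = 3), the mapping above gives
//
//     rank :  0  1  2  3  4  5
//     myCol:  0  1  2  0  1  2     (rank % pColumns)
//     myRow:  0  0  0  1  1  1     ((rank - myCol) / pColumns)
//
// so m_commRow groups {0,1,2} and {3,4,5} (size pColumns == 3), while
// m_commColumn groups {0,3}, {1,4} and {2,5} (size pRows == 2); these are the
// sizes the ASSERTL1 checks in the recovering branch verify.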
/**
@@ -497,10 +524,12 @@ void CommMpi::v_SplitComm(int pRows, int pColumns)
*/
CommSharedPtr CommMpi::v_CommCreateIf(int flag)
{
CommSharedPtr c;
cout << "START CREATECOMMIF" << endl;
CommMpiSharedPtr c;
if (m_isRecovering)
{
c = m_derivedComm[m_derivedRecoverIndex++];
c = m_derivedComm.front();
m_derivedComm.pop_front();
}
else
{
@@ -510,20 +539,28 @@ CommSharedPtr CommMpi::v_CommCreateIf(int flag)
// implies this is faster than ordering them ourselves.
MPI_Comm_split(m_comm, flag ? 0 : MPI_UNDEFINED, 0, &newComm);
if (flag == 0)
if (newComm == MPI_COMM_NULL)
{
// flag == 0 => get back MPI_COMM_NULL, return a null ptr instead.
c = boost::shared_ptr<Comm>();
c = boost::shared_ptr<CommMpi>();
}
else
{
// Return a real communicator
c = boost::shared_ptr<Comm>(new CommMpi(newComm));
c = boost::shared_ptr<CommMpi>(new CommMpi(newComm));
}
m_derivedComm.push_back(c);
m_derivedCommFlag.push_back(flag);
if (m_isLogging)
{
if (c.get())
{
c->BeginTransactionLog();
}
m_derivedCommFlag.push(flag ? 0 : MPI_UNDEFINED);
m_derivedComm.push_back(c);
}
}
cout << "END CREATECOMMIF" << endl;
return c;
}
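// Illustrative note: because recovery replays m_derivedComm front-to-back,
// a recovering rank must issue its SplitComm()/CommCreateIf() calls in the
// same order as the original run, e.g. (placeholder arguments):
//
//     comm->BeginTransactionLog();                     // m_isRecovering is set on restored ranks
//     comm->SplitComm(pRows, pColumns);                // pops logged row comm, then column comm
//     CommSharedPtr sub = comm->CommCreateIf(colour);  // pops the next logged derived comm
//     comm->EndTransactionLog();                       // clears m_isRecovering everywhere
//
// Reordering the calls would hand back the wrong restored communicator.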
@@ -549,11 +586,6 @@ int CommMpi::v_EnrolSpare()
MPI_Comm_size(scomm, &ssize);
MPI_Comm_rank(scomm, &srank);
// Get number of rows and columns if split comm
int pRows = (m_commColumn.get() ? m_commColumn->GetSize() : 0);
int pColumns = (m_commRow.get() ? m_commRow->GetSize() : 0);
int pDerived = m_derivedComm.size();
// Keep trying until we succeed without further failures
do
{
@@ -571,59 +603,60 @@ int CommMpi::v_EnrolSpare()
MPI_Abort(MPI_COMM_WORLD, MPI_ERR_PROC_FAILED);
}
MPI_Group cgrp, sgrp, dgrp;
// Get the group of dead processes
MPI_Comm_group(m_comm, &cgrp);
MPI_Comm_group(scomm, &sgrp);
MPI_Group_difference(cgrp, sgrp, &dgrp);
MPI_Group_size(dgrp, &dsize);
// Let rank 0 in the shrunk comm determine new assignments
if ( 0 == srank )
{
MPI_Group cgrp, sgrp, dgrp;
// Get the group of dead processes
MPI_Comm_group(m_comm, &cgrp);
MPI_Comm_group(scomm, &sgrp);
MPI_Group_difference(cgrp, sgrp, &dgrp);
MPI_Group_size(dgrp, &dsize);
// Give every spare rank a new assignment
// [ C C C C C C C C C C ]
// [ C C C D C D C S S S ]
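// (C = a surviving compute rank, D = a failed rank, S = a spare. Each spare
// at the tail of the shrunk communicator is handed the original rank of one
// failed process; any spares left over keep drank = -1 and remain spare.)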
for(int i = 0; i < ssize - (oldsize - dsize); i++) {
// Assign spares to cover the dead ranks
// For each rank in the dead-group, we find the corresponding
// rank in the full communicator
if( i < dsize )
{
MPI_Group_translate_ranks(dgrp, 1, &i, cgrp, &drank);
}
// Any additional spares will remain spare and will not be part
// of the new communicator created below.
else
{
drank=-1; /* still a spare */
}
// send their new assignment to all spares
MPI_Send(&drank, 1, MPI_INT, i + oldsize - dsize, 1, scomm);
MPI_Send(&pRows, 1, MPI_INT, i + oldsize - dsize, 2, scomm);
MPI_Send(&pColumns, 1, MPI_INT, i + oldsize - dsize, 3, scomm);
MPI_Send(&pDerived, 1, MPI_INT, i + oldsize - dsize, 4, scomm);
MPI_Send(&m_derivedCommFlag[0], pDerived,
MPI_INT, i + oldsize - dsize, 5, scomm);
}
MPI_Group_free(&cgrp);
MPI_Group_free(&sgrp);
MPI_Group_free(&dgrp);
}
MPI_Group_free(&cgrp);
MPI_Group_free(&sgrp);
MPI_Group_free(&dgrp);
}
// I am a spare waiting for my assignment
else
{
MPI_Recv(&oldrank, 1, MPI_INT, 0, 1, scomm, MPI_STATUS_IGNORE);
MPI_Recv(&pRows, 1, MPI_INT, 0, 2, scomm, MPI_STATUS_IGNORE);
MPI_Recv(&pColumns, 1, MPI_INT, 0, 3, scomm, MPI_STATUS_IGNORE);
MPI_Recv(&pDerived, 1, MPI_INT, 0, 4, scomm, MPI_STATUS_IGNORE);
m_derivedCommFlag.resize(pDerived);
MPI_Recv(&m_derivedCommFlag[0], pDerived, MPI_INT, 0, 5, scomm, MPI_STATUS_IGNORE);
m_derivedRecoverIndex = 0;
std::cout << "Spare received assignment: " << oldrank << std::endl;
std::cout << "Split: " << pRows << " x " << pColumns << endl;
std::cout << "Num of derived comms: " << pDerived << endl;
m_isRecovering = true;
}
// Remove dead process and reassign spare processes to these ranks
// oldrank contains the original rank for those processes which are
// still alive, and contains the new assignment for spares which are
// needed to replace dead processes.
//
// Spares which stay spare are assigned MPI_UNDEFINED and therefore
// are not in the new communicator.
rc = MPI_Comm_split(scomm, oldrank < 0 ? MPI_UNDEFINED : 1, oldrank, &newcomm);
flag = MPIX_Comm_agree(scomm, &flag);
@@ -648,113 +681,162 @@ int CommMpi::v_EnrolSpare()
MPI_Comm_rank(m_comm, &m_rank);
MPI_Comm_size(m_comm, &m_size);
// Fix split comm
// --------------
// First, free old row and column communicators
if (m_commColumn.get())
RestoreState();
return MPI_SUCCESS;
}
void CommMpi::BackupState()
{
mpi::communicator c(m_comm, mpi::comm_attach);
mpi::request reqs[4];
int rank = c.rank();
int size = c.size();
if (size > 1)
{
m_commColumn.reset();
int recv_rank = (rank + size - 1) % size;
int send_rank = (rank + 1) % size;
cout << "Backup: Sending " << m_data.size() << " items in queue." << endl;
reqs[0] = c.isend(send_rank, 0, m_data);
reqs[1] = c.isend(send_rank, 1, m_derivedCommFlag);
reqs[2] = c.irecv(recv_rank, 0, m_dataBackup);
reqs[3] = c.irecv(recv_rank, 1, m_derivedCommFlagBackup);
cout << "Backup: Sent to " << send_rank << endl;
cout << "Backup: Waiting for data from " << recv_rank << endl;
mpi::wait_all(reqs, reqs + 4);
cout << "Backup: Received " << m_dataBackup.size() << " items." << endl;
cout << "Backup: Received " << m_derivedCommFlagBackup.size() << " derived comm flags." << endl;
}
if (m_commRow.get())
else
{
m_commRow.reset();
cout << "Backup: Not backing up as comm of size 1" << endl;
}
}
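// Ring illustration (not part of the commit): with size == 4, rank r sends
// its state to (r + 1) % 4 and stores the backup received from (r + 3) % 4,
// so each rank's data survives a single failure on its ring successor:
//
//     rank            : 0  1  2  3
//     sends backup to : 1  2  3  0
//     holds backup of : 3  0  1  2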
if (pRows > 0 && pColumns > 0)
void CommMpi::RestoreState()
{
cout << "Start Restore" << endl;
if (m_size > 1)
{
ASSERTL0(pRows * pColumns == m_size,
"Rows/Columns do not match comm size.");
MPI_Comm tmpComm;
// Compute row and column in grid.
int myCol = m_rank % pColumns;
int myRow = (m_rank - myCol) / pColumns;
// Split Comm into rows - all processes with same myRow are put in
// the same communicator. The rank within this communicator is the
// column index.
MPI_Comm_split(m_comm, myRow, myCol, &tmpComm);
m_commRow = boost::shared_ptr<Comm>(new CommMpi(tmpComm));
// Split Comm into columns - all processes with same myCol are put
// in the same communicator. The rank within this communicator is
// the row index.
MPI_Comm_split(m_comm, myCol, myRow, &tmpComm);
m_commColumn = boost::shared_ptr<Comm>(new CommMpi(tmpComm));
cout << " -> More than 1 process" << endl;
mpi::communicator c(m_comm, mpi::comm_attach);
int send_rank = (m_rank + m_size - 1) % m_size;
int recv_rank = (m_rank + 1) % m_size;
// First work out who needs to send recovery data
cout << " -> Determining recovery participants" << endl;
int restore = m_isRecovering ? 1 : 0;
int sendRecoveryData = 0;
mpi::request reqs[2];
// Retrieve recovery state of my buddy process
reqs[0] = c.irecv(send_rank, 0, sendRecoveryData);
// Send my recovery state
reqs[1] = c.isend(recv_rank, 0, restore);
mpi::wait_all(reqs, reqs + 2);
cout << " -> I am rank: " << m_rank << endl;
cout << " -> Send recovery data: " << sendRecoveryData << endl;
cout << " -> Recovering? " << m_isRecovering << endl;
// Perform restore of data and derived comm flags
// Surviving processes that neighbour a recovering process must send:
// a) the backup of the recovering process's data, for its recovery
// b) a replacement copy of this process's own backup data
// c) the queues of flags needed to rebuild derived communicators on the
//    recovering process
const int nReq = 4;
if (sendRecoveryData)
{
cout << "Restore: Sending " << m_dataBackup.size() << " backup items." << endl;
cout << "Restore: Sending my backup copy to " << send_rank << endl;
mpi::request reqs[nReq];
reqs[0] = c.isend(send_rank, 0, m_dataBackup);
reqs[1] = c.isend(send_rank, 1, m_data);
reqs[2] = c.isend(send_rank, 2, m_derivedCommFlagBackup);
reqs[3] = c.isend(send_rank, 3, m_derivedCommFlag);
cout << "Restore: Waiting for data to be sent to " << send_rank << endl;
mpi::wait_all(reqs, reqs + nReq);
cout << "Restore: Complete" << endl;
cout << "Restore: Sent " << m_data.size() << " items in queue." << endl;
}
// The recovering processes receive this data
// They do not need to send anything
else if (m_isRecovering)
{
mpi::request reqs[nReq];
cout << "Restore: Receiving from " << recv_rank << endl;
reqs[0] = c.irecv(recv_rank, 0, m_data);
reqs[1] = c.irecv(recv_rank, 1, m_dataBackup);
reqs[2] = c.irecv(recv_rank, 2, m_derivedCommFlag);
reqs[3] = c.irecv(recv_rank, 3, m_derivedCommFlagBackup);
cout << "Restore: Waiting for data from " << recv_rank << endl;
mpi::wait_all(reqs, reqs + nReq);
cout << "Restore: Complete" << endl;
cout << "Restore: Received " << m_data.size() << " items in queue." << endl;
cout << "Restore: There are " << m_derivedCommFlag.size() << " derived comms" << endl;
}
// Any processes not recovering, and not a neighbour, do not need to
// participate in recovery.
}
// Fix derived comms
// -----------------
for (int i = 0; i < pDerived; ++i)
// All processes must participate in this. This includes recovering the
// m_commRow and m_commColumn communicators too, as these are also just
// derived communicators.
DerivedCommFlagType vDerivedFlagRestore = m_derivedCommFlag;
auto derivedCommIt = m_derivedComm.begin();
cout << "About to fix derived comms." << endl;
cout << " -> there are currently " << m_derivedComm.size() << " comms." << endl;
for (int i = 0; i < m_derivedCommFlag.size(); ++i)
{
MPI_Comm tmpComm;
int flag = m_derivedCommFlag[i];
MPI_Comm_split(m_comm, flag ? 0 : MPI_UNDEFINED, 0, &tmpComm);
int colour = vDerivedFlagRestore.front();
vDerivedFlagRestore.pop();
cout << "Colour is " << colour << endl;
MPI_Comm_split(m_comm, colour, 0, &tmpComm);
if (m_isRecovering) // Recovering spare node
{
// Create a derived communicator from scratch
CommSharedPtr c;
if (flag == 0)
CommMpiSharedPtr cmpi;
if (tmpComm == MPI_COMM_NULL)
{
// A colour of MPI_UNDEFINED gives back MPI_COMM_NULL; store a null ptr instead.
c = boost::shared_ptr<Comm>();
cmpi = boost::shared_ptr<CommMpi>();
cout << "Restored a null comm" << endl;
}
else
{
// Return a real communicator
c = boost::shared_ptr<Comm>(new CommMpi(tmpComm));
cmpi = boost::shared_ptr<CommMpi>(new CommMpi(tmpComm));
cmpi->m_isRecovering = true;
cout << "Restored a comm" << endl;
}
m_derivedComm.push_back(c);
m_derivedComm.push_back(cmpi);
cout << "New comm is of size: " << cmpi->GetSize() << endl;
cout << "Restoring its state..." << endl << endl;
cout << "### Restore sub-communicator ###" << endl;
cmpi->RestoreState();
cout << "### Finished restore of sub-communicator ###" << endl << endl;
}
else // surviving node
{
m_derivedComm[i]->v_ReplaceComm((void*)(tmpComm));
CommMpiSharedPtr cmpi = (*derivedCommIt);
cout << "Previously comm is of size: " << cmpi->GetSize() << endl;
cmpi->ReplaceComm(tmpComm);
cout << "Replaced comm is of size: " <<
cmpi->GetSize() << endl;
int size;
MPI_Comm_size(cmpi->m_comm, &size);
cout << "Underlying comm is of size: " << size << endl;
cout << "Restoring its state..." << endl << endl;
cout << "### Restore sub-communicator ###" << endl;
cmpi->RestoreState();
cout << "### Finished restore of sub-communicator ###" << endl << endl;
derivedCommIt++;
}
}
// Perform restore
mpi::communicator c(m_comm, mpi::comm_attach);
int rank = c.rank();
int size = c.size();
int recv_rank = (rank + 1) % size;
int send_rank = (rank + size - 1) % size;
mpi::request reqs[2];
cout << "Restore: Receiving from " << recv_rank << endl;
reqs[0] = c.irecv(recv_rank, 0, m_data);
cout << "Restore: Sending " << m_dataBackup.size() << " items." << endl;
cout << "Restore: Sending my backup copy to " << send_rank << endl;
reqs[1] = c.isend(send_rank, 0, m_dataBackup);
cout << "Restore: Waiting for data from " << recv_rank << endl;
reqs[0].wait();
cout << "Restore: Complete" << endl;
cout << "Restore: Received " << m_data.size() << " items in queue." << endl;
return MPI_SUCCESS;
}
void CommMpi::v_BackupState()
{
mpi::communicator c(m_comm, mpi::comm_attach);
mpi::request reqs[2];
int rank = c.rank();
int size = c.size();
int recv_rank = (rank + size - 1) % size;
int send_rank = (rank + 1) % size;
cout << "Backup: Sending " << m_data.size() << " items in queue." << endl;
reqs[0] = c.isend(send_rank, 0, m_data);
cout << "Backup: Sent to " << send_rank << endl;
reqs[1] = c.irecv(recv_rank, 0, m_dataBackup);
cout << "Backup: Waiting for data from " << recv_rank << endl;
reqs[1].wait();
cout << "Backup: Received " << m_dataBackup.size() << " items." << endl;
// cout << "Backup: Sending " << m_data.size() << " items in queue." << endl;
// c.send((rank + 1) % size, 0, m_data);
// cout << "Backup: Sent to " << (rank + 1) % size << endl;
// cout << "Backup: Waiting for data from " << (rank + 1) % size << endl;
// c.recv((rank - 1) % size, 0, m_dataBackup);
// cout << "Backup: Received " << m_dataBackup.size() << " items." << endl;
}
}
static void CommMpi::HandleMpiError(MPI_Comm* pcomm, int* perr, ...)
@@ -779,9 +861,38 @@ static void CommMpi::HandleMpiError(MPI_Comm* pcomm, int* perr, ...)
throw 1; //UlfmFailureDetected(std::string(errstr));
}
void CommMpi::v_ReplaceComm(void* commptr)
void CommMpi::v_BeginTransactionLog()
{
m_isLogging = true;
if (m_isRecovering) {
for (auto x : m_derivedComm)
{
x->m_isRecovering = true;
}
}
}
void CommMpi::v_EndTransactionLog()
{
m_comm = (MPI_Comm)(commptr);
m_isLogging = false;
m_isRecovering = false;
for (auto x : m_derivedComm)
{
std::cout << "Ending transaction log on derived comm." << std::endl;
x->EndTransactionLog();
}
BackupState();
}
void CommMpi::ReplaceComm(MPI_Comm commptr)
{
m_comm = commptr;
MPI_Comm_size(m_comm, &m_size);
MPI_Comm_rank(m_comm, &m_rank);
m_type = "Parallel MPI";
}
}
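Taken together, the error handler, EnrolSpare() and the transaction log suggest a driver loop of roughly the following shape. This is an illustrative sketch only: DoOneTimeStep, step, nSteps and vComm are hypothetical, and the catch is generic because this commit still throws a plain int from HandleMpiError.

    while (step < nSteps)
    {
        try
        {
            DoOneTimeStep(step);    // collective calls may trigger HandleMpiError
            ++step;
        }
        catch (...)                 // e.g. a future UlfmFailureDetected exception
        {
            vComm->EnrolSpare();    // shrink, promote spares, restore logged state
            // While m_isRecovering is set, calls such as v_Block() and v_Send()
            // return early, so a recovering rank can replay its set-up phase
            // until EndTransactionLog() clears the flag.
        }
    }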
......
@@ -40,6 +40,8 @@
#include <string>
#include <queue>
#include <vector>
#include <list>
#include <LibUtilities/Communication/Comm.h>
#include <LibUtilities/Memory/NekMemoryManager.hpp>
@@ -135,24 +137,36 @@ protected:
virtual void v_SplitComm(int pRows, int pColumns);
virtual CommSharedPtr v_CommCreateIf(int flag);
virtual int v_EnrolSpare();
virtual void v_BackupState();
private:
typedef std::queue<std::vector<char>> StorageType;
typedef std::list<CommMpiSharedPtr> DerivedCommType;
typedef std::queue<int> DerivedCommFlagType;
MPI_Comm m_comm;
MPI_Comm m_agreecomm;
int m_rank;
bool m_isRecovering; ///< True if we are undergoing recovery from failed process
bool m_isLogging; ///< True if logging MPI output
StorageType m_data;
StorageType m_dataBackup;
DerivedCommType m_derivedComm; ///< Temporary derived comm list used during restore
DerivedCommFlagType m_derivedCommFlag; ///< Log derived comm flags
DerivedCommFlagType m_derivedCommFlagBackup; ///< Backup of neighbour flags
static void HandleMpiError(MPI_Comm* pcomm, int* perr, ...);
CommMpi(MPI_Comm pComm);
virtual void v_ReplaceComm(void* commptr);