Commit 90440419 authored by Michael Bareford

Altered write routine to take advantage of variable gather/scatter.

parent 1b2f2bf0
......@@ -154,12 +154,12 @@ uint64_t FieldIOHdf5::v_Write(const std::string &outFilename,
const FieldMetaDataMap &fieldMetaDataMap,
const bool backup)
{
int nprocs = m_comm->GetSize();
int nranks = m_comm->GetSize();
int rk = m_comm->GetRank();
std::stringstream prfx;
prfx << rk << ": FieldIOHdf5::v_Write(): ";
/*
double tm0 = 0.0;
if (0 == rk)
......@@ -183,20 +183,12 @@ uint64_t FieldIOHdf5::v_Write(const std::string &outFilename,
uint64_t nWritten = 0;
std::size_t nFields = fieldDefs.size();
std::size_t nMaxFields = nFields;
int rkFormatter = -1;
int rkFormatter = 0;
std::vector<uint64_t> fieldHashes(nFields);
std::vector<uint64_t> fieldCounts(FIELD_COUNT_SIZE*nFields);
std::vector<uint64_t> totalFieldCounts(FIELD_COUNT_SIZE);
if (reformatting)
{
m_comm->AllReduce(nMaxFields, LibUtilities::ReduceMax);
rkFormatter = GetFormattingRank(nFields, nMaxFields);
fieldHashes.resize(nMaxFields);
fieldCounts.resize(FIELD_COUNT_SIZE*nMaxFields);
}
std::vector<uint64_t> fieldCounts(FIELD_COUNT_SIZE*nFields, 0);
std::vector<uint64_t> totalFieldCounts(FIELD_COUNT_SIZE, 0);
std::vector<std::string> fieldNames(nFields);
std::vector<std::string> shapeStrings(nFields);
std::vector<std::vector<NekDouble> > homoLengths(nFields);
......@@ -384,77 +376,103 @@ uint64_t FieldIOHdf5::v_Write(const std::string &outFilename,
} // end of <for (std::size_t f = 0; f < nFields; ++f)> loop
std::vector<uint64_t> firstDataDecomps(DATA_DECOMP_SIZE, 0);
std::vector<uint64_t> firstDataDecomps;
if (reformatting)
{
std::vector<uint64_t> fieldDecomps(FIELD_DECOMP_SIZE, 0);
fieldDecomps[FIELD_DECOMP_CNT] = nFields;
std::vector<uint64_t> allFieldDecomps = m_comm->Gather(rkFormatter, fieldDecomps);
std::vector<uint64_t> allFieldCounts = m_comm->Gather(rkFormatter, fieldCounts);
std::vector<uint64_t> allFieldHashes = m_comm->Gather(rkFormatter, fieldHashes);
std::size_t nTotFields = 0;
std::vector<int> sizeMap, offsetMap;
std::vector<int> sizeMap2, offsetMap2;
std::vector<int> vnFields(1, nFields);
std::vector<int> allnFields = m_comm->Gather(rkFormatter, vnFields);
if (rkFormatter == rk)
{
sizeMap.resize(nranks, 0);
offsetMap.resize(nranks, 0);
sizeMap[0] = allnFields[0];
for (int r = 1; r < nranks; ++r)
{
sizeMap[r] = allnFields[r];
offsetMap[r] = offsetMap[r-1] + sizeMap[r-1];
}
nTotFields = offsetMap[nranks-1] + sizeMap[nranks-1];
}
std::vector<uint64_t> allFieldHashes = m_comm->Gatherv(rkFormatter, fieldHashes, sizeMap, offsetMap);
std::vector<uint64_t> allFieldDecomps;
if (rkFormatter == rk)
{
allFieldDecomps.resize(FIELD_DECOMP_SIZE*nranks, 0);
sizeMap2.resize(nranks, 0);
offsetMap2.resize(nranks, 0);
for (int r = 0; r < nranks; ++r)
{
std::size_t fdo = FIELD_DECOMP_SIZE*r;
allFieldDecomps[fdo + FIELD_DECOMP_CNT] = sizeMap[r];
allFieldDecomps[fdo + FIELD_DECOMP_OFF] = offsetMap[r];
sizeMap2[r] = FIELD_COUNT_SIZE*sizeMap[r];
offsetMap2[r] = FIELD_COUNT_SIZE*offsetMap[r];
}
}
std::vector<uint64_t> allFieldCounts = m_comm->Gatherv(rkFormatter, fieldCounts, sizeMap2, offsetMap2);
std::vector<uint64_t> allFirstDataDecomps;
std::size_t nTotFields = 0;
m_comm->Reduce(nFields, nTotFields, LibUtilities::ReduceSum, rkFormatter);
if (rkFormatter == rk)
if (rkFormatter == rk)
{
std::vector<uint64_t> allDataDecomps(DATA_DECOMP_SIZE*nTotFields);
allFirstDataDecomps.resize(DATA_DECOMP_SIZE*nprocs);
allFirstDataDecomps.resize(DATA_DECOMP_SIZE*nranks);
nWritten += CreateDataSets(outFilename, rkFormatter, nMaxFields, nTotFields,
nWritten += CreateDataSets(outFilename, rkFormatter, nTotFields,
allFieldHashes, allFieldCounts, allFieldDecomps,
allFirstDataDecomps, allDataDecomps,
fieldDefs, homoYIDs, homoZIDs, homoSIDs, numModesPerDirVar,
fieldData, fieldMetaDataMap);
nWritten += WriteDecompositionData(outFilename, rkFormatter,
allFieldDecomps, allDataDecomps);
std::set<uint64_t> hashesAssigned;
for (std::size_t f = 0; f < nMaxFields; ++f)
{
uint64_t hash = allFieldHashes[nMaxFields*rkFormatter + f];
hashesAssigned.insert(hash);
}
for (int r = 0; r < nprocs; ++r)
for (int r = 0; r < nranks; ++r)
{
if (r == rkFormatter) continue;
for (std::size_t f = 0; f < nMaxFields; ++f)
std::size_t fdo = FIELD_DECOMP_SIZE*r;
std::size_t fdc = allFieldDecomps[fdo + FIELD_DECOMP_CNT];
std::size_t fho = allFieldDecomps[fdo + FIELD_DECOMP_OFF];
for (std::size_t f = 0; f < fdc; ++f)
{
// Note hash can be zero if, for process r, nFields < nMaxFields.
uint64_t hash = allFieldHashes[nMaxFields*r + f];
if (0 == hash) break;
uint64_t hash = allFieldHashes[fho + f];
if (hashesAssigned.find(hash) == hashesAssigned.end())
{
hashesAssigned.insert(hash);
}
else
{
// This field hash has been assigned to another process.
allFieldHashes[nMaxFields*r + f] = 0;
// This field hash has been assigned to be written by another process.
allFieldHashes[fho + f] = 0;
}
}
}
} // end of <if (rkFormatter == rk)> clause
// Scatter the field hashes that indicate which field definitions each process writes to file.
std::vector<uint64_t> writeFieldHashes = m_comm->Scatter(rkFormatter, allFieldHashes);
// Write field definitions to checkpoint file.
// Scatter the field hashes that indicate which field definitions each process writes to file.
std::vector<uint64_t> writeFieldHashes = m_comm->Scatterv(rkFormatter, allFieldHashes, sizeMap, offsetMap, nFields);
// Write field definitions to checkpoint file.
nWritten += WriteFieldAttributes(outFilename, nFields, varOrder, writeFieldHashes, fieldDefs,
fieldNames, shapeStrings, homoLengths, numModesPerDirUni);
firstDataDecomps = m_comm->Scatter(rkFormatter, allFirstDataDecomps, DATA_DECOMP_SIZE);
} // end of <if (reformatting)> clause
nWritten = WriteData(outFilename, nFields, totalFieldCounts,
firstDataDecomps, fieldDefs, homoYIDs, homoZIDs, homoSIDs,
numModesPerDirVar, fieldData);
m_comm->Block();
/*
......@@ -468,56 +486,11 @@ uint64_t FieldIOHdf5::v_Write(const std::string &outFilename,
}
/**
 * @brief Determine the formatting MPI process, i.e., the lowest ranked process handling
 * nMaxFields fields, that will be responsible for writing the checkpoint file.
 *
 * Processes for which nFields equals nMaxFields agree on the minimum of their
 * ranks via a min-reduction. That rank is then published to every process by a
 * max-reduction over m_comm, to which all other processes contribute -1.
 *
 * NOTE(review): this relies on CommCreateIf() returning a null pointer on
 * processes that pass a flag of 0 - confirm against the Comm implementation.
 *
 * @param nFields The number of fields handled by this process.
 * @param nMaxFields The highest number of fields handled by any one process.
 * @return The rank of the process responsible for formatting the checkpoint file.
 */
int FieldIOHdf5::GetFormattingRank(const uint64_t nFields, const uint64_t nMaxFields)
{
    int nprocs = m_comm->GetSize();
    int rk = m_comm->GetRank();
    int rkFormatter = -1;

    // With a single process there is nothing to split: reuse m_comm directly.
    LibUtilities::CommSharedPtr maxFieldsComm;
    if (nprocs > 1)
    {
        maxFieldsComm = m_comm->CommCreateIf((nFields == nMaxFields) ? 1 : 0);
    }
    else
    {
        maxFieldsComm = m_comm;
    }

    if (maxFieldsComm)
    {
        // Candidates reduce their own ranks to the minimum; only the process
        // that owns that minimum keeps it, the others fall back to -1.
        rkFormatter = rk;
        maxFieldsComm->AllReduce(rkFormatter, LibUtilities::ReduceMin);
        if (rk != rkFormatter)
        {
            rkFormatter = -1;
        }
    }

    // Exactly one process now holds a non-negative value, so the maximum over
    // all processes is the formatting rank.
    m_comm->AllReduce(rkFormatter, LibUtilities::ReduceMax);

    // The message is assembled as a std::string so that it is a well-formed
    // expression whenever ASSERTL1 actually evaluates its second argument.
    ASSERTL1(rkFormatter >= 0 && rkFormatter < nprocs,
             std::to_string(rk) + ": FieldIOHdf5:: GetFormattingRank (): " + "invalid formatting rank.");

    return rkFormatter;
}
/**
* Create the HDF5 datasets for the checkpoint file.
*
* @param outFilename Output filename.
* @param rkFormatter The rank of the process responsible for formatting the checkpoint file.
* @param nMaxFields The highest number of fields handled by any one process.
* @param nTotFields The total number of fields handled by all processes.
* @param allFieldHashes Hashes of all field definitions.
* @param allFieldCounts The counts associated with all the fields handled by all the processes.
......@@ -535,9 +508,8 @@ int FieldIOHdf5::GetFormattingRank(const uint64_t nFields, const uint64_t nMaxFi
*/
uint64_t FieldIOHdf5::CreateDataSets(const std::string &outFilename,
const int rkFormatter,
const std::size_t nMaxFields,
const std::size_t nTotFields,
std::vector<uint64_t> allFieldHashes,
std::vector<uint64_t> &allFieldHashes,
std::vector<uint64_t> &allFieldCounts,
std::vector<uint64_t> &allFieldDecomps,
std::vector<uint64_t> &allFirstDataDecomps,
......@@ -550,7 +522,7 @@ uint64_t FieldIOHdf5::CreateDataSets(const std::string &outFilename,
std::vector<std::vector<NekDouble> > &fieldData,
const FieldMetaDataMap &fieldMetaDataMap)
{
int nprocs = m_comm->GetSize();
int nranks = m_comm->GetSize();
int rk = m_comm->GetRank();
uint64_t nWritten = 0;
......@@ -576,28 +548,23 @@ uint64_t FieldIOHdf5::CreateDataSets(const std::string &outFilename,
// Calculate the indexes to be used by each MPI process when reading the
// IDS and DATA datasets.
std::size_t rkFieldCnt = 0, runningFieldCnt = 0, runningDataDecompCnt = 0;
std::size_t runningElemIdCnt = 0, runningElemDataCnt = 0, runningOrderCnt = 0;
std::size_t runningHomyCnt = 0, runningHomzCnt = 0, runningHomsCnt = 0;
for (int r = 0; r < nprocs; ++r)
std::size_t ddo = 0;
for (int r = 0; r < nranks; ++r)
{
std::size_t fho = nMaxFields*r;
std::size_t fdo = FIELD_DECOMP_SIZE*r;
std::size_t fco_base = FIELD_COUNT_SIZE*nMaxFields*r;
std::size_t rkFieldCnt = allFieldDecomps[fdo + FIELD_DECOMP_CNT];
std::size_t rkFieldOff = allFieldDecomps[fdo + FIELD_DECOMP_OFF];
std::size_t fco = FIELD_COUNT_SIZE*rkFieldOff;
runningFieldCnt += rkFieldCnt;
rkFieldCnt = allFieldDecomps[fdo + FIELD_DECOMP_CNT];
allFieldDecomps[fdo + FIELD_DECOMP_OFF] = runningFieldCnt;
for (int f = 0; f < rkFieldCnt; ++f)
{
std::size_t fco = fco_base + FIELD_COUNT_SIZE*f;
std::size_t ddo = DATA_DECOMP_SIZE*runningDataDecompCnt;
std::size_t elemIdCnt, elemDataCnt, orderCnt;
std::size_t homyCnt, homzCnt, homsCnt;
allDataDecomps[ddo + DATA_DECOMP_FIELD_HASH] = allFieldHashes[fho + f];
allDataDecomps[ddo + DATA_DECOMP_FIELD_HASH] = allFieldHashes[rkFieldOff + f];
elemIdCnt = allFieldCounts[fco + FIELD_COUNT_IDS];
allDataDecomps[ddo + DATA_DECOMP_IDS_CNT] = elemIdCnt;
......@@ -631,20 +598,20 @@ uint64_t FieldIOHdf5::CreateDataSets(const std::string &outFilename,
if (0 == f)
{
std::size_t fddo = DATA_DECOMP_SIZE*r;
allFirstDataDecomps.insert(allFirstDataDecomps.begin()+fddo,
allFirstDataDecomps.insert(allFirstDataDecomps.end(),
allDataDecomps.begin()+ddo,
allDataDecomps.begin()+ddo+DATA_DECOMP_SIZE);
}
runningDataDecompCnt++;
fco += FIELD_COUNT_SIZE;
ddo += DATA_DECOMP_SIZE;
} // end of <for (int f = 0; f < rkFieldCnt; ++f)> loop
} // end of <for (int r = 0; r < nprocs; ++r)> loop
} // end of <for (int r = 0; r < nranks; ++r)> loop
// Create FIELD_DECOMPOSITION dataset: basic field info for each MPI process.
H5::DataTypeSharedPtr fieldDecompsType = H5::CompoundDataType::OfObject(allFieldDecomps[0]);
H5::DataSpaceSharedPtr fieldDecompsSpace = H5::DataSpace::OneD(FIELD_DECOMP_SIZE*nprocs);
H5::DataSpaceSharedPtr fieldDecompsSpace = H5::DataSpace::OneD(FIELD_DECOMP_SIZE*nranks);
H5::DataSetSharedPtr fieldDecompsDset =
root->CreateDataSet("FIELD_DECOMPOSITION", fieldDecompsType, fieldDecompsSpace);
ASSERTL1(fieldDecompsDset, prfx.str() + "cannot create FIELD_DECOMPOSITION dataset.");
......@@ -801,7 +768,7 @@ uint64_t FieldIOHdf5::WriteFieldAttributes(const std::string &outFilename,
std::vector<std::vector<NekDouble> > &homoLengths,
std::vector<std::string> &numModesPerDirUni)
{
int nprocs = m_comm->GetSize();
int nranks = m_comm->GetSize();
int rk = m_comm->GetRank();
uint64_t nWritten = 0;
bool allZeros = std::all_of(writeFieldHashes.begin(), writeFieldHashes.end(), [](uint64_t h) { return h==0; });
......@@ -809,7 +776,7 @@ uint64_t FieldIOHdf5::WriteFieldAttributes(const std::string &outFilename,
std::stringstream prfx;
prfx << rk << ": FieldIOHdf5:: WriteFieldAttributes(): ";
for (int r = 0; r < nprocs; ++r)
for (int r = 0; r < nranks; ++r)
{
// Write out this rank's field definitions.
if (rk == r && !allZeros)
......@@ -870,7 +837,7 @@ uint64_t FieldIOHdf5::WriteFieldAttributes(const std::string &outFilename,
// We block to avoid more than one processor opening the file at a time.
m_comm->Block();
} // end of <for (r = 0; r < nprocs; ++r)> loop
} // end of <for (r = 0; r < nranks; ++r)> loop
return nWritten;
}
......@@ -901,7 +868,7 @@ uint64_t FieldIOHdf5::WriteData(const std::string &outFilename,
std::vector<std::vector<unsigned int> > &numModesPerDirVar,
std::vector<std::vector<NekDouble> > &fieldData)
{
int nprocs = m_comm->GetSize();
int nranks = m_comm->GetSize();
int rk = m_comm->GetRank();
uint64_t nWritten = 0;
......@@ -911,7 +878,7 @@ uint64_t FieldIOHdf5::WriteData(const std::string &outFilename,
// Set properties for parallel file access (if we're in parallel).
H5::PListSharedPtr parallelProps = H5::PList::Default();
if (nprocs > 1)
if (nranks > 1)
{
// Use MPI/O to access the file
parallelProps = H5::PList::FileAccess();
......@@ -927,7 +894,7 @@ uint64_t FieldIOHdf5::WriteData(const std::string &outFilename,
if (!reformatting)
{
H5::PListSharedPtr readPL = H5::PList::Default();
if (nprocs > 1)
if (nranks > 1)
{
// Use collective IO
readPL = H5::PList::DatasetXfer();
......@@ -1145,7 +1112,7 @@ uint64_t FieldIOHdf5::v_Import(const std::string &inFilename,
FieldMetaDataMap &fieldMetaInfoMap,
const Array<OneD, int> &elementIDs)
{
int nprocs = m_comm->GetSize();
int nranks = m_comm->GetSize();
int rk = m_comm->GetRank();
std::stringstream prfx;
......@@ -1165,7 +1132,7 @@ uint64_t FieldIOHdf5::v_Import(const std::string &inFilename,
H5::PListSharedPtr parallelProps = H5::PList::Default();
H5::PListSharedPtr readPL = H5::PList::Default();
if (nprocs > 1)
if (nranks > 1)
{
// Use MPI/O to access the file
parallelProps = H5::PList::FileAccess();
......
......@@ -245,12 +245,10 @@ private:
const FieldMetaDataMap &fieldMetaDataMap = NullFieldMetaDataMap,
const bool backup = false);
LIB_UTILITIES_EXPORT int GetFormattingRank(const uint64_t nFields, const uint64_t nMaxFields);
LIB_UTILITIES_EXPORT uint64_t CreateDataSets(const std::string &outFilename,
const int rkFormatter,
const std::size_t nMaxFields,
const std::size_t nTotFields,
std::vector<uint64_t> allFieldHashes,
std::vector<uint64_t> &allFieldHashes,
std::vector<uint64_t> &allFieldCounts,
std::vector<uint64_t> &allFieldDecomps,
std::vector<uint64_t> &allFirstDataDecomps,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment