Commit 68496b2c authored by Simon Clifford's avatar Simon Clifford

Threading as library

Backported from feature/threading-topdown:
* all changes to threading library
* mutex locks around memory, factories, etc
* changes to Loki to make it thread-safe

Did not port:
* GsMPI stuff
* ThreadedComm
* changes relating to ThreadedComm
parent 2ca98313
......@@ -66,7 +66,8 @@ namespace Nektar
{
typedef Loki::SingletonHolder<MeshPartitionFactory,
Loki::CreateUsingNew,
Loki::NoDestroy > Type;
Loki::NoDestroy,
Loki::SingleThreaded> Type;
return Type::Instance();
}
......
......@@ -43,6 +43,8 @@
#include <boost/preprocessor/arithmetic/sub.hpp>
#include <boost/preprocessor/punctuation/comma_if.hpp>
#include <boost/preprocessor/iteration/iterate.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/shared_ptr.hpp>
......@@ -65,6 +67,8 @@ namespace Nektar
// Generate parameter typenames with default type of 'none'
#define FACTORY_print(z, n, data) BOOST_PP_CAT(data, n) = none
typedef boost::unique_lock<boost::shared_mutex> WriteLock;
typedef boost::shared_lock<boost::shared_mutex> ReadLock;
/**
* @class NekFactory
......@@ -144,7 +148,7 @@ namespace Nektar
public:
NekFactory() {}
NekFactory() : m_mutex() {}
/**
* @brief Create an instance of the class referred to by \c idKey.
......@@ -158,6 +162,9 @@ namespace Nektar
tBaseSharedPtr CreateInstance(tKey idKey BOOST_PP_COMMA_IF(MAX_PARAM)
BOOST_PP_ENUM_BINARY_PARAMS(MAX_PARAM, tParam, x))
{
ReadLock vReadLock(m_mutex);
// Now try and find the key in the map.
TMapFactoryIterator it = getMapFactory()->find(idKey);
......@@ -165,11 +172,14 @@ namespace Nektar
// create a new instance of the class.
if (it != getMapFactory()->end())
{
if (it->second.m_func)
ModuleEntry *tmp = &(it->second);
vReadLock.unlock();
if (tmp->m_func)
{
try
{
return it->second.m_func(BOOST_PP_ENUM_PARAMS(MAX_PARAM, x));
return tmp->m_func(BOOST_PP_ENUM_PARAMS(MAX_PARAM, x));
}
catch (const std::string& s)
{
......@@ -205,6 +215,8 @@ namespace Nektar
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator,
tDescription pDesc = "")
{
WriteLock vWriteLock(m_mutex);
ModuleEntry e(classCreator, pDesc);
getMapFactory()->insert(std::pair<tKey,ModuleEntry>(idKey, e));
return idKey;
......@@ -216,6 +228,8 @@ namespace Nektar
*/
bool ModuleExists(tKey idKey)
{
ReadLock vReadLock(m_mutex);
// Now try and find the key in the map.
TMapFactoryIterator it = getMapFactory()->find(idKey);
......@@ -232,6 +246,8 @@ namespace Nektar
*/
void PrintAvailableClasses(std::ostream& pOut = std::cout)
{
ReadLock vReadLock(m_mutex);
pOut << std::endl << "Available classes: " << std::endl;
TMapFactoryIterator it;
for (it = getMapFactory()->begin(); it != getMapFactory()->end(); ++it)
......@@ -260,6 +276,8 @@ namespace Nektar
*/
tKey GetKey(tDescription pDesc)
{
ReadLock vReadLock(m_mutex);
TMapFactoryIterator it;
for (it = getMapFactory()->begin(); it != getMapFactory()->end(); ++it)
{
......@@ -280,6 +298,8 @@ namespace Nektar
*/
std::string GetClassDescription(tKey idKey)
{
ReadLock vReadLock(m_mutex);
// Now try and find the key in the map.
TMapFactoryIterator it = getMapFactory()->find(idKey);
......@@ -305,6 +325,8 @@ namespace Nektar
TMapFactory mMapFactory;
boost::shared_mutex m_mutex;
};
#undef FACTORY_print
......@@ -325,6 +347,11 @@ namespace Nektar
#define n BOOST_PP_ITERATION()
// Define macro for printing the non-required template parameters
#define FACTORY_print(z, n, data) data
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>
typedef boost::unique_lock<boost::shared_mutex> WriteLock;
typedef boost::shared_lock<boost::shared_mutex> ReadLock;
template < typename tKey,
typename tBase BOOST_PP_COMMA_IF(n)
......@@ -352,19 +379,24 @@ namespace Nektar
typedef std::map<tKey, ModuleEntry, tPredicator> TMapFactory;
typedef typename TMapFactory::iterator TMapFactoryIterator;
NekFactory() {}
NekFactory() : m_mutex() {}
tBaseSharedPtr CreateInstance(tKey idKey BOOST_PP_COMMA_IF(n)
BOOST_PP_ENUM_BINARY_PARAMS(n, tParam, x))
{
ReadLock vReadLock(m_mutex);
TMapFactoryIterator it = getMapFactory()->find(idKey);
if (it != getMapFactory()->end())
{
if (it->second.m_func)
ModuleEntry *tmp = &(it->second);
vReadLock.unlock();
if (tmp->m_func)
{
try
{
return it->second.m_func(BOOST_PP_ENUM_PARAMS(n, x));
return tmp->m_func(BOOST_PP_ENUM_PARAMS(n, x));
}
catch (const std::string& s)
{
......@@ -385,6 +417,8 @@ namespace Nektar
tKey RegisterCreatorFunction(tKey idKey,
CreatorFunction classCreator,
tDescription pDesc = "") {
WriteLock vWriteLock(m_mutex);
ModuleEntry e(classCreator, pDesc);
getMapFactory()->insert(std::pair<tKey,ModuleEntry>(idKey, e));
return idKey;
......@@ -392,6 +426,8 @@ namespace Nektar
bool ModuleExists(tKey idKey)
{
ReadLock vReadLock(m_mutex);
// Now try and find the key in the map.
TMapFactoryIterator it = getMapFactory()->find(idKey);
......@@ -404,6 +440,8 @@ namespace Nektar
void PrintAvailableClasses(std::ostream& pOut = std::cout)
{
ReadLock vReadLock(m_mutex);
pOut << std::endl << "Available classes: " << std::endl;
TMapFactoryIterator it;
for (it = getMapFactory()->begin(); it != getMapFactory()->end(); ++it)
......@@ -423,6 +461,8 @@ namespace Nektar
tKey GetKey(tDescription pDesc)
{
ReadLock vReadLock(m_mutex);
TMapFactoryIterator it;
for (it = getMapFactory()->begin(); it != getMapFactory()->end(); ++it)
{
......@@ -439,6 +479,8 @@ namespace Nektar
std::string GetClassDescription(tKey idKey)
{
ReadLock vReadLock(m_mutex);
// Now try and find the key in the map.
TMapFactoryIterator it = getMapFactory()->find(idKey);
......@@ -458,6 +500,8 @@ namespace Nektar
NekFactory& operator=(const NekFactory& rhs);
TMapFactory mMapFactory;
boost::shared_mutex m_mutex;
};
#undef n
#undef FACTORY_print
......
......@@ -41,6 +41,8 @@
#include <boost/function.hpp>
#include <boost/call_traits.hpp>
#include <boost/concept_check.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/shared_ptr.hpp>
#include <LibUtilities/BasicUtils/ErrorUtil.hpp>
......@@ -51,6 +53,9 @@ namespace Nektar
{
namespace LibUtilities
{
typedef boost::unique_lock<boost::shared_mutex> WriteLock;
typedef boost::shared_lock<boost::shared_mutex> ReadLock;
template <typename KeyType>
struct defOpLessCreator
{
......@@ -76,9 +81,10 @@ namespace Nektar
typedef std::map<std::string, BoolSharedPtr> FlagContainerPool;
NekManager(std::string whichPool="") :
m_values(),
m_values(),
m_globalCreateFunc(),
m_keySpecificCreateFuncs()
{
if (!whichPool.empty())
{
......@@ -114,6 +120,7 @@ namespace Nektar
{
if (!whichPool.empty())
{
ReadLock v_rlock(m_mutex); // reading static members
typename ValueContainerPool::iterator iter = m_ValueContainerPool.find(whichPool);
if (iter != m_ValueContainerPool.end())
{
......@@ -122,6 +129,12 @@ namespace Nektar
}
else
{
v_rlock.unlock();
// now writing static members. Apparently upgrade_lock has less desirable properties
// than just dropping read lock, grabbing write lock.
// write will block until all reads are done, but reads cannot be acquired if write
// lock is blocking. In this context writes are supposed to be rare.
WriteLock v_wlock(m_mutex);
m_values = ValueContainerShPtr(new ValueContainer);
m_ValueContainerPool[whichPool] = m_values;
if (m_managementEnabledContainerPool.find(whichPool) == m_managementEnabledContainerPool.end())
......@@ -138,14 +151,14 @@ namespace Nektar
m_managementEnabled = BoolSharedPtr(new bool(true));
}
}
~NekManager()
{
}
/// Register the given function and associate it with the key.
/// The return value is just to facilitate calling statically.
bool RegisterCreator(typename boost::call_traits<KeyType>::const_reference key,
bool RegisterCreator(typename boost::call_traits<KeyType>::const_reference key,
const CreateFuncType& createFunc)
{
m_keySpecificCreateFuncs[key] = createFunc;
......@@ -170,7 +183,7 @@ namespace Nektar
{
value = true;
}
return value;
}
......@@ -227,6 +240,8 @@ namespace Nektar
typename ValueContainerPool::iterator x;
if (!whichPool.empty())
{
WriteLock v_wlock(m_mutex);
x = m_ValueContainerPool.find(whichPool);
ASSERTL1(x != m_ValueContainerPool.end(),
"Could not find pool " + whichPool);
......@@ -234,6 +249,8 @@ namespace Nektar
}
else
{
WriteLock v_wlock(m_mutex);
for (x = m_ValueContainerPool.begin(); x != m_ValueContainerPool.end(); ++x)
{
x->second->clear();
......@@ -246,6 +263,8 @@ namespace Nektar
typename FlagContainerPool::iterator x;
if (!whichPool.empty())
{
WriteLock v_wlock(m_mutex);
x = m_managementEnabledContainerPool.find(whichPool);
if (x != m_managementEnabledContainerPool.end())
{
......@@ -263,6 +282,8 @@ namespace Nektar
typename FlagContainerPool::iterator x;
if (!whichPool.empty())
{
WriteLock v_wlock(m_mutex);
x = m_managementEnabledContainerPool.find(whichPool);
if (x != m_managementEnabledContainerPool.end())
{
......@@ -276,8 +297,8 @@ namespace Nektar
}
private:
NekManager(const NekManager<KeyType, ValueType, opLessCreator>& rhs);
NekManager<KeyType, ValueType, opLessCreator>& operator=(const NekManager<KeyType, ValueType, opLessCreator>& rhs);
NekManager(const NekManager<KeyType, ValueType, opLessCreator>& rhs);
ValueContainerShPtr m_values;
BoolSharedPtr m_managementEnabled;
......@@ -285,9 +306,12 @@ namespace Nektar
static FlagContainerPool m_managementEnabledContainerPool;
CreateFuncType m_globalCreateFunc;
CreateFuncContainer m_keySpecificCreateFuncs;
static boost::shared_mutex m_mutex;
};
template <typename KeyType, typename ValueT, typename opLessCreator> typename NekManager<KeyType, ValueT, opLessCreator>::ValueContainerPool NekManager<KeyType, ValueT, opLessCreator>::m_ValueContainerPool;
template <typename KeyType, typename ValueT, typename opLessCreator> typename NekManager<KeyType, ValueT, opLessCreator>::FlagContainerPool NekManager<KeyType, ValueT, opLessCreator>::m_managementEnabledContainerPool;
template <typename KeyType, typename ValueT, typename opLessCreator>
typename boost::shared_mutex NekManager<KeyType, ValueT, opLessCreator>::m_mutex;
}
}
......
......@@ -55,7 +55,6 @@ using namespace std;
#include <LibUtilities/Memory/NekMemoryManager.hpp>
#include <LibUtilities/BasicUtils/MeshPartition.h>
#include <LibUtilities/BasicUtils/ParseUtils.hpp>
#include <LibUtilities/BasicUtils/Thread.h>
#include <LibUtilities/BasicUtils/FileSystem.h>
#include <boost/program_options.hpp>
......@@ -276,9 +275,6 @@ namespace Nektar
// Parse the XML data in #m_xmlDoc
ParseDocument();
// Start threads
StartThreads();
// Override SOLVERINFO and parameters with any specified on the
// command line.
CmdLineOverride();
......@@ -1537,7 +1533,6 @@ namespace Nektar
void SessionReader::PartitionMesh()
{
ASSERTL0(m_comm.get(), "Communication not initialised.");
ASSERTL0(m_threadManager, "Threading not initialised.");
// Get row of comm, or the whole comm if not split
CommSharedPtr vCommMesh = m_comm->GetRowComm();
......@@ -2636,23 +2631,6 @@ namespace Nektar
}
}
/**
*
*/
void SessionReader::StartThreads()
{
int nthreads;
LoadParameter("NThreads", nthreads, 1);
cerr << "Number of threads will be: " << nthreads << endl;
m_threadManager = Thread::GetThreadManager().CreateInstance("ThreadManagerBoost", nthreads);
}
Nektar::Thread::ThreadManagerSharedPtr SessionReader::GetThreadManager()
{
return m_threadManager;
}
void SessionReader::SetUpXmlDoc(void)
{
m_xmlDoc = MergeDoc(m_filenames);
......
......@@ -43,7 +43,6 @@
#include <LibUtilities/BasicConst/NektarUnivTypeDefs.hpp>
#include <LibUtilities/LibUtilitiesDeclspec.h>
#include <LibUtilities/Interpreter/AnalyticExpressionEvaluator.hpp>
#include <LibUtilities/BasicUtils/Thread.h>
#include <boost/algorithm/string.hpp>
#include <boost/enable_shared_from_this.hpp>
......@@ -52,7 +51,6 @@
class TiXmlElement;
class TiXmlDocument;
namespace Nektar
{
namespace LibUtilities
......@@ -428,8 +426,6 @@ namespace Nektar
LIB_UTILITIES_EXPORT void SetUpXmlDoc();
LIB_UTILITIES_EXPORT Nektar::Thread::ThreadManagerSharedPtr GetThreadManager();
private:
boost::program_options::variables_map m_cmdLineOptions;
......@@ -461,8 +457,6 @@ namespace Nektar
FilterMap m_filters;
/// Be verbose
bool m_verbose;
/// Thread Manager
Nektar::Thread::ThreadManagerSharedPtr m_threadManager;
/// Map of original composite ordering for parallel periodic bcs.
CompositeOrdering m_compOrder;
/// Map of original boundary region ordering for parallel periodic
......@@ -529,8 +523,6 @@ namespace Nektar
LIB_UTILITIES_EXPORT void ReadFunctions(TiXmlElement *conditions);
/// Reads the FILTERS section of the XML document.
LIB_UTILITIES_EXPORT void ReadFilters(TiXmlElement *filters);
/// Starts the required number of threads on this worker
void StartThreads();
/// Enforce parameters from command line arguments.
LIB_UTILITIES_EXPORT void CmdLineOverride();
......
......@@ -4,19 +4,19 @@
*/
#include <iostream>
#include "Thread.h"
#include <loki/Singleton.h>
#include "LibUtilities/BasicUtils/Thread.h"
namespace Nektar
{
namespace Thread
{
ThreadManagerSharedPtr ThreadManager::m_instance;
ThreadManagerFactory& GetThreadManager()
// ThreadManagerSharedPtr ThreadManager::m_instance;
ThreadManagerFactory& GetThreadManagerFactory()
{
typedef Loki::SingletonHolder<ThreadManagerFactory,
Loki::CreateUsingNew,
Loki::NoDestroy > Type;
Loki::NoDestroy,
Loki::SingleThreaded> Type;
return Type::Instance();
}
......@@ -38,42 +38,129 @@ namespace Nektar
unsigned int ThreadJob::GetWorkerNum()
{
return m_workerNum;
return m_workerNum;
}
// ThreadManager implementation.
bool ThreadManager::IsInitialised()
{
return true;
}
ThreadManager::~ThreadManager()
{
// empty
// empty
}
// ThreadHandle implementation.
ThreadHandle::ThreadHandle(SchedType sched)
// ThreadMaster implementation
ThreadMaster::ThreadMaster() : m_threadManagers(1), m_mutex(), m_threadingType()
{
// empty
}
ThreadMaster::~ThreadMaster()
{
// Locking is a bit pointless, since the map is empty after this call.
m_threadManagers.clear();
}
ThreadMaster& GetThreadMaster()
{
typedef Loki::SingletonHolder<ThreadMaster,
Loki::CreateUsingNew,
Loki::NoDestroy,
Loki::SingleThreaded> Type;
return Type::Instance();
}
void ThreadMaster::SetThreadingType(const std::string &p_type)
{
Setup(sched, 1);
ASSERTL0(m_threadingType.empty(), "Tried to SetThreadingType when it was already set");
m_threadingType = p_type;
}
ThreadHandle::ThreadHandle(SchedType sched, unsigned int chnk)
ThreadManagerSharedPtr& ThreadMaster::GetInstance(const ThreadManagerName t)
{
Setup(sched, chnk);
if ( !m_threadManagers[t] )
{
m_threadManagers[t] = ThreadManagerSharedPtr(new ThreadStartupManager());
return m_threadManagers[t];
}
return m_threadManagers[t];
}
void ThreadHandle::Setup(SchedType sched, unsigned int chnk)
{
m_tm = ThreadManager::GetInstance();
if (!m_tm)
{
std::cerr << "Attempted to construct a ThreadHandle before a ThreadManager has been created." << std::endl;
std::abort();
}
m_tm->SetSchedType(sched);
m_tm->SetChunkSize(chnk);
m_tm->SetNumWorkers();
ThreadManagerSharedPtr ThreadMaster::CreateInstance(const ThreadManagerName t,
unsigned int nThr)
{
ASSERTL0(!m_threadingType.empty(), "Trying to create a ThreadManager before SetThreadingType called");
return m_threadManagers[t] =
Thread::GetThreadManagerFactory().CreateInstance(m_threadingType, nThr);
}
ThreadHandle::~ThreadHandle()
// ThreadDefaultManager
ThreadStartupManager::ThreadStartupManager() : m_type("Threading starting up")
{
// empty
}
ThreadStartupManager::~ThreadStartupManager()
{
// empty
}
void ThreadStartupManager::QueueJobs(std::vector<ThreadJob*>& joblist)
{
NEKERROR(ErrorUtil::efatal, "Attempted to QueueJobs in ThreadDefaultManager");
}
void ThreadStartupManager::QueueJob(ThreadJob* job)
{
NEKERROR(ErrorUtil::efatal, "Attempted to QueueJob in ThreadDefaultManager");
}
unsigned int ThreadStartupManager::GetNumWorkers()
{
return 1;
}
unsigned int ThreadStartupManager::GetWorkerNum()
{
return 0;
}
void ThreadStartupManager::SetNumWorkers(const unsigned int num)
{
ASSERTL0(num==1, "Attempted to SetNumWorkers to != 1 in ThreadDefaultManager");
}
void ThreadStartupManager::SetNumWorkers()
{
return;
}
unsigned int ThreadStartupManager::GetMaxNumWorkers()
{
return 1;
}
void ThreadStartupManager::Wait()
{
return;
}
void ThreadStartupManager::SetChunkSize(unsigned int chnk)
{
NEKERROR(ErrorUtil::efatal, "Attempted to SetChunkSize in ThreadDefaultManager");
}
void ThreadStartupManager::SetSchedType(SchedType s)
{
NEKERROR(ErrorUtil::efatal, "Attempted to SetSchedType in ThreadDefaultManager");
}
bool ThreadStartupManager::InThread()
{
return false;
}
void ThreadStartupManager::Hold()
{
return;
}
bool ThreadStartupManager::IsInitialised()
{
return false;
}
const std::string& ThreadStartupManager::GetType() const
{
m_tm->Wait();
return m_type;
}
} // Thread
......
This diff is collapsed.
......@@ -3,7 +3,7 @@
*
*/
#include "ThreadBoost.h"
#include "LibUtilities/BasicUtils/ThreadBoost.h"
#include <iostream>
namespace Nektar
......@@ -11,9 +11,9 @@ namespace Nektar
namespace Thread
{
std::string ThreadManagerBoost::className =
GetThreadManager().RegisterCreatorFunction("ThreadManagerBoost",
ThreadManagerBoost::Create, "Threading using Boost.");
std::string ThreadManagerBoost::className =
GetThreadManagerFactory().RegisterCreatorFunction("ThreadManagerBoost",
ThreadManagerBoost::Create, "Threading using Boost.");
ThreadManagerBoost::~ThreadManagerBoost()
{
......@@ -37,12 +37,14 @@ namespace Nektar
delete []m_threadThreadList;
delete []m_threadActiveList;
delete []m_threadBusyList;
delete m_barrier;
}
ThreadManagerBoost::ThreadManagerBoost(unsigned int numT) :
m_numThreads(numT), m_numWorkers(numT-1), m_masterQueue(), m_masterQueueMutex(),
m_masterActiveMutex(), m_masterQueueCondVar(), m_masterActiveCondVar(),
m_chunkSize(1), m_schedType(e_dynamic)
m_chunkSize(1), m_schedType(e_dynamic),
m_threadMap()
{
using namespace std;
try {
......@@ -73,6 +75,8 @@ namespace Nektar
try {
m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
boost::thread::id id = m_threadThreadList[i]->get_id();
m_threadMap[id] = i;
} catch (...) {
std::cerr << "Exception while creating worker threads" << std::endl;
abort();
......@@ -81,6 +85,8 @@ namespace Nektar
}
m_threadActiveList[m_numThreads-1] = false;
m_masterThreadId = boost::this_thread::get_id();
m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
m_type = "Threading with Boost";
}
void ThreadManagerBoost::QueueJobs(std::vector<ThreadJob*> &joblist)
......@@ -124,8 +130,8 @@ namespace Nektar
bool ThreadManagerBoost::InThread()
{
boost::thread::id id = boost::this_thread::get_id();
return (id != m_masterThreadId);
boost::thread::id id = boost::this_thread::get_id();
return (id != m_masterThreadId);
}
void ThreadManagerBoost::Wait()
......@@ -147,27 +153,41 @@ namespace Nektar
unsigned int ThreadManagerBoost::GetNumWorkers()
{