36 #ifdef USE_MULTITHREAD
40 #ifdef HAVE_SYS_TIMES_H
41 #include <sys/times.h>
48 #include <sys/types.h>
51 #ifdef HAVE_SYS_IOCTL_H
52 #include <sys/ioctl.h>
/*
 * do_lock: busy-wait on the flag pointed to by p.
 * Keeps calling mbdyn_test_and_set(p) for as long as it returns AO_TS_SET;
 * presumably that means the flag was already held, so the loop exits once
 * the caller has set a previously clear flag (confirm against
 * mbdyn_test_and_set).  The loop body is empty: no back-off or yield.
 */
64 do_lock(
volatile AO_TS_t *p)
66 while (mbdyn_test_and_set(p) == AO_TS_SET);
/*
 * Fragment of a loop nest; the enclosing function header and both loop
 * bodies are not visible in this listing.  The outer index r runs over
 * [from, to) and the inner index i over [0, nc).
 * NOTE(review): from/to look like a per-thread slice bound (compare the
 * iFrom/iTo values passed in the OP_SUM_NAIVE path) — confirm.
 */
82 for (
integer r = from; r < to; r++) {
89 for (
integer i = 0; i < nc; i++) {
/*
 * Constructor.
 *
 * Forwards HP, OF, pS, dInitialTime, the output/input file names and
 * bAbortAfterInput to DataManager.  Initializes AssMode to ASS_UNKNOWN,
 * op to OP_UNKNOWN and the ErrMatrixRebuild propagation flag to
 * AO_TS_INITIALIZER.  Records the requested thread count in
 * DataManager::nThreads.  Some parameters and initializers are on lines
 * not visible in this listing.
 */
125 MultiThreadDataManager::MultiThreadDataManager(
MBDynParser& HP,
129 const char* sOutputFileName,
130 const char* sInputFileName,
131 bool bAbortAfterInput,
134 DataManager(HP, OF, pS, dInitialTime, sOutputFileName, sInputFileName, bAbortAfterInput),
135 AssMode(ASS_UNKNOWN),
138 op(MultiThreadDataManager::OP_UNKNOWN),
140 propagate_ErrMatrixRebuild(AO_TS_INITIALIZER)
142 DataManager::nThreads = nThreads;
/*
 * Try to run under SCHED_FIFO:
 *  1. Read the current scheduling parameters of the calling process (pid 0).
 *  2. Report the current priority together with the policy's [min, max]
 *     range.
 *  3. Substitute the maximum priority when the current one lies outside
 *     that range.
 *  4. Call sched_setscheduler().
 * Failures are reported with silent_cerr; what happens after them is not
 * visible here.
 * NOTE(review): SCHED_FIFO normally requires elevated privileges, so the
 * sched_setscheduler() failure branch is expected for unprivileged runs.
 */
145 struct sched_param sp;
146 int policy = SCHED_FIFO;
149 rc = sched_getparam(0, &sp);
151 silent_cerr(
"sched_getparam() failed: " << errno << std::endl);
155 int pmin = sched_get_priority_min(policy);
156 int pmax = sched_get_priority_max(policy);
158 silent_cout(
"current priority is " << sp.sched_priority
159 <<
" {" << pmin <<
"," << pmax <<
"}" << std::endl);
161 if (sp.sched_priority > pmax || sp.sched_priority < pmin) {
162 sp.sched_priority = pmax;
165 rc = sched_setscheduler(0, policy, &sp);
167 silent_cerr(
"sched_setscheduler() unable "
168 "to set SCHED_FIFO scheduling policy: "
/*
 * thread_mutex/thread_cond are what the assembly routines wait on and
 * EndOfOp() signals once the workers are done with an op.
 */
175 if (pthread_mutex_init(&thread_mutex, NULL)) {
176 silent_cerr(
"MultiThreadDataManager::MultiThreadDataManager(): "
177 "mutex init failed" << std::endl);
181 if (pthread_cond_init(&thread_cond, NULL)) {
182 silent_cerr(
"MultiThreadDataManager::MultiThreadDataManager(): "
183 "cond init failed" << std::endl);
/*
 * Destructor: releases the mutex and condition variable that the
 * constructor set up with pthread_mutex_init()/pthread_cond_init().
 * The return codes of the destroy calls are not checked.
 */
190 MultiThreadDataManager::~MultiThreadDataManager(
void)
192 pthread_mutex_destroy(&thread_mutex);
193 pthread_cond_destroy(&thread_cond);
/*
 * Tears down the worker threads.  Its result is what GetCPUTime() returns,
 * presumably the accumulated cputime.
 *
 * If thread_data is 0 it leaves early (presumably: never spawned or already
 * destroyed).  The early-return value is not visible here.
 *
 * Otherwise it:
 *  1. publishes op = OP_EXIT and sets thread_count to the number of workers;
 *  2. for each worker 1..nThreads-1, posts its semaphore, joins it and adds
 *     that worker's cputime to the running total;
 *  3. handles thread 0's lock array (body not visible);
 *  4. runs thread_cleanup() for thread 0 itself.
 */
197 MultiThreadDataManager::ThreadDestroy(
void)
199 if (thread_data == 0) {
205 op = MultiThreadDataManager::OP_EXIT;
206 thread_count = nThreads - 1;
208 for (
unsigned i = 1;
i < nThreads;
i++) {
211 sem_post(&thread_data[
i].sem);
/* A failed join is reported via silent_cerr; further handling is not visible here. */
212 if (pthread_join(thread_data[
i].thread, &retval)) {
213 silent_cerr(
"pthread_join() failed on thread " <<
i
218 cputime += thread_data[
i].cputime;
221 if (thread_data[0].lock) {
224 thread_cleanup(&thread_data[0]);
/*
 * Worker-thread entry point passed to pthread_create() by ThreadSpawn().
 *
 * p is this worker's MultiThreadDataManager::ThreadData slot.  On start-up
 * the thread:
 *  - logs its number, pthread_self() and getpid();
 *  - when HAVE_PTHREAD_SIGMASK is defined, blocks SIGTERM, SIGINT and SIGHUP
 *    for itself, so those signals are not delivered to this thread.
 *
 * It then dispatches on arg->pDM->op.  The surrounding wait/loop logic is
 * not visible in this listing (presumably a wait on arg->sem, repeated
 * while bKeepGoing).
 */
234 MultiThreadDataManager::thread(
void *p)
236 MultiThreadDataManager::ThreadData *arg
237 = (MultiThreadDataManager::ThreadData *)p;
239 silent_cout(
"MultiThreadDataManager: thread " << arg->threadNumber
240 <<
" [self=" << pthread_self()
241 <<
",pid=" << getpid() <<
"]"
242 <<
" starting..." << std::endl);
244 bool bKeepGoing =
true;
246 #ifdef HAVE_PTHREAD_SIGMASK
249 sigemptyset(&newset);
250 sigaddset(&newset, SIGTERM);
251 sigaddset(&newset, SIGINT);
252 sigaddset(&newset, SIGHUP);
253 pthread_sigmask(SIG_BLOCK, &newset, NULL);
268 DEBUGCOUT(
"thread " << arg->threadNumber <<
": "
269 "op " << arg->pDM->op << std::endl);
272 switch (arg->pDM->op) {
/*
 * OP_ASSJAC_CC: assemble into this worker's *arg->pJacHdl via
 * DataManager::AssJac.  A caught ErrRebuildMatrix is reported and recorded
 * in the shared propagate_ErrMatrixRebuild flag, which the main thread
 * tests after all workers are done.
 */
273 case MultiThreadDataManager::OP_ASSJAC_CC:
276 arg->pDM->DataManager::AssJac(*arg->pJacHdl,
282 silent_cerr(
"thread " << arg->threadNumber
283 <<
" caught ErrRebuildMatrix"
286 mbdyn_test_and_set(&arg->pDM->propagate_ErrMatrixRebuild);
/* OP_ASSJAC_NAIVE: reset this thread's slot ppNaiveJacHdl[threadNumber], then assemble into it. */
293 case MultiThreadDataManager::OP_ASSJAC_NAIVE:
295 arg->ppNaiveJacHdl[arg->threadNumber]->Reset();
299 arg->pDM->DataManager::AssJac(*arg->ppNaiveJacHdl[arg->threadNumber],
/*
 * OP_SUM_NAIVE: thread k works on [nn*k/nThreads, nn*(k+1)/nThreads).
 * iTo of thread k equals iFrom of thread k+1, so the ranges abut and
 * together cover [0, nn).  The shared arg->lock array is passed to the
 * summing call (callee not visible; presumably it guards concurrent
 * updates).
 */
305 case MultiThreadDataManager::OP_SUM_NAIVE:
313 integer iFrom = (nn*(arg->threadNumber))/arg->pDM->nThreads;
314 integer iTo = (nn*(arg->threadNumber + 1))/arg->pDM->nThreads;
321 iFrom, iTo, arg->lock);
/* OP_ASSRES (only with MBDYN_X_MT_ASSRES): reset this thread's residual handler and assemble into it. */
326 #ifdef MBDYN_X_MT_ASSRES
327 case MultiThreadDataManager::OP_ASSRES:
328 arg->pResHdl->Reset();
329 arg->pDM->DataManager::AssRes(*arg->pResHdl,
/* OP_EXIT: posted by ThreadDestroy(); the handling is not visible here. */
336 case MultiThreadDataManager::OP_EXIT:
/* Fallback (presumably the default: label): any other op is reported as an error. */
343 silent_cerr(
"MultiThreadDataManager: unhandled op"
/*
 * Releases the per-thread resources held by *arg.
 *
 * Two branches get extra cleanup whose bodies are not visible in this
 * listing: workers (threadNumber > 0) and slots that hold a ppNaiveJacHdl
 * array.  The thread's semaphore is always destroyed.
 *
 * When HAVE_SYS_TIMES_H is defined, the fields of tmsbuf are printed with
 * pedantic_cout and utime+cutime+stime+cstime is stored in arg->cputime.
 * ThreadDestroy() then adds these values up across threads.
 *
 * NOTE(review): this assumes tmsbuf is filled by times().  POSIX times()
 * reports CPU time for the whole calling process, not for the calling
 * thread.  Threads sharing one process would each record roughly the same
 * total, so summing them in ThreadDestroy() may over-count — confirm.
 */
358 MultiThreadDataManager::thread_cleanup(ThreadData *arg)
364 if (arg->threadNumber > 0) {
371 if (arg->ppNaiveJacHdl) {
376 sem_destroy(&arg->sem);
378 #ifdef HAVE_SYS_TIMES_H
383 pedantic_cout(
"thread " << arg->threadNumber <<
":" << std::endl
384 <<
"\tutime: " << tmsbuf.tms_utime << std::endl
385 <<
"\tstime: " << tmsbuf.tms_stime << std::endl
386 <<
"\tcutime: " << tmsbuf.tms_cutime << std::endl
387 <<
"\tcstime: " << tmsbuf.tms_cstime << std::endl);
389 arg->cputime = tmsbuf.tms_utime + tmsbuf.tms_cutime
390 + tmsbuf.tms_stime + tmsbuf.tms_cstime;
/*
 * Completion hook for a worker that has finished the current op.
 *
 * Under thread_mutex it evaluates last = (thread_count == 0) and signals
 * thread_cond, which the main thread waits on (with thread_mutex) after
 * dispatching an op.  Two pieces are on lines not visible in this listing:
 * the decrement of thread_count, and the condition guarding the signal
 * (presumably "if (last)").
 */
395 MultiThreadDataManager::EndOfOp(
void)
400 pthread_mutex_lock(&thread_mutex);
402 last = (thread_count == 0);
406 pthread_cond_signal(&thread_cond);
410 pthread_mutex_unlock(&thread_mutex);
/*
 * Allocates and initializes nThreads ThreadData slots and starts the worker
 * threads.  Slot 0 is the calling thread: the assembly routines use
 * thread_data[0] directly.
 *
 * Each slot gets:
 *  - a back-pointer to this manager;
 *  - a semaphore with initial value 0 and pshared = 0 (shared only among
 *    this process's threads), so a wait on it presumably blocks until
 *    sem_post();
 *  - its thread number;
 *  - an element iterator over Elems;
 *  - a null lock;
 *  - work matrices A and B (allocation lines only partly visible), with
 *    pWorkMat initially pointing at A;
 *  - null work-vector, Jacobian, naive-handler, residual-handler and
 *    pMatA/pMatB pointers.
 *
 * pthread_create() runs `thread` with the slot as its argument.  The loop
 * bounds around that call are not visible here (presumably workers
 * 1..nThreads-1).
 *
 * NOTE(review): &Elems[0] is undefined if Elems is an empty std::vector;
 * Elems.data() would be safe.  Confirm Elems's type.
 */
415 MultiThreadDataManager::ThreadSpawn(
void)
419 SAFENEWARRNOFILL(thread_data, MultiThreadDataManager::ThreadData, nThreads);
421 for (
unsigned i = 0;
i < nThreads;
i++) {
423 thread_data[
i].pDM =
this;
424 sem_init(&thread_data[
i].sem, 0, 0);
425 thread_data[
i].threadNumber =
i;
426 thread_data[
i].ElemIter.Init(&Elems[0], Elems.size());
427 thread_data[
i].lock = 0;
430 thread_data[
i].pWorkMatA = 0;
435 iMaxWorkNumItemsJac));
437 thread_data[
i].pWorkMatB = 0;
442 iMaxWorkNumItemsJac));
444 thread_data[
i].pWorkMat = thread_data[
i].pWorkMatA;
446 thread_data[
i].pWorkVec = 0;
452 thread_data[
i].pJacHdl = 0;
455 thread_data[
i].ppNaiveJacHdl = 0;
458 thread_data[
i].pResHdl = 0;
461 thread_data[
i].pMatA = 0;
462 thread_data[
i].pMatB = 0;
472 if (pthread_create(&thread_data[
i].thread, NULL, thread,
473 &thread_data[
i]) != 0) {
474 silent_cerr(
"pthread_create() failed "
476 <<
" of " << nThreads << std::endl);
/*
 * Jacobian-assembly dispatch.  The function header and several branch
 * conditions are not visible in this listing.
 *
 * Visible paths:
 *  - One path forwards to CCAssJac().
 *  - NaiveAssJac() is used when JacHdl is already thread 0's naive handler
 *    (ppNaiveJacHdl[0]).  It is also reached after comparing the
 *    permutation objects of two naive permuted handlers by address
 *    (GetPerm()).
 *  - Otherwise the per-thread naive state is rebuilt:
 *      1. non-null slots 1..nThreads-1 of thread 0's ppNaiveJacHdl array
 *         are released;
 *      2. a lock array is set up with AO_TS_INITIALIZER entries;
 *      3. pNaiveJacHdl (presumably JacHdl viewed as a naive handler)
 *         becomes slot 0;
 *      4. every worker is pointed at the same lock and ppNaiveJacHdl
 *         arrays.
 *  - Unsupported matrix types are reported as "unable to detect jacobian
 *    matrix type".
 */
490 CCAssJac(JacHdl, dCoef);
494 ASSERT(thread_data[0].ppNaiveJacHdl != 0);
495 if (&JacHdl == thread_data[0].ppNaiveJacHdl[0]) {
505 bool bDoAssemble(
false);
506 if (!pNaivePermJacHdl) {
511 ASSERT(pNaivePermJacHdl2 != 0);
/* Same permutation object (compared by address) means the existing per-thread setup can be reused. */
512 if (&pNaivePermJacHdl->
GetPerm() == &pNaivePermJacHdl2->
GetPerm()) {
518 NaiveAssJac(JacHdl, dCoef);
523 for (
unsigned i = 1;
i < nThreads;
i++) {
524 if (thread_data[0].ppNaiveJacHdl[
i]) {
526 thread_data[0].ppNaiveJacHdl[
i] = 0;
531 thread_data[0].lock = 0;
548 thread_data[0].lock[
i] = AO_TS_INITIALIZER;
551 thread_data[0].ppNaiveJacHdl = 0;
554 thread_data[0].ppNaiveJacHdl[0] = pNaiveJacHdl;
/* Workers share thread 0's lock and handler arrays; their own slots start empty. */
557 for (
unsigned i = 1;
i < nThreads;
i++) {
558 thread_data[
i].lock = thread_data[0].lock;
559 thread_data[
i].ppNaiveJacHdl = thread_data[0].ppNaiveJacHdl;
560 thread_data[0].ppNaiveJacHdl[
i] = 0;
562 if (pNaivePermJacHdl) {
583 silent_cerr(
"unable to detect jacobian matrix type "
584 "for multithread assembly" << std::endl);
/*
 * Multithread Jacobian assembly into per-worker compact handlers.  The
 * function header is not visible in this listing.
 *
 * Steps:
 *  1. Clear propagate_ErrMatrixRebuild.
 *  2. Manage the worker handlers: drop them, or, on the
 *     CC_FIRST => CC_YES transition, give each worker 1..nThreads-1 its own
 *     pMH->Copy().
 *  3. Publish op = OP_ASSJAC_CC and thread_count = nThreads - 1, pass
 *     dCoef to each worker and wake it with sem_post().
 *  4. Assemble thread 0's share on the calling thread.
 *  5. Wait on thread_cond until every worker has called EndOfOp().
 *  6. If any thread raised the ErrMatrixRebuild flag, drop the worker
 *     handlers.
 */
592 ASSERT(thread_data != NULL);
594 AO_CLEAR(&propagate_ErrMatrixRebuild);
602 for (
unsigned i = 1;
i < nThreads;
i++) {
604 thread_data[
i].pJacHdl = 0;
610 DEBUGCERR(
"CC_NO => CC_FIRST" << std::endl);
612 ASSERT(dynamic_cast<SpMapMatrixHandler *>(&JacHdl) != 0);
624 DEBUGCERR(
"CC_FIRST => CC_YES" << std::endl);
626 for (
unsigned i = 1;
i < nThreads;
i++) {
627 thread_data[
i].pJacHdl = pMH->
Copy();
648 thread_data[0].ElemIter.ResetAccessData();
649 op = MultiThreadDataManager::OP_ASSJAC_CC;
650 thread_count = nThreads - 1;
652 for (
unsigned i = 1;
i < nThreads;
i++) {
653 thread_data[
i].dCoef = dCoef;
655 sem_post(&thread_data[
i].sem);
660 *thread_data[0].pWorkMat);
663 silent_cerr(
"thread " << thread_data[0].threadNumber
664 <<
" caught ErrRebuildMatrix"
667 mbdyn_test_and_set(&propagate_ErrMatrixRebuild);
673 pthread_mutex_lock(&thread_mutex);
/*
 * Loop, not if: pthread_cond_wait() may return spuriously, so re-check the
 * predicate.  Otherwise the caller could go on while workers are still
 * assembling.
 */
674 while (thread_count > 0) {
675 pthread_cond_wait(&thread_cond, &thread_mutex);
677 pthread_mutex_unlock(&thread_mutex);
679 if (propagate_ErrMatrixRebuild == AO_TS_SET) {
680 for (
unsigned i = 1;
i < nThreads;
i++) {
682 thread_data[
i].pJacHdl = 0;
689 for (
unsigned i = 1;
i < nThreads;
i++) {
/*
 * Naive (one matrix per thread) Jacobian assembly in two phases.  The
 * function header is not visible in this listing.
 *
 *  1. OP_ASSJAC_NAIVE: every thread, including the caller as thread 0,
 *     resets its own ppNaiveJacHdl slot and assembles into it.
 *  2. OP_SUM_NAIVE: each thread processes an index range [iFrom, iTo)
 *     (workers use nn*k/nThreads .. nn*(k+1)/nThreads), passing the shared
 *     lock array.  This presumably accumulates the per-thread matrices.
 *
 * After each phase the caller waits on thread_cond until all workers have
 * called EndOfOp().
 */
697 ASSERT(thread_data != NULL);
700 thread_data[0].ElemIter.ResetAccessData();
701 op = MultiThreadDataManager::OP_ASSJAC_NAIVE;
702 thread_count = nThreads - 1;
704 for (
unsigned i = 1;
i < nThreads;
i++) {
705 thread_data[
i].dCoef = dCoef;
707 sem_post(&thread_data[
i].sem);
713 thread_data[0].ppNaiveJacHdl[0]->Reset();
717 thread_data[0].ElemIter,
718 *thread_data[0].pWorkMat);
720 pthread_mutex_lock(&thread_mutex);
/* Loop, not if: pthread_cond_wait() may return spuriously; re-check the predicate. */
721 while (thread_count > 0) {
722 pthread_cond_wait(&thread_cond, &thread_mutex);
724 pthread_mutex_unlock(&thread_mutex);
727 op = MultiThreadDataManager::OP_SUM_NAIVE;
728 thread_count = nThreads - 1;
729 for (
unsigned i = 1;
i < nThreads;
i++) {
730 sem_post(&thread_data[
i].sem);
743 iFrom, iTo, thread_data[0].lock);
746 pthread_mutex_lock(&thread_mutex);
/* Same as above: a spurious wake-up must not end the wait before all workers finish. */
747 while (thread_count > 0) {
748 pthread_cond_wait(&thread_cond, &thread_mutex);
751 pthread_mutex_unlock(&thread_mutex);
/*
 * Multithread residual assembly, compiled only with MBDYN_X_MT_ASSRES.
 * The function header is only partly visible.
 *
 * Steps:
 *  1. Publish op = OP_ASSRES and thread_count = nThreads - 1.
 *  2. Pass dCoef to workers 1..nThreads-1 and wake them.
 *  3. Run thread 0's share on the calling thread (call only partly
 *     visible).
 *  4. Wait for the workers.
 *  5. Add each worker's *pResHdl into ResHdl.
 *
 * NOTE(review): the dynamic exception specification
 * throw(ChangedEquationStructure) is deprecated since C++11 and ill-formed
 * in C++17.  It is kept because it must match the declaration elsewhere.
 */
754 #ifdef MBDYN_X_MT_ASSRES
757 throw(ChangedEquationStructure)
759 ASSERT(thread_data != NULL);
761 thread_data[0].ElemIter.ResetAccessData();
762 op = MultiThreadDataManager::OP_ASSRES;
763 thread_count = nThreads - 1;
765 for (
unsigned i = 1;
i < nThreads;
i++) {
766 thread_data[
i].dCoef = dCoef;
768 sem_post(&thread_data[
i].sem);
772 *thread_data[0].pWorkVec);
774 pthread_mutex_lock(&thread_mutex);
/*
 * Loop, not if: pthread_cond_wait() may return spuriously.  Summing below
 * must not start until every worker is done.
 */
775 while (thread_count > 0) {
776 pthread_cond_wait(&thread_cond, &thread_mutex);
778 pthread_mutex_unlock(&thread_mutex);
780 for (
unsigned i = 1;
i < nThreads;
i++) {
781 ResHdl += *thread_data[
i].pResHdl;
/*
 * Returns the result of ThreadDestroy(), casting away const in order to
 * call it.
 *
 * NOTE(review): despite the const getter signature, this shuts down the
 * worker threads: ThreadDestroy() publishes OP_EXIT and joins them.  A
 * second call would take ThreadDestroy()'s thread_data == 0 early exit,
 * assuming ThreadDestroy() resets thread_data (not visible here).
 */
787 MultiThreadDataManager::GetCPUTime(
void)
const
789 return const_cast<MultiThreadDataManager *
>(
this)->ThreadDestroy();
integer iGetNumRows(void) const
virtual CompactSparseMatrixHandler * Copy(void) const =0
#define MBDYN_EXCEPT_ARGS
void AddUnchecked(const CompactSparseMatrixHandler &m)
#define SAFEDELETEARR(pnt)
virtual void AssRes(VectorHandler &ResHdl, doublereal dCoef)
int mbdyn_task2cpu(int cpu)
const std::vector< integer > & GetPerm(void) const
virtual void AssJac(MatrixHandler &JacHdl, doublereal dCoef)
#define ASSERT(expression)
#define SAFENEWWITHCONSTRUCTOR(pnt, item, constructor)
static std::stack< cleanup * > c
#define SAFENEWARRNOFILL(pnt, item, sz)
#define SAFENEWARR(pnt, item, sz)
static const doublereal a
const std::vector< integer > & GetInvPerm(void) const
virtual integer iGetNumRows(void) const =0