60 #include <sys/types.h>
61 #include <sys/select.h>
62 #include <sys/socket.h>
63 #include <netinet/in.h>
69 #include <arpa/inet.h>
73 #define DEFAULT_PORT 9012
74 #define DEFAULT_HOST "127.0.0.1"
76 SocketStreamDrive::SocketStreamDrive(
unsigned int uL,
78 UseSocket *pUS,
bool c,
79 const std::string& sFileName,
80 integer nd,
const std::vector<doublereal>&
v0,
82 unsigned int ie,
bool bReceiveFirst,
84 const struct timeval& st,
87 InputEvery(ie), bReceiveFirst(bReceiveFirst), InputCounter(ie - 1),
88 pUS(pUS), recv_flags(flags),
101 if (!bReceiveFirst) {
102 InputCounter -= InputEvery;
106 pSDE->
Init(
"SocketStreamDrive", uLabel, nd);
110 SocketStreamDrive::~SocketStreamDrive(
void)
123 SocketStreamDrive::Restart(std::ostream& out)
const
125 out <<
" file: " << uLabel <<
", socket stream,"
126 " stream drive name, \"" << sFileName <<
"\"";
128 return out <<
", " << iNumDrives <<
";" << std::endl;
132 SocketStreamDrive::ServePending(
const doublereal& t)
137 if (pUS->Abandoned()) {
138 silent_cout(
"SocketStreamDrive(" << sFileName <<
"): "
139 "abandoned" << std::endl);
147 if (InputCounter != InputEvery) {
152 int sock_nr = pUS->GetSock();
155 if (SocketTimeout.tv_sec || SocketTimeout.tv_usec) {
164 FD_SET(sock_nr, &readfds);
167 struct timeval tv = SocketTimeout;
170 rc = select(sock_nr + 1, &readfds, NULL, NULL, &tv);
173 int save_errno = errno;
174 char *err_msg = strerror(save_errno);
176 silent_cout(
"SocketStreamDrive"
177 "(" << sFileName <<
"): select failed"
178 <<
" (" << save_errno <<
": "
179 << err_msg <<
")" << std::endl);
184 silent_cout(
"SocketStreamDrive"
185 "(" << sFileName <<
"): select timed out"
190 if (!FD_ISSET(sock_nr, &readfds)) {
191 silent_cout(
"SocketStreamDrive"
192 "(" << sFileName <<
"): "
193 "socket " << sock_nr <<
" reset"
204 rc = pUS->recv(&
buf[0], size, recv_flags);
210 silent_cout(
"SocketStreamDrive(" << sFileName <<
"): "
211 <<
"communication closed by host; abandoning..."
217 int save_errno = errno;
220 switch (save_errno) {
222 if (recv_flags & MSG_DONTWAIT) {
232 char *err_msg = strerror(save_errno);
233 silent_cout(
"SocketStreamDrive(" << sFileName <<
") failed "
234 "(" << save_errno <<
": " << err_msg <<
")"
248 pSDE->
Echo(&pdVal[1], iNumDrives);
260 bool bGotCreate(
false);
262 unsigned short int port = -1;
270 silent_cerr(
"SocketStreamDrive(" << uLabel <<
"): "
271 "unable to read stream drive name "
281 silent_cerr(
"SocketStreamDrive(" << uLabel <<
"):"
282 "missing stream drive name "
291 silent_cerr(
"SocketStreamDrive"
292 "(" << uLabel <<
", \"" << name <<
"\"): "
293 "\"create\" must be either \"yes\" or \"no\" "
304 silent_cerr(
"SocketStreamDrive"
305 "(" << uLabel <<
", \"" << name <<
"\"): "
306 "unable to read local path"
317 silent_cerr(
"SocketStreamDrive"
318 "(" << uLabel <<
", \"" << name <<
"\"): "
319 "cannot specify port "
320 "for a local socket "
328 #ifdef IPPORT_USERRESERVED
330 silent_cerr(
"SocketStreamDrive"
331 "(" << uLabel <<
", \"" << name <<
"\"): "
332 "cannot listen on reserved port "
333 << port <<
": less than "
349 silent_cerr(
"SocketStreamDrive"
350 "(" << uLabel <<
", \"" << name <<
"\"): "
351 "cannot specify host for a local socket "
361 silent_cerr(
"SocketStreamDrive"
362 "(" << uLabel <<
", \"" << name <<
"\"): "
363 "unable to read host "
371 }
else if (path.empty() && !bCreate) {
372 silent_cerr(
"SocketStreamDrive"
373 "(" << uLabel <<
", \"" << name <<
"\"): "
381 int socket_type = SOCK_STREAM;
384 socket_type = SOCK_DGRAM;
390 silent_cerr(
"SocketStreamDrive(" << uLabel <<
", \"" << name <<
"\"): "
391 "invalid socket type "
397 if ((socket_type == SOCK_DGRAM) && !bCreate) {
398 silent_cerr(
"SocketStreamDrive(" << uLabel <<
", \"" << name <<
"\"): "
399 "socket type=udp incompatible with create=no "
407 flags |= MSG_WAITALL;
408 #endif // MSG_WAITALL
414 #else // ! MSG_NOSIGNAL
415 silent_cout(
"SocketStreamDrive"
416 "(" << uLabel <<
", \"" << name <<
"\"): "
417 "MSG_NOSIGNAL not defined (ignored) "
420 #endif // ! MSG_NOSIGNAL
423 }
else if (HP.
IsKeyWord(
"no" "signal")) {
426 #else // ! MSG_NOSIGNAL
427 silent_cout(
"SocketStreamDrive"
428 "(" << uLabel <<
", \"" << name <<
"\"): "
429 "MSG_NOSIGNAL not defined (ignored) "
432 #endif // ! MSG_NOSIGNAL
436 flags |= MSG_WAITALL;
437 flags &= ~MSG_DONTWAIT;
439 }
else if (HP.
IsKeyWord(
"non" "blocking")) {
441 flags &= ~MSG_WAITALL;
442 flags |= MSG_DONTWAIT;
449 unsigned int InputEvery = 1;
453 silent_cerr(
"SocketStreamDrive"
454 "(" << uLabel <<
", \"" << name <<
"\"): "
455 "invalid \"input every\" value " << i
460 InputEvery = (
unsigned int)i;
463 bool bReceiveFirst(
true);
466 silent_cerr(
"SocketStreamDrive"
467 "(" << uLabel <<
", \"" << name <<
"\"): "
468 "\"receive first\" must be either \"yes\" or \"no\" "
475 struct timeval SocketTimeout = { 0, 0 };
479 silent_cerr(
"SocketStreamDrive"
480 "(" << uLabel <<
", \"" << name <<
"\"): "
481 "invalid socket timeout value " << st
486 SocketTimeout.tv_sec = long(st);
487 SocketTimeout.tv_usec = long((st - SocketTimeout.tv_sec)*1000000);
490 pedantic_cout(
"SocketStreamDrive"
491 "(" << uLabel <<
", \"" << name <<
"\"): "
492 "timeout: " << SocketTimeout.tv_sec <<
"s "
493 << SocketTimeout.tv_usec <<
"ns" << std::endl);
499 std::vector<doublereal>
v0;
506 pMod = it->second->Read(v0, HP, idrives);
510 silent_cerr(
"SocketStreamDrive"
511 "(" << uLabel <<
", \"" << name <<
"\"): "
512 "illegal number of channels " << idrives
520 for (
int i = 0; i < idrives; i++) {
535 out <<
"filedriver: " << uLabel <<
" stream";
538 if (port == (
unsigned short int)(-1)) {
558 if (socket_type == SOCK_STREAM) {
564 out <<
" " << bCreate;
567 if ((socket_type == SOCK_STREAM) && bCreate) {
568 const_cast<DataManager *
>(pDM)->RegisterSocketUser(pUS);
576 SocketStreamDrive(uLabel,
578 name, idrives,
v0, pMod,
579 InputEvery, bReceiveFirst,
580 flags, SocketTimeout,
586 #endif // MSG_NOSIGNAL
593 if (flags & MSG_DONTWAIT) {
596 #endif // MSG_DONTWAIT
604 <<
" " << bReceiveFirst
605 <<
" " << SocketTimeout.tv_sec
608 for (std::vector<doublereal>::iterator it = v0.begin(); it != v0.end(); ++it)
627 pedantic_cout(
"\"" << s <<
"\" is deprecated; "
628 "use \"stream\" instead at line "
633 if (::rtmbdyn_rtai_task != NULL){
634 silent_cout(
"starting RTMBDyn drive " << uLabel << std::endl);
640 silent_cout(
"starting stream drive " << uLabel << std::endl);
641 pDr = ReadStreamDrive(pDM, HP, uLabel);
642 #else // ! USE_SOCKET
643 silent_cerr(
"stream drive " << uLabel
645 <<
" because apparently the current architecture "
646 "does not support sockets" << std::endl);
648 #endif // ! USE_SOCKET
662 return fileDriveContentTypeMap.find(std::string(s)) != fileDriveContentTypeMap.end();
667 pedantic_cout(
"registering file drive content type \"" << name <<
"\""
669 return fileDriveContentTypeMap.insert(FileDriveContentTypeMap::value_type(name, rf)).second;
674 for (FileDriveContentTypeMap::iterator i = fileDriveContentTypeMap.begin(); i != fileDriveContentTypeMap.end(); ++i) {
677 fileDriveContentTypeMap.clear();
virtual bool IsWord(const std::string &s) const
#define MBDYN_EXCEPT_ARGS
virtual integer GetInt(integer iDefval=0)
void DestroyFileDriveContentTypes(void)
virtual const char * IsWord(const HighParser::WordSet &ws)
StreamDriveEcho * ReadStreamDriveEcho(const DataManager *pDM, MBDynParser &HP)
StreamDrive::Modifier * ReadStreamDriveModifier(MBDynParser &HP, integer nDrives)
virtual const char * GetFileName(enum Delims Del=DEFAULTDELIM)
const DriveHandler * pGetDrvHdl(void) const
bool SetFileDriveContentType(const char *name, FileDriveContentTypeReader *rf)
void Echo(const doublereal *pbuf, unsigned nChannels)
FileDriveContentTypeMap fileDriveContentTypeMap
virtual bool IsKeyWord(const char *sKeyWord)
virtual Drive * Read(unsigned uLabel, const DataManager *pDM, MBDynParser &HP)
virtual void Modify(doublereal *out, const void *in) const =0
virtual const char * GetStringWithDelims(enum Delims Del=DEFAULTDELIM, bool escape=true)
#define ASSERT(expression)
std::map< std::string, FileDriveContentTypeReader * > FileDriveContentTypeMap
#define SAFENEWWITHCONSTRUCTOR(pnt, item, constructor)
void EchoPrepare(const doublereal *pbuf, unsigned nChannels)
Drive * ReadRTMBDynInDrive(const DataManager *pDM, MBDynParser &HP, unsigned int uLabel)
static std::stack< cleanup * > c
bool Init(const std::string &msg, unsigned uLabel, unsigned nChannels)
virtual bool GetYesNo(bool &bRet)
std::ostream & GetLogFile(void) const
FileDriveContentTypeWordSetType fileDriveContentTypeWordSet
#define IPPORT_USERRESERVED
static const std::vector< doublereal > v0
static doublereal buf[BUFSIZE]
virtual HighParser::ErrOut GetLineData(void) const
virtual doublereal GetReal(const doublereal &dDefval=0.0)