PhoenixSwarm  3.5.0
Library to ease communication between daemons
Loading...
Searching...
No Matches
Daemon_impl.h
Go to the documentation of this file.
1/***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5****************************************/
6
7#ifndef __DAEMON_H_IMPL__
8#define __DAEMON_H_IMPL__
9
10#include "phoenix_assert.h"
11#include "Daemon.h"
12
14template<class _TBackend>
20
22template<class _TBackend>
26
28
30template<class _TBackend>
31void Daemon<_TBackend>::setSocketMode(PSocketMode::PSocketMode mode){
32 p_backend.socketManager.setMode(mode);
33 switch(mode){
34 case PSocketMode::MOCK:
35 p_isFullMock = true;
36 p_isFullMockRecord = false;
37 break;
38 case PSocketMode::MOCK_RECORD:
39 p_isFullMock = false;
40 p_isFullMockRecord = true;
41 break;
42 default:
43 p_isFullMock = false;
44 p_isFullMockRecord = false;
45 break;
46 }
47}
48
50
52template<class _TBackend>
53void Daemon<_TBackend>::setClockMode(PClockMode::PClockMode mode){
54 p_backend.clock.setMode(mode);
55 switch(mode){
56 case PClockMode::MOCK:
57 p_isFullMock = true;
58 p_isFullMockRecord = false;
59 break;
60 case PClockMode::MOCK_RECORD:
61 p_isFullMock = false;
62 p_isFullMockRecord = true;
63 break;
64 default:
65 p_isFullMock = false;
66 p_isFullMockRecord = false;
67 break;
68 }
69}
70
72
74template<class _TBackend>
75PRecvStatus::PRecvStatus Daemon<_TBackend>::checkRecvStatus(PRecvStatus::PRecvStatus recvStatus){
76 if(recvStatus == PRecvStatus::SOCKET_NOT_AVAILABLE){
77// p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Socket not available", "Socket not available");
78 p_log.getLogWarning() << "Daemon::checkRecvStatus : Socket not available" << std::endl;
79 }
80 if(recvStatus == PRecvStatus::SIGNAL_INTERRUPTION){
81 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Socket Backend caught a signal, aborting", "Socket Backend caught a signal, aborting");
82 }
83 if(recvStatus == PRecvStatus::BROKEN_BACKEND){
84 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Back-end is in invalid state", "Back-end is in invalid state");
85 }
86 if(recvStatus == PRecvStatus::BROKEN_SOCKET){
87 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Socket is in invalid state", "Socket is in invalid state");
88 }
89 if(recvStatus == PRecvStatus::CANNOT_DESERIALIZE_DATA){
90 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Cannot deserialize data", "Cannot deserialize data");
91 }
92 return recvStatus;
93}
94
95
97
99template<class _TBackend>
100void Daemon<_TBackend>::checkSendStatus(PSendStatus::PSendStatus sendStatus){
101 if(sendStatus == PSendStatus::SOCKET_NOT_AVAILABLE){
102 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Socket not available", "Socket not available");
103 }
104 if(sendStatus == PSendStatus::NO_ROUTE_TO_RECEIVER){
105 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Receiver not reachable", "Receiver not reachable");
106 }
107 if(sendStatus == PSendStatus::SIGNAL_INTERRUPTION){
108 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Socket Backend caught a signal, aborting.", "Socket Backend caught a signal, aborting.");
109 }
110 if(sendStatus == PSendStatus::BROKEN_BACKEND){
111 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Back-end is in invalid state", "Back-end is in invalid state");
112 }
113 if(sendStatus == PSendStatus::BROKEN_SOCKET){
114 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Socket is in invalid state", "Socket is in invalid state");
115 }
116 if(sendStatus == PSendStatus::CANNOT_SERIALIZE_DATA){
117 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Cannot serialize data", "Cannot serialize data");
118 }
119}
120
122
124template<class _TBackend>
126 p_isRun = true;
127 //Let's initialise all connexions
129 p_startTimestamp = p_backend.clock.now();
130 p_statTimer.setEllapsedTime(p_config.getStatTimerPeriodMs());
131 p_statTimer.setStartTime(p_startTimestamp);
132 while(p_isRun){
133 //Let's get the last received message
134 //If there is not message, we treat the rest of the loop
135 Message message;
136 PRecvStatus::PRecvStatus recvStatus = checkRecvStatus(p_backend.socketManager.recvData("pull", message, p_mainConfig.recvFlag));
137 if(recvStatus == PRecvStatus::OK){
138 processInputMessage(message);
139 }
141
142 time_t currentTime = p_backend.clock.now();
143 checkMessageTimeout(currentTime);
144 if(p_statTimer.isTime(currentTime) || !p_isRun){
145 sendStatToStatDaemon(currentTime);
146 }
147 }
148 //Let's execute a last command before stop
150 return true;
151}
152
154
157template<class _TBackend>
159 bool b(true);
160 const PVecString & vecDestination = message.getVecRecver();
161 for(PVecString::const_iterator it(vecDestination.begin()); it != vecDestination.end(); ++it){
162 b &= sendMessage(*it, message);
163 }
164 return b;
165}
166
168
172template<class _TBackend>
173bool Daemon<_TBackend>::sendMessage(const PString & destinationName, const Message & message){
174 if(message.getIsConfirmationNeeded()){
175 addMessageToConfirm(message);
176 }
177 checkSendStatus(p_backend.socketManager.sendData(destinationName, message, p_mainConfig.sendFlag));
178 return true;
179}
180
182
187template<class _TBackend>
188bool Daemon<_TBackend>::sendData(const PString & destinationName, const Data & data, bool isConfirmationNeeded){
189 Message message;
190 message.setSendTime(p_backend.clock.now());
191 message.setData(data);
192 message.setId(getMessageId());
193 message.setIsConfirmationNeeded(isConfirmationNeeded);
195 message.setSender(p_config.getName());
196 message.getVecRecver().push_back(destinationName);
197 return sendMessage(destinationName, message);
198}
199
201template<class _TBackend>
203
205template<class _TBackend>
207
209template<class _TBackend>
213
215template<class _TBackend>
217 if(p_config.getName() == ""){
218 getLog().criticalAndThrow<Phoenix::ConfigException>("Daemon::initialisationDaemonSocket", "daemon has no name");
219 }
221 setSocketMode(PSocketMode::MOCK_RECORD);
222 setClockMode(PClockMode::MOCK_RECORD);
223 }else{
224 if(p_isFullMock){
225 setSocketMode(PSocketMode::MOCK);
226 setClockMode(PClockMode::MOCK);
227 }
228 }
229 //Let's send when we are a client and recv when we are a server
230 //By doing this way, we have one server per Daemon and as many clients as other Daemon
231 getLog().getLogInfo() << "Daemon<_TBackend>::initialisationDaemonSocket() : initialise pull socket for daemon '" << p_config.getName() << "'" << std::endl;
232 phoenix_assert(p_backend.socketManager.addServerSocket("pull",
233 PSocketParam{p_config.getHostName(), p_config.getReceivingPort(), p_mainConfig.recvTimeoutMs, p_mainConfig.sendTimeoutMs}, p_backend.extraServerParam,
234 "./" + p_config.getName() + "_", p_backend.extraMockServerParam));
235
236 //Now let's initialise connexions to other
237 for(MapDaemonConfig::iterator it(p_mapDaemon.begin()); it != p_mapDaemon.end(); ++it){
238 if(it->second.getName() == p_config.getName()){continue;}
239 getLog().getLogInfo() << "Daemon<_TBackend>::initialisationDaemonSocket() : initialise send socket for neighbour Daemon '"<<it->second.getName()<<"'" << std::endl;
240 phoenix_assert(p_backend.socketManager.addClientSocket(it->second.getName(),
241 PSocketParam{it->second.getHostName(), it->second.getReceivingPort(), p_mainConfig.recvTimeoutMs, p_mainConfig.sendTimeoutMs}, p_backend.extraClientParam,
242 "./" + p_config.getName() + "_", p_backend.extraMockClientParam));
243 // p_backend.socketManager.getSocket(it->second.getName())->waitUntilConnection(500000lu, 30lu); //0.5 s per check and 30 checks
244 }
245 getLog().getLogInfo() << "Daemon<_TBackend>::initialisationDaemonSocket() : all connection to neighbours Daemon initialised" << std::endl;
246 p_backend.clock.setMockPrefix("./" + p_config.getName() + "_");
247}
248
250
252template<class _TBackend>
254 //Check is the message needs a confirmation
255 if(message.getIsConfirmationNeeded()){
256 Message confirmation;
257 //We do not need to change the time of the message because it has to be in the clock reference of the sender
258 confirmation.setSendTime(message.getSendTime());
259 confirmation.setId(message.getId());
260 confirmation.setSender(p_config.getName());
262 confirmation.getVecRecver().push_back(message.getSender());
263 sendMessage(confirmation);
264 }
265
266 //Let's check is we receive a stop message
267 if(message.getType() == MessageType::STOP){stop();}
268 else if(message.getType() == MessageType::MESSAGE_CONFIRMATION){
269 processConfirmedMessage(message.getId(), p_backend.clock.now());
270 }else if(message.getType() == MessageType::RESULT_DATA){
271 AbstractDataFunction* function = getDataFunction(message.getData());
272 if(function == NULL){
273 return;
274 }
275 time_t functionStartTime = p_backend.clock.now();
276 if(!function->call(p_log, message.getData())){
277 getLog().getLogError() << "Daemon<_TBackend>::processInputMessage : cannot process data of type '"<<message.getData().getType()<<"' from Daemon '"<<message.getSender()<<"'" << std::endl;
278 return;
279 }
280 time_t functionEllapsedTime = p_backend.clock.now() - functionStartTime;
281 getLog().getLogDebug() << "Daemon<_TBackend>::processInputMessage : function '"<<message.getData().getType()<<"' processed data from daemon '"<<message.getSender()<<"' in "<<functionEllapsedTime<<" nanoseconds" << std::endl;
282 if(p_config.getStatDaemonName() != "" && p_config.getStatDaemonName() != p_config.getName()){
283 updateStatAccumulator(p_config.getDaemonStatAccumulator().getMapStatComputing().find(message.getData().getType())->second, functionEllapsedTime);
284 }
285 }
286}
287
289
291template<class _TBackend>
293 //Let's fill the DaemonStat
294 DaemonStat stat;
295 fillDaemonStat(stat, p_startTimestamp, currentTime);
296 stat.setName(p_config.getName());
297
298 //Let's send it to the Stat Daemon
299 if(p_config.getStatDaemonName() != "" && p_config.getStatDaemonName() != p_config.getName()){
300 getLog().getLogDebug() << "Daemon::sendStatToStatDaemon() : sending statistics to Stat Daemon '"<<p_config.getStatDaemonName()<<"'" << std::endl;
301 sendValue(p_config.getStatDaemonName(), stat, true);
302 }
303
304 clearStat();
305 p_startTimestamp = currentTime;
306}
307
308#endif
309
Abstract function definition which will be callable in Daemon.
virtual bool call(PLog &log, const Data &data)=0
PLog p_log
Logger of the current Daemon.
Definition BaseDaemon.h:166
bool p_isRun
True if the current BaseDaemon is running.
Definition BaseDaemon.h:168
void updateStatAccumulator(StatAccumulator &stat, float value)
Update a computing statistic with a new value.
bool p_isFullMockRecord
True if the daemon has to be executed in mock record mode for socket and clock.
Definition BaseDaemon.h:172
DaemonMainConfig p_mainConfig
Main configuration of the Daemon.
Definition BaseDaemon.h:160
bool p_isFullMock
True if the daemon has to be executed in mock mode for socket and clock.
Definition BaseDaemon.h:170
void addMessageToConfirm(const Message &message)
Add a message to confirm.
AbstractDataFunction * getDataFunction(const Data &data)
Get the data function associated with the given data.
void fillDaemonStat(DaemonStat &stat, time_t startTimestamp, time_t endTimestamp)
Fill the DaemonStat with the current statistics of the daemon.
PLog & getLog()
Get the log of the current BaseDaemon.
void clearStat()
Clear all the statistics of the daemon.
void stop()
Stops the BaseDaemon.
void checkMessageTimeout(time_t currentTime)
Check if a message has reached the timeout.
DaemonConfig p_config
Configuration of the curent Daemon.
Definition BaseDaemon.h:162
MapDaemonConfig p_mapDaemon
Map of the other Daemon of the Swarm.
Definition BaseDaemon.h:164
size_t getMessageId()
Get current message id.
BaseDaemon()
Default constructor of BaseDaemon.
void processConfirmedMessage(size_t id, time_t currentTime)
Process confirmed message.
Statistics of a Daemon.
void setName(const PString &name)
Sets the name of the DaemonStat.
_TBackend p_backend
Full backend of the Daemon (for Socket and clock)
Definition Daemon.h:49
bool sendValue(const PString &destinationName, const T &data, bool isConfirmationNeeded=true)
Send data to other Daemon.
Definition Daemon.h:38
PTimer p_statTimer
Timer to send statistics to the Stat Daemon.
Definition Daemon.h:51
Daemon()
Default constructor of Daemon.
Definition Daemon_impl.h:15
virtual void executeOnStop()
Method which is called on stop of the Daemon.
void sendStatToStatDaemon(time_t currentTime)
Send the statistics of the Daemon to the Stat Daemon.
PRecvStatus::PRecvStatus checkRecvStatus(PRecvStatus::PRecvStatus recvStatus)
Check the status of recv sockets.
Definition Daemon_impl.h:75
time_t p_startTimestamp
Start timestamp of the current accumulation of statistics.
Definition Daemon.h:53
virtual ~Daemon()
Destructor of Daemon.
Definition Daemon_impl.h:23
void setSocketMode(PSocketMode::PSocketMode mode)
Set the mode of the sockets of the SocketManager.
Definition Daemon_impl.h:31
bool sendData(const PString &destinationName, const Data &data, bool isConfirmationNeeded)
Send data to other Daemon (specialization for Data)
bool sendMessage(const Message &message)
Send message to other Daemon.
void processInputMessage(Message &message)
Process an input message.
virtual void extraLoopProcessing()
Computing Method for each event loop (when receiving message from other Daemon)
void setClockMode(PClockMode::PClockMode mode)
Set the mode of the clock.
Definition Daemon_impl.h:53
void initialisationDaemon()
Initialisation function of the class Daemon.
bool run()
Run the Daemon.
void initialisationDaemonSocket()
Initialise the Daemon Sockets.
void checkSendStatus(PSendStatus::PSendStatus sendStatus)
Check the status of recv sockets.
Basic Data exchanged in the swarm.
const PString & getType() const
Gets the type of the Data.
Message exchanged by Daemons.
const PString & getSender() const
Gets the sender of the Message.
void setType(const MessageType::MessageType &type)
Sets the type of the Message.
const std::vector< PString > & getVecRecver() const
Gets the vecRecver of the Message.
void setSendTime(const time_t &sendTime)
Sets the sendTime of the Message.
void setSender(const PString &sender)
Sets the sender of the Message.
const Data & getData() const
Gets the data of the Message.
size_t getId() const
Gets the id of the Message.
bool getIsConfirmationNeeded() const
Gets the isConfirmationNeeded of the Message.
void setIsConfirmationNeeded(bool isConfirmationNeeded)
Sets the isConfirmationNeeded of the Message.
void setId(size_t id)
Sets the id of the Message.
const MessageType::MessageType & getType() const
Gets the type of the Message.
const time_t & getSendTime() const
Gets the sendTime of the Message.
void setData(const Data &data)
Sets the data of the Message.
Exception for daemon configuration errors.
Exception for socket status errors.
@ MESSAGE_CONFIRMATION
Definition MessageType.h:20