PhoenixSwarm  5.1.1
Library to ease communication between daemons
Loading...
Searching...
No Matches
Daemon_impl.h
Go to the documentation of this file.
1/***************************************
2 Auteur : Pierre Aubert
3 Mail : pierre.aubert@lapp.in2p3.fr
4 Licence : CeCILL-C
5****************************************/
6
7#ifndef __DAEMON_H_IMPL__
8#define __DAEMON_H_IMPL__
9
10#include "phoenix_assert.h"
11#include "Daemon.h"
12
13using namespace Swarm;
14
16template<class _TBackend>
22
24template<class _TBackend>
28
30
32template<class _TBackend>
33void Daemon<_TBackend>::setSocketMode(PSocketMode::PSocketMode mode){
34 p_backend.socketManager.setMode(mode);
35 switch(mode){
36 case PSocketMode::MOCK:
37 p_isFullMock = true;
38 p_isFullMockRecord = false;
39 break;
40 case PSocketMode::MOCK_RECORD:
41 p_isFullMock = false;
42 p_isFullMockRecord = true;
43 break;
44 default:
45 p_isFullMock = false;
46 p_isFullMockRecord = false;
47 break;
48 }
49}
50
52
54template<class _TBackend>
55void Daemon<_TBackend>::setClockMode(PClockMode::PClockMode mode){
56 p_backend.clock.setMode(mode);
57 switch(mode){
58 case PClockMode::MOCK:
59 p_isFullMock = true;
60 p_isFullMockRecord = false;
61 break;
62 case PClockMode::MOCK_RECORD:
63 p_isFullMock = false;
64 p_isFullMockRecord = true;
65 break;
66 default:
67 p_isFullMock = false;
68 p_isFullMockRecord = false;
69 break;
70 }
71}
72
74
76template<class _TBackend>
77PRecvStatus::PRecvStatus Daemon<_TBackend>::checkRecvStatus(PRecvStatus::PRecvStatus recvStatus){
78 if(recvStatus == PRecvStatus::SOCKET_NOT_AVAILABLE){
79// p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Socket not available", "Socket not available");
80 p_log.getLogWarning() << "Daemon::checkRecvStatus : Socket not available" << std::endl;
81 }
82 if(recvStatus == PRecvStatus::SIGNAL_INTERRUPTION){
83 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Socket Backend caught a signal, aborting", "Socket Backend caught a signal, aborting");
84 }
85 if(recvStatus == PRecvStatus::BROKEN_BACKEND){
86 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Back-end is in invalid state", "Back-end is in invalid state");
87 }
88 if(recvStatus == PRecvStatus::BROKEN_SOCKET){
89 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Socket is in invalid state", "Socket is in invalid state");
90 }
91 if(recvStatus == PRecvStatus::CANNOT_DESERIALIZE_DATA){
92 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkRecvStatus : Cannot deserialize data", "Cannot deserialize data");
93 }
94 return recvStatus;
95}
96
97
99
101template<class _TBackend>
102void Daemon<_TBackend>::checkSendStatus(PSendStatus::PSendStatus sendStatus){
103 if(sendStatus == PSendStatus::SOCKET_NOT_AVAILABLE){
104 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Socket not available", "Socket not available");
105 }
106 if(sendStatus == PSendStatus::NO_ROUTE_TO_RECEIVER){
107 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Receiver not reachable", "Receiver not reachable");
108 }
109 if(sendStatus == PSendStatus::SIGNAL_INTERRUPTION){
110 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Socket Backend caught a signal, aborting.", "Socket Backend caught a signal, aborting.");
111 }
112 if(sendStatus == PSendStatus::BROKEN_BACKEND){
113 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Back-end is in invalid state", "Back-end is in invalid state");
114 }
115 if(sendStatus == PSendStatus::BROKEN_SOCKET){
116 p_log.criticalAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Socket is in invalid state", "Socket is in invalid state");
117 }
118 if(sendStatus == PSendStatus::CANNOT_SERIALIZE_DATA){
119 p_log.errorAndThrow<Phoenix::SocketStatusException>("Daemon::checkSendStatus : Cannot serialize data", "Cannot serialize data");
120 }
121}
122
124
///Main loop of the Daemon (the member index of this page lists it as `bool run()`)
/**	@return true once the loop has ended
 * 	NOTE(review): this listing skips source lines 127, 130, 142 and 151, so the signature and some
 * 	statements are not visible here. The comments below only describe the visible lines
*/
126template<class _TBackend>
128 p_isRun = true;
129 //Initialise all connections (the statement itself is on a line missing from this listing)
 //The statistics window starts now and the statistics timer is armed with the configured period
131 p_startTimestamp = p_backend.clock.now();
132 p_statTimer.setEllapsedTime(p_config.getStatTimerPeriodMs());
133 p_statTimer.setStartTime(p_startTimestamp);
134 while(p_isRun){
135 //Try to receive one message on the "pull" socket
136 //If there is no message (status other than OK), the rest of the loop is still executed
137 Message message;
138 PRecvStatus::PRecvStatus recvStatus = checkRecvStatus(p_backend.socketManager.recvData("pull", message, p_mainConfig.recvFlag));
139 if(recvStatus == PRecvStatus::OK){
140 processInputMessage(message);
141 }
143
 //The same timestamp is used for the timeout check and for the statistics
144 time_t currentTime = p_backend.clock.now();
145 checkMessageTimeout(currentTime);
 //Statistics are sent when the timer says so, or on the last iteration (p_isRun already false)
146 if(p_statTimer.isTime(currentTime) || !p_isRun){
147 sendStatToStatDaemon(currentTime);
148 }
149 }
150 //Execute a last command before stop (the statement itself is on a line missing from this listing)
152 return true;
153}
154
156
///Send a message to every receiver listed in the message (listed as `bool sendMessage(const Swarm::Message &message)` in the member index of this page)
/**	@return true if all the per-destination sends returned true, false otherwise
 * 	NOTE(review): the signature (source line 160) is missing from this listing
*/
159template<class _TBackend>
161 bool b(true);
162 const PVecString & vecDestination = message.getVecRecver();
163 for(PVecString::const_iterator it(vecDestination.begin()); it != vecDestination.end(); ++it){
 //Every destination is tried : `&=` does not short-circuit after a false result
164 b &= sendMessage(*it, message);
165 }
166 return b;
167}
168
170
174template<class _TBackend>
175bool Daemon<_TBackend>::sendMessage(const PString & destinationName, const Message & message){
176 if(message.getIsConfirmationNeeded()){
177 addMessageToConfirm(message);
178 }
179 checkSendStatus(p_backend.socketManager.sendData(destinationName, message, p_mainConfig.sendFlag));
180 return true;
181}
182
184
///Wrap a Data into a Message and send it to one Daemon
/**	@param destinationName : name of the Daemon which receives the data
 * 	@param data : data to be sent
 * 	@param isConfirmationNeeded : true if the message has to be flagged as needing a confirmation
 * 	@return result of sendMessage(destinationName, message)
 * 	NOTE(review): source line 196 is missing from this listing, so one statement of the body is not visible here
*/
189template<class _TBackend>
190bool Daemon<_TBackend>::sendData(const PString & destinationName, const Data & data, bool isConfirmationNeeded){
191 Message message;
 //The send time comes from the clock of the backend
192 message.setSendTime(p_backend.clock.now());
193 message.setData(data);
194 message.setId(getMessageId());
195 message.setIsConfirmationNeeded(isConfirmationNeeded);
 //The sender is the name of the current Daemon, the only receiver is destinationName
197 message.setSender(p_config.getName());
198 message.getVecRecver().push_back(destinationName);
199 return sendMessage(destinationName, message);
200}
201
203template<class _TBackend>
205
207template<class _TBackend>
209
211template<class _TBackend>
215
///Initialise the sockets of the Daemon (listed as `void initialisationDaemonSocket()` in the member index of this page)
/**	Creates one server socket named "pull" for the current Daemon and one client socket per other Daemon of p_mapDaemon,
 * 	then sets the mock prefix of the clock.
 * 	NOTE(review): source lines 218 (signature) and 222 are missing from this listing. The `}else{` of line 225
 * 	has no visible opening `if`; presumably line 222 tests p_isFullMockRecord - confirm against the real header
*/
217template<class _TBackend>
 //A Daemon without a name is reported with criticalAndThrow (Phoenix::ConfigException)
219 if(p_config.getName() == ""){
220 getLog().criticalAndThrow<Phoenix::ConfigException>("Daemon::initialisationDaemonSocket", "daemon has no name");
221 }
 //Socket and clock are always switched to the same mock mode
223 setSocketMode(PSocketMode::MOCK_RECORD);
224 setClockMode(PClockMode::MOCK_RECORD);
225 }else{
226 if(p_isFullMock){
227 setSocketMode(PSocketMode::MOCK);
228 setClockMode(PClockMode::MOCK);
229 }
230 }
231 //We send when we are a client and recv when we are a server
232 //This way, there is one server per Daemon and as many clients as other Daemons
233 getLog().getLogInfo() << "Daemon<_TBackend>::initialisationDaemonSocket() : initialise pull socket for daemon '" << p_config.getName() << "'" << std::endl;
 //The "pull" server socket uses the host name and receiving port of the current Daemon, mock files are prefixed with "./<name>_"
 //NOTE(review): the socket is created inside phoenix_assert; if that macro can be compiled out (like assert with NDEBUG), the socket would never be created - confirm
234 phoenix_assert(p_backend.socketManager.addServerSocket("pull",
235 PSocketParam{p_config.getHostName(), p_config.getReceivingPort(), p_mainConfig.recvTimeoutMs, p_mainConfig.sendTimeoutMs}, p_backend.extraServerParam,
236 "./" + p_config.getName() + "_", p_backend.extraMockServerParam));
237
238 //Now initialise the connections to the other Daemons
239 for(MapDaemonConfig::iterator it(p_mapDaemon.begin()); it != p_mapDaemon.end(); ++it){
 //No client socket towards ourselves
240 if(it->second.getName() == p_config.getName()){continue;}
241 getLog().getLogInfo() << "Daemon<_TBackend>::initialisationDaemonSocket() : initialise send socket for neighbour Daemon '"<<it->second.getName()<<"'" << std::endl;
 //The client socket is named after the neighbour, so a message is sent by giving the name of its destination
 //NOTE(review): same remark as above about the call wrapped in phoenix_assert
242 phoenix_assert(p_backend.socketManager.addClientSocket(it->second.getName(),
243 PSocketParam{it->second.getHostName(), it->second.getReceivingPort(), p_mainConfig.recvTimeoutMs, p_mainConfig.sendTimeoutMs}, p_backend.extraClientParam,
244 "./" + p_config.getName() + "_", p_backend.extraMockClientParam));
245 // p_backend.socketManager.getSocket(it->second.getName())->waitUntilConnection(500000lu, 30lu); //0.5 s per check and 30 checks
246 }
247 getLog().getLogInfo() << "Daemon<_TBackend>::initialisationDaemonSocket() : all connection to neighbours Daemon initialised" << std::endl;
 //The clock uses the same "./<name>_" prefix as the sockets for its mock
248 p_backend.clock.setMockPrefix("./" + p_config.getName() + "_");
249}
250
252
///Process an input message (listed as `void processInputMessage(Swarm::Message &message)` in the member index of this page)
/**	Sends a confirmation back to the sender when the message asks for one, then dispatches on the type of the message :
 * 		- STOP : calls stop()
 * 		- MESSAGE_CONFIRMATION : calls processConfirmedMessage with the id of the message
 * 		- RESULT_DATA : calls the data function associated with the data and updates the computing statistics
 * 	NOTE(review): source lines 255 (signature) and 263 are missing from this listing
*/
254template<class _TBackend>
256 //Check if the message needs a confirmation
257 if(message.getIsConfirmationNeeded()){
258 Message confirmation;
259 //We do not need to change the time of the message because it has to be in the clock reference of the sender
260 confirmation.setSendTime(message.getSendTime());
 //The confirmation carries the id of the confirmed message and goes back to its sender
261 confirmation.setId(message.getId());
262 confirmation.setSender(p_config.getName());
 //NOTE(review): source line 263 is not visible; presumably it sets the type of the confirmation - confirm
264 confirmation.getVecRecver().push_back(message.getSender());
265 sendMessage(confirmation);
266 }
267
268 //Check if we received a stop message
269 if(message.getType() == MessageType::STOP){stop();}
270 else if(message.getType() == MessageType::MESSAGE_CONFIRMATION){
271 processConfirmedMessage(message.getId(), p_backend.clock.now());
272 }else if(message.getType() == MessageType::RESULT_DATA){
273 AbstractDataFunction* function = getDataFunction(message.getData());
 //No function for this data : the message is dropped without any log here
274 if(function == NULL){
275 return;
276 }
277 time_t functionStartTime = p_backend.clock.now();
 //A failed call is logged and is not counted in the computing statistics
278 if(!function->call(p_log, message.getData())){
279 getLog().getLogError() << "Daemon<_TBackend>::processInputMessage : cannot process data of type '"<<message.getData().getType()<<"' from Daemon '"<<message.getSender()<<"'" << std::endl;
280 return;
281 }
 //NOTE(review): the log below says nanoseconds; the unit actually depends on what p_backend.clock.now() returns - confirm
282 time_t functionEllapsedTime = p_backend.clock.now() - functionStartTime;
283 getLog().getLogDebug() << "Daemon<_TBackend>::processInputMessage : function '"<<message.getData().getType()<<"' processed data from daemon '"<<message.getSender()<<"' in "<<functionEllapsedTime<<" nanoseconds" << std::endl;
 //Computing statistics are only accumulated when a Stat Daemon is configured and is not the current Daemon
284 if(p_config.getStatDaemonName() != "" && p_config.getStatDaemonName() != p_config.getName()){
 //NOTE(review): the result of find() is dereferenced without being compared to end(); this relies on an entry existing for every data type which has a function - confirm
285 updateStatAccumulator(p_config.getDaemonStatAccumulator().getMapStatComputing().find(message.getData().getType())->second, functionEllapsedTime);
286 }
287 }
288}
289
291
///Send the statistics of the Daemon to the Stat Daemon (listed as `void sendStatToStatDaemon(time_t currentTime)` in the member index of this page)
/**	The statistics cover the window [p_startTimestamp, currentTime]; after the call, a new window starts at currentTime.
 * 	NOTE(review): the signature (source line 294) is missing from this listing
*/
293template<class _TBackend>
295 //Fill the Stat
296 Stat stat;
297 fillDaemonStat(stat, p_startTimestamp, currentTime);
298 stat.setName(p_config.getName());
299
300 //Send it to the Stat Daemon, only if one is configured and if it is not the current Daemon
301 if(p_config.getStatDaemonName() != "" && p_config.getStatDaemonName() != p_config.getName()){
302 getLog().getLogDebug() << "Daemon::sendStatToStatDaemon() : sending statistics to Stat Daemon '"<<p_config.getStatDaemonName()<<"'" << std::endl;
303 sendValue(p_config.getStatDaemonName(), stat, true);
304 }
305
 //The statistics are cleared and the window restarted even when nothing was sent
306 clearStat();
307 p_startTimestamp = currentTime;
308}
309
310#endif
311
Exception for daemon configuration errors.
Exception for socket status errors.
Abstract function definition which will be callable in Daemon.
virtual bool call(PLog &log, const Swarm::Data &data)=0
bool p_isRun
True if the current BaseDaemon is running.
Definition BaseDaemon.h:168
void updateStatAccumulator(StatAccumulator &stat, float value)
Update a computing statistic with a new value.
void addMessageToConfirm(const Swarm::Message &message)
Add a message to confirm.
bool p_isFullMock
True if the daemon has to be executed in mock mode for socket and clock.
Definition BaseDaemon.h:170
void fillDaemonStat(Swarm::Stat &stat, time_t startTimestamp, time_t endTimestamp)
Fill the Stat with the current statistics of the daemon.
PLog p_log
Logger of the current Daemon.
Definition BaseDaemon.h:166
PLog & getLog()
Get the log of the current BaseDaemon.
void clearStat()
Clear all the statistics of the daemon.
void stop()
Stops the BaseDaemon.
void checkMessageTimeout(time_t currentTime)
Check if a message has reached the timeout.
size_t getMessageId()
Get current message id.
bool p_isFullMockRecord
True if the daemon has to be executed in mock record mode for socket and clock.
Definition BaseDaemon.h:172
BaseDaemon()
Default constructor of BaseDaemon.
DaemonConfig p_config
Configuration of the current Daemon.
Definition BaseDaemon.h:162
Swarm::AbstractDataFunction * getDataFunction(const Swarm::Data &data)
Get the data function associated with the given data.
MapDaemonConfig p_mapDaemon
Map of the other Daemon of the Swarm.
Definition BaseDaemon.h:164
void processConfirmedMessage(size_t id, time_t currentTime)
Process confirmed message.
DaemonMainConfig p_mainConfig
Main configuration of the Daemon.
Definition BaseDaemon.h:160
virtual void extraLoopProcessing()
Computing Method for each event loop (when receiving message from other Daemon)
bool sendMessage(const Swarm::Message &message)
Send message to other Daemon.
void initialisationDaemonSocket()
Initialise the Daemon Sockets.
Daemon()
Default constructor of Daemon.
Definition Daemon_impl.h:17
virtual void executeOnStop()
Method which is called on stop of the Daemon.
bool sendData(const PString &destinationName, const Swarm::Data &data, bool isConfirmationNeeded)
Send data to other Daemon (specialization for Data)
void setClockMode(PClockMode::PClockMode mode)
Set the mode of the clock.
Definition Daemon_impl.h:55
void initialisationDaemon()
Initialisation function of the class Daemon.
void setSocketMode(PSocketMode::PSocketMode mode)
Set the mode of the sockets of the SocketManager.
Definition Daemon_impl.h:33
void checkSendStatus(PSendStatus::PSendStatus sendStatus)
Check the status of send sockets.
PTimer p_statTimer
Timer to send statistics to the Stat Daemon.
Definition Daemon.h:51
void processInputMessage(Swarm::Message &message)
Process an input message.
PRecvStatus::PRecvStatus checkRecvStatus(PRecvStatus::PRecvStatus recvStatus)
Check the status of recv sockets.
Definition Daemon_impl.h:77
bool sendValue(const PString &destinationName, const T &data, bool isConfirmationNeeded=true)
Send data to other Daemon.
Definition Daemon.h:38
_TBackend p_backend
Full backend of the Daemon (for Socket and clock)
Definition Daemon.h:49
time_t p_startTimestamp
Start timestamp of the current accumulation of statistics.
Definition Daemon.h:53
bool run()
Run the Daemon.
virtual ~Daemon()
Destructor of Daemon.
Definition Daemon_impl.h:25
void sendStatToStatDaemon(time_t currentTime)
Send the statistics of the Daemon to the Stat Daemon.
Basic Data exchanged in the swarm.
const PString & getType() const
Gets the type of the Data.
Message exchanged by Daemons.
const PString & getSender() const
Gets the sender of the Message.
void setData(const Swarm::Data &data)
Sets the data of the Message.
void setType(const MessageType::MessageType &type)
Sets the type of the Message.
const std::vector< PString > & getVecRecver() const
Gets the vecRecver of the Message.
void setSendTime(const time_t &sendTime)
Sets the sendTime of the Message.
const Swarm::Data & getData() const
Gets the data of the Message.
void setSender(const PString &sender)
Sets the sender of the Message.
size_t getId() const
Gets the id of the Message.
bool getIsConfirmationNeeded() const
Gets the isConfirmationNeeded of the Message.
void setIsConfirmationNeeded(bool isConfirmationNeeded)
Sets the isConfirmationNeeded of the Message.
void setId(size_t id)
Sets the id of the Message.
const MessageType::MessageType & getType() const
Gets the type of the Message.
const time_t & getSendTime() const
Gets the sendTime of the Message.
Statistics of a Daemon.
void setName(const PString &name)
Sets the name of the Stat.
@ MESSAGE_CONFIRMATION
Definition MessageType.h:20