31 PPath configurationFile;
32 defaultMode.getValue(configurationFile,
"daemonconfig");
34 defaultMode.getValue(daemonName,
"daemonname");
37 return load(configurationFile, daemonName);
46 const ConfigNode * mapMainConfig = dico.getChild(
"swarm");
47 if(mapMainConfig !=
nullptr){
48 p_mainConfig.recvTimeoutMs = phoenix_get_value<int>(*mapMainConfig,
"recv_timeout_ms", -1);
49 p_mainConfig.sendTimeoutMs = phoenix_get_value<int>(*mapMainConfig,
"send_timeout_ms", -1);
50 p_mainConfig.recvFlag = daemonRecvFlagFromString(phoenix_get_string(*mapMainConfig,
"recv_flag",
"NON_BLOCK"));
51 p_mainConfig.sendFlag = daemonSendFlagFromString(phoenix_get_string(*mapMainConfig,
"send_flag",
"NON_BLOCK"));
53 std::map<PString, time_t> mapTimeout;
54 const ConfigNode * extraConfigParam =
nullptr;
57 const ConfigNode * mapStatDaemon = dico.getChild(
"statistics");
58 if(mapStatDaemon !=
nullptr){
62 if(
p_config.getStatDaemonName() !=
""){
64 p_log.getLogError() <<
"BaseDaemon::loadConfigFromNode : stat daemon '"<<
p_config.getStatDaemonName()<<
"' not found in configuration" << std::endl;
68 p_log.getLogInfo() <<
"BaseDaemon::loadConfigFromNode : loading configuration successful" << std::endl;
69 p_log.getLogInfo() <<
"BaseDaemon::loadConfigFromNode : ready to call addCallMethod to subscribe on received data" << std::endl;
109 p_log.getLogWarning() <<
"BaseDaemon::addCallMethod : no redefined addCallMethod " << std::endl;
111 std::cerr <<
"BaseDaemon::addCallMethod : no redefined addCallMethod (and no log file so we use std::cerr)" << std::endl;
124 getLog().getLogError() <<
"BaseDaemon::callMethod : function '"<<name<<
"' not found" << std::endl;
140 p_log.getLogInfo() <<
"Stopping Daemon '"<<
p_config.getName()<<
"' at '"<<
p_config.getHostName()<<
"'" << std::endl;
163 MapDaemonConfig::const_iterator it(
p_mapDaemon.find(name));
192 message = it->second;
205 getLog().getLogInfo() <<
"BaseDaemon::processConfirmedMessage : adding statistics for processed transaction " <<
id << std::endl;
206 const Message & message = it->second;
207 time_t ellapsedTime = currentTime - message.
getSendTime();
209 getLog().getLogDebug() <<
"BaseDaemon::processConfirmedMessage : add statistic for receiver : " << receiver << std::endl;
213 getLog().getLogInfo() <<
"BaseDaemon::removeConfirmedMessage : remove confirmed transaction " <<
id << std::endl;
216 getLog().getLogError() <<
"BaseDaemon::processConfirmedMessage : transaction " <<
id <<
" not found in p_mapMessageToBeConfirmed" << std::endl;
224 std::vector<size_t> messagesToRemove;
226 const size_t & messageId = it->first;
227 const Message & message = it->second;
228 time_t elapsedTime = currentTime - message.
getSendTime();
230 getLog().getLogError() <<
"BaseDaemon::checkMessageTimeout() : message " << messageId <<
" has no receiver." << std::endl;
233 std::map<PString, time_t>::const_iterator timeoutIt =
p_config.getMapTimeout().find(message.
getVecRecver().front());
234 if(timeoutIt ==
p_config.getMapTimeout().end()){
235 getLog().getLogError() <<
"BaseDaemon::checkMessageTimeout() : no timeout found for receiver '" << message.
getVecRecver().front() <<
"' of message " << messageId <<
"." << std::endl;
238 time_t timeout = timeoutIt->second;
240 if (elapsedTime > timeout) {
241 messagesToRemove.push_back(messageId);
242 getLog().getLogWarning() <<
"BaseDaemon::checkMessageTimeout() : Timeout Reached for message " << messageId <<
" sent to '" << message.
getVecRecver().front() <<
"'. Removing unconfirmed transaction" << std::endl;
245 for(
const size_t & messageId : messagesToRemove){
263 PString prototype(data.
getType());
264 std::map<PString, AbstractDataFunction*>::iterator it(
p_mapDataFunction.find(prototype));
266 getLog().getLogError() <<
"BaseDaemon::getDataFunction : function to process data '"<<prototype<<
"' not found" << std::endl;
278 if(function ==
nullptr){
281 getLog().getLogDebug() <<
"BaseDaemon::processData : process data '"<<data.
getType()<<
"'" << std::endl;
292 getLog().getLogDebug() <<
"BaseDaemon::updateStatAccumulator : event nb is " << stat.
getNbEvent() << std::endl;
298 if(value < stat.
getMin()){
301 if(value > stat.
getMax()){
316 vecHistogram[bin] = vecHistogram[bin] + 1lu;
329 newStat.
setMin(std::nan(
""));
330 newStat.
setMax(std::nan(
""));
331 newStat.
setSum(std::nan(
""));
350 for(MapStatAccumulator::iterator it =
p_config.getDaemonStatAccumulator().getMapStatComputing().begin(); it !=
p_config.getDaemonStatAccumulator().getMapStatComputing().end(); ++it){
351 it->second.setNbEvent(0lu);
352 std::fill(it->second.getVecHistogram().begin(), it->second.getVecHistogram().end(), 0lu);
354 for(MapDaemonStatAccumulator::iterator itMap =
p_config.getDaemonStatAccumulator().getMapStatCommunication().begin(); itMap !=
p_config.getDaemonStatAccumulator().getMapStatCommunication().end(); ++itMap){
355 for(MapStatAccumulator::iterator it = itMap->second.begin(); it != itMap->second.end(); ++it){
356 it->second.setNbEvent(0lu);
357 std::fill(it->second.getVecHistogram().begin(), it->second.getVecHistogram().end(), 0lu);
392 p_optionParser.setExampleLongOption(
"phoenix_daemon --daemonconfig=daemon_config.toml --daemonname=main");
393 p_optionParser.setExampleShortOption(
"phoenix_daemon -c daemon_config.toml -n main");
394 p_optionParser.addOption(
"daemonconfig",
"c", OptionType::FILENAME,
true,
"Toml configuration file which define all Daemons of the swarm");
395 p_optionParser.addOption(
"daemonname",
"n", OptionType::STRING,
true,
"Name of the current Daemon of the swarm configuration defined with --daemonconfig");
396 p_optionParser.addOption(
"mock",
"m", OptionType::NONE,
false,
"Activate the Daemon in mock mode for the sockets and the clock");
397 p_optionParser.addOption(
"mockrecord",
"r", OptionType::NONE,
false,
"Activate the Daemon in mock record mode for the sockets and the clock");
412 std::map<PString, StatAccumulator> newStatMap;
413 newStatMap[dataType] =
createNewStat(nbBin, histLowerBound, histUpperBound);
426 std::map<PString, std::map<PString, StatAccumulator> > & mapCommunication =
p_config.getDaemonStatAccumulator().getMapStatCommunication();
427 MapDaemonStatAccumulator::iterator it = mapCommunication.find(destName);
428 if(it == mapCommunication.end()){
432 std::map<PString, StatAccumulator>::iterator itDataType = it->second.find(dataType);
433 if(itDataType == it->second.end()){
434 it->second[dataType] =
createNewStat(nbBin, histLowerBound, histUpperBound);
458 time_t rate = endTimestamp - startTimestamp;
479 for(MapStatAccumulator::iterator it = mapStatComputing.begin(); it != mapStatComputing.end(); ++it){
487 for(MapDaemonStatAccumulator::iterator itMap = mapStatCommunication.begin(); itMap != mapStatCommunication.end(); ++itMap){
488 const PString & destName = itMap->first;
490 for(MapStatAccumulator::iterator it = itMap->second.begin(); it != itMap->second.end(); ++it){
493 mapVecStat[it->first] = vecStat;
std::map< PString, std::map< PString, StatAccumulator > > MapDaemonStatAccumulator
std::map< PString, StatAccumulator > MapStatAccumulator
Abstract function definition which will be callable in Daemon.
virtual bool call(PLog &log, const Data &data)=0
PLog p_log
Logger of the current Daemon.
void clearCallableMethod()
Clear the map of callable methods.
bool p_isRun
True if the current BaseDaemon is running.
void loadConfigFromNode(const ConfigNode &dico, const PString &daemonName)
Load configuration from a node of ConfigNode.
bool isDaemonExist(const PString &name) const
Tell whether a neighbour Daemon exists.
OptionParser p_optionParser
Option parser of the Daemon.
void updateStatAccumulator(StatAccumulator &stat, float value)
Update a computing statistic with a new value.
size_t p_messageId
Id counter of the message of the current Daemon.
OptionParser & getOptionParser()
Return the OptionParser of the current BaseDaemon.
std::map< PString, StatAccumulator > createNewCommunicationStatMap(const PString &dataType, size_t nbBin, float histLowerBound, float histUpperBound)
Create a new map of communication statistic.
bool p_isFullMockRecord
True if the daemon has to be executed in mock record mode for socket and clock.
bool getMessageToConfirm(Message &message, size_t id) const
Get a message to confirm by id if it exists.
VecStat fillVecStat(const StatAccumulator &accumulator, time_t startTimestamp, time_t endTimestamp)
Fill a VecStat from a StatAccumulator to send to the DaemonStat.
bool processData(const Data &data)
Process given data with the proper method.
PLog & getLogger()
Get the logger of the current BaseDaemon.
bool load(const PString &configFileContent, const PString &daemonName, ConfigFormat::ConfigFormat format)
Load the configuration (in the given format) which defines all BaseDaemons of the Swarm.
DaemonMainConfig p_mainConfig
Main configuration of the Daemon.
bool p_isFullMock
True if the daemon has to be executed in mock mode for socket and clock.
void initialisationBaseDaemon()
Initialisation function of the class BaseDaemon.
MapDaemonConfig & getMapDaemonConfig()
Get the map of all Daemon configurations.
DaemonConfig & getConfig()
Get the configuration of the current BaseDaemon.
bool callMethod(Data &result, const PString &name, const Data &parameter)
Call an added callable method by name.
std::map< PString, AbstractFunction * > p_mapCallableMethod
Map of callable method of the Daemon.
DaemonMainConfig & getMainConfig()
Get the main configuration of the swarm.
void addMessageToConfirm(const Message &message)
Add a message to confirm.
virtual void addCallMethod()
Do the addCallableMethod here.
virtual bool extraLoad(const ConfigNode *config)
Extra call on load for the current Daemon.
AbstractDataFunction * getDataFunction(const Data &data)
Get the data function associated with the given data.
void fillDaemonStat(DaemonStat &stat, time_t startTimestamp, time_t endTimestamp)
Fill the DaemonStat with the current statistics of the daemon.
PLog & getLog()
Get the log of the current BaseDaemon.
void clearStat()
Clear all the statistics of the daemon.
void stop()
Stops the BaseDaemon.
std::map< PString, AbstractDataFunction * > p_mapDataFunction
Map of methods which have to be called when receiving data.
void checkMessageTimeout(time_t currentTime)
Check if a message has reached the timeout.
DaemonConfig p_config
Configuration of the current Daemon.
MapDaemonConfig p_mapDaemon
Map of the other Daemon of the Swarm.
size_t getMessageId()
Get current message id.
void getCommunicationStat(const PString &destName, const PString &dataType, float latency, size_t nbBin, float histLowerBound, float histUpperBound)
Add a communication statistic.
BaseDaemon()
Default constructor of BaseDaemon.
StatAccumulator createNewStat(size_t nbBin, float histLowerBound, float histUpperBound)
Create a new clear computing statistic.
std::map< size_t, Message > p_mapMessageToBeConfirmed
Map of messages which have to be confirmed by destination Daemon.
const std::map< size_t, Message > & getMapMessageToBeConfirmed() const
Get the map of message to be confirmed.
void processConfirmedMessage(size_t id, time_t currentTime)
Process confirmed message.
virtual ~BaseDaemon()
Destructor of BaseDaemon.
bool parseArgument(int argc, char **argv)
Parse arguments given to the BaseDaemon with command line.
Describe a Daemon of the Swarm.
const std::map< PString, VecStat > & getMapStatComputing() const
Gets the mapStatComputing of the DaemonStat.
const std::map< PString, std::map< PString, VecStat > > & getMapStatCommunication() const
Gets the mapStatCommunication of the DaemonStat.
Basic Data exchanged in the swarm.
const PString & getType() const
Gets the type of the Data.
Message exchanged by Daemons.
const std::vector< PString > & getVecRecver() const
Gets the vecRecver of the Message.
const Data & getData() const
Gets the data of the Message.
size_t getId() const
Gets the id of the Message.
const time_t & getSendTime() const
Gets the sendTime of the Message.
Accumulator of event occurrences to build swarm statistics over a time period.
float getHistUpperBound() const
Gets the histUpperBound of the StatAccumulator.
void setMin(float min)
Sets the min of the StatAccumulator.
float getSum() const
Gets the sum of the StatAccumulator.
void setVecHistogram(const std::vector< size_t > &vecHistogram)
Sets the vecHistogram of the StatAccumulator.
size_t getNbEvent() const
Gets the nbEvent of the StatAccumulator.
float getMax() const
Gets the max of the StatAccumulator.
size_t getNbEventAboveUpperBound() const
Gets the nbEventAboveUpperBound of the StatAccumulator.
float getMin() const
Gets the min of the StatAccumulator.
float getHistLowerBound() const
Gets the histLowerBound of the StatAccumulator.
void setHistUpperBound(float histUpperBound)
Sets the histUpperBound of the StatAccumulator.
void setMax(float max)
Sets the max of the StatAccumulator.
void setNbEventAboveUpperBound(size_t nbEventAboveUpperBound)
Sets the nbEventAboveUpperBound of the StatAccumulator.
void setHistLowerBound(float histLowerBound)
Sets the histLowerBound of the StatAccumulator.
void setNbEventBelowLowerBound(size_t nbEventBelowLowerBound)
Sets the nbEventBelowLowerBound of the StatAccumulator.
void setSum(float sum)
Sets the sum of the StatAccumulator.
size_t getNbEventBelowLowerBound() const
Gets the nbEventBelowLowerBound of the StatAccumulator.
const std::vector< size_t > & getVecHistogram() const
Gets the vecHistogram of the StatAccumulator.
void setNbEvent(size_t nbEvent)
Sets the nbEvent of the StatAccumulator.
General statistics in the swarm.
const std::vector< float > & getMax() const
Gets the max of the VecStat.
const std::vector< time_t > & getEndTimestamp() const
Gets the endTimestamp of the VecStat.
const std::vector< float > & getRate() const
Gets the rate of the VecStat.
const std::vector< time_t > & getStartTimestamp() const
Gets the startTimestamp of the VecStat.
const std::vector< float > & getAverage() const
Gets the average of the VecStat.
const std::vector< float > & getRateEventBelowLowerBound() const
Gets the rateEventBelowLowerBound of the VecStat.
const std::vector< float > & getRateEventAboveUpperBound() const
Gets the rateEventAboveUpperBound of the VecStat.
const std::vector< float > & getMin() const
Gets the min of the VecStat.
const std::vector< size_t > & getNbEvent() const
Gets the nbEvent of the VecStat.
const std::vector< std::vector< float > > & getVecRateQuantile() const
Gets the vecRateQuantile of the VecStat.
PString daemon_loadString(const ConfigNode &dico, const PString &attributeName)
Load a string value.
void daemon_load_config(PLog &log, ConfigNode &dico, const PString &inputConfig, ConfigFormat::ConfigFormat format)
Load the daemon config into a ConfigNode from a configuration string in the given format.
void daemon_read_configNode(DaemonConfig &daemonConfig, MapDaemonConfig &mapDaemon, PLog &log, MapTimeout &mapTimeout, const ConfigNode *&extraConfigParam, const ConfigNode &dico, const PString &daemonName)
Read the ConfigNode to initialise current Daemon.
std::map< PString, DaemonConfig > MapDaemonConfig
Main Daemon configuration which drives timeouts and flags of send and recv calls.