33 PPath configurationFile;
34 defaultMode.getValue(configurationFile,
"daemonconfig");
36 defaultMode.getValue(daemonName,
"daemonname");
39 return load(configurationFile, daemonName);
48 const ConfigNode * mapMainConfig = dico.getChild(
"swarm");
49 if(mapMainConfig !=
nullptr){
50 p_mainConfig.recvTimeoutMs = phoenix_get_value<int>(*mapMainConfig,
"recv_timeout_ms", -1);
51 p_mainConfig.sendTimeoutMs = phoenix_get_value<int>(*mapMainConfig,
"send_timeout_ms", -1);
52 p_mainConfig.recvFlag = fromString<PRecvFlag::PRecvFlag>(phoenix_get_string(*mapMainConfig,
"recv_flag",
"NON_BLOCK"));
53 p_mainConfig.sendFlag = fromString<PSendFlag::PSendFlag>(phoenix_get_string(*mapMainConfig,
"send_flag",
"NON_BLOCK"));
55 std::map<PString, time_t> mapTimeout;
56 const ConfigNode * extraConfigParam =
nullptr;
59 const ConfigNode * mapStatDaemon = dico.getChild(
"statistics");
60 if(mapStatDaemon !=
nullptr){
65 if(
p_config.getStatDaemonName() !=
""){
67 p_log.getLogError() <<
"BaseDaemon::loadConfigFromNode : stat daemon '"<<
p_config.getStatDaemonName()<<
"' not found in configuration" << std::endl;
71 p_log.getLogInfo() <<
"BaseDaemon::loadConfigFromNode : loading configuration successful" << std::endl;
72 p_log.getLogInfo() <<
"BaseDaemon::loadConfigFromNode : ready to call addCallMethod to subscribe on received data" << std::endl;
74 if(extraConfigParam !=
nullptr){
115 p_log.getLogWarning() <<
"BaseDaemon::addCallMethod : no redefined addCallMethod " << std::endl;
117 std::cerr <<
"BaseDaemon::addCallMethod : no redefined addCallMethod (and no log file so we use std::cerr)" << std::endl;
130 getLog().getLogError() <<
"BaseDaemon::callMethod : function '"<<name<<
"' not found" << std::endl;
146 p_log.getLogInfo() <<
"Stopping Daemon '"<<
p_config.getName()<<
"' at '"<<
p_config.getHostName()<<
"'" << std::endl;
169 MapDaemonConfig::const_iterator it(
p_mapDaemon.find(name));
198 message = it->second;
211 getLog().getLogInfo() <<
"BaseDaemon::processConfirmedMessage : adding statistics for processed transaction " <<
id << std::endl;
212 const Message & message = it->second;
213 time_t ellapsedTime = currentTime - message.
getSendTime();
215 getLog().getLogDebug() <<
"BaseDaemon::processConfirmedMessage : add statistic for receiver : " << receiver << std::endl;
219 getLog().getLogInfo() <<
"BaseDaemon::removeConfirmedMessage : remove confirmed transaction " <<
id << std::endl;
222 getLog().getLogError() <<
"BaseDaemon::processConfirmedMessage : transaction " <<
id <<
" not found in p_mapMessageToBeConfirmed" << std::endl;
230 std::vector<size_t> messagesToRemove;
232 const size_t & messageId = it->first;
233 const Message & message = it->second;
234 time_t elapsedTime = currentTime - message.
getSendTime();
236 getLog().getLogError() <<
"BaseDaemon::checkMessageTimeout() : message " << messageId <<
" has no receiver." << std::endl;
239 std::map<PString, time_t>::const_iterator timeoutIt =
p_config.getMapTimeout().find(message.
getVecRecver().front());
240 if(timeoutIt ==
p_config.getMapTimeout().end()){
241 getLog().getLogError() <<
"BaseDaemon::checkMessageTimeout() : no timeout found for receiver '" << message.
getVecRecver().front() <<
"' of message " << messageId <<
"." << std::endl;
244 time_t timeout = timeoutIt->second;
246 if (elapsedTime > timeout) {
247 messagesToRemove.push_back(messageId);
248 getLog().getLogWarning() <<
"BaseDaemon::checkMessageTimeout() : Timeout Reached for message " << messageId <<
" sent to '" << message.
getVecRecver().front() <<
"'. Removing unconfirmed transaction" << std::endl;
251 for(
const size_t & messageId : messagesToRemove){
269 PString prototype(data.
getType());
270 std::map<PString, AbstractDataFunction*>::iterator it(
p_mapDataFunction.find(prototype));
272 getLog().getLogError() <<
"BaseDaemon::getDataFunction : function to process data '"<<prototype<<
"' not found" << std::endl;
284 if(function ==
nullptr){
287 getLog().getLogDebug() <<
"BaseDaemon::processData : process data '"<<data.
getType()<<
"'" << std::endl;
298 getLog().getLogDebug() <<
"BaseDaemon::updateStatAccumulator : event nb is " << stat.
getNbEvent() << std::endl;
304 if(value < stat.
getMin()){
307 if(value > stat.
getMax()){
322 vecHistogram[bin] = vecHistogram[bin] + 1lu;
335 newStat.
setMin(std::nan(
""));
336 newStat.
setMax(std::nan(
""));
337 newStat.
setSum(std::nan(
""));
356 for(MapStatAccumulator::iterator it =
p_config.getDaemonStatAccumulator().getMapStatComputing().begin(); it !=
p_config.getDaemonStatAccumulator().getMapStatComputing().end(); ++it){
357 it->second.setNbEvent(0lu);
358 std::fill(it->second.getVecHistogram().begin(), it->second.getVecHistogram().end(), 0lu);
360 for(MapDaemonStatAccumulator::iterator itMap =
p_config.getDaemonStatAccumulator().getMapStatCommunication().begin(); itMap !=
p_config.getDaemonStatAccumulator().getMapStatCommunication().end(); ++itMap){
361 for(MapStatAccumulator::iterator it = itMap->second.begin(); it != itMap->second.end(); ++it){
362 it->second.setNbEvent(0lu);
363 std::fill(it->second.getVecHistogram().begin(), it->second.getVecHistogram().end(), 0lu);
398 p_optionParser.setExampleLongOption(
"phoenix_daemon --daemonconfig=daemon_config.toml --daemonname=main");
399 p_optionParser.setExampleShortOption(
"phoenix_daemon -c daemon_config.toml -n main");
400 p_optionParser.addOption(
"daemonconfig",
"c", OptionType::FILENAME,
true,
"Toml configuration file which define all Daemons of the swarm");
401 p_optionParser.addOption(
"daemonname",
"n", OptionType::STRING,
true,
"Name of the current Daemon of the swarm configuration defined with --daemonconfig");
402 p_optionParser.addOption(
"mock",
"m", OptionType::NONE,
false,
"Activate the Daemon in mock mode for the sockets and the clock");
403 p_optionParser.addOption(
"mockrecord",
"r", OptionType::NONE,
false,
"Activate the Daemon in mock record mode for the sockets and the clock");
418 std::map<PString, StatAccumulator> newStatMap;
419 newStatMap[dataType] =
createNewStat(nbBin, histLowerBound, histUpperBound);
432 std::map<PString, std::map<PString, StatAccumulator> > & mapCommunication =
p_config.getDaemonStatAccumulator().getMapStatCommunication();
433 MapDaemonStatAccumulator::iterator it = mapCommunication.find(destName);
434 if(it == mapCommunication.end()){
438 std::map<PString, StatAccumulator>::iterator itDataType = it->second.find(dataType);
439 if(itDataType == it->second.end()){
440 it->second[dataType] =
createNewStat(nbBin, histLowerBound, histUpperBound);
464 time_t rate = endTimestamp - startTimestamp;
485 for(MapStatAccumulator::iterator it = mapStatComputing.begin(); it != mapStatComputing.end(); ++it){
493 for(MapDaemonStatAccumulator::iterator itMap = mapStatCommunication.begin(); itMap != mapStatCommunication.end(); ++itMap){
494 const PString & destName = itMap->first;
496 for(MapStatAccumulator::iterator it = itMap->second.begin(); it != itMap->second.end(); ++it){
499 mapVecStat[it->first] = vecStat;
std::map< PString, Swarm::StatAccumulator > MapStatAccumulator
std::map< PString, std::map< PString, Swarm::StatAccumulator > > MapDaemonStatAccumulator
Abstract function definition which will be callable in Daemon.
virtual bool call(PLog &log, const Swarm::Data &data)=0
std::map< PString, Swarm::AbstractDataFunction * > p_mapDataFunction
Map of methods which have to be called when receiving data.
void clearCallableMethod()
Clear the map of callable methods.
bool p_isRun
True if the current BaseDaemon is running.
void loadConfigFromNode(const ConfigNode &dico, const PString &daemonName)
Load configuration from a node of ConfigNode.
OptionParser p_optionParser
Option parser of the Daemon.
bool isDaemonExist(const PString &name) const
Tell whether a neighbour Daemon exists.
void updateStatAccumulator(StatAccumulator &stat, float value)
Update a computing statistic with a new value.
std::map< PString, Swarm::AbstractFunction * > p_mapCallableMethod
Map of callable method of the Daemon.
void addMessageToConfirm(const Swarm::Message &message)
Add a message to confirm.
OptionParser & getOptionParser()
Return the OptionParser of the current BaseDaemon.
std::map< PString, StatAccumulator > createNewCommunicationStatMap(const PString &dataType, size_t nbBin, float histLowerBound, float histUpperBound)
Create a new map of communication statistic.
PLog & getLogger()
Get the logger of the current BaseDaemon.
bool load(const PString &configFileContent, const PString &daemonName, ConfigFormat::ConfigFormat format)
Load the Json configuration which defines all BaseDaemons of the Swarm.
bool callMethod(Swarm::Data &result, const PString &name, const Swarm::Data &parameter)
Call an added callable method by name.
void initialisationBaseDaemon()
Initialisation function of the class BaseDaemon.
bool p_isFullMock
True if the daemon has to be executed in mock mode for socket and clock.
MapDaemonConfig & getMapDaemonConfig()
Get the map of all Daemon configurations.
DaemonConfig & getConfig()
Get the configuration of the current BaseDaemon.
VecStat fillVecStat(const Swarm::StatAccumulator &accumulator, time_t startTimestamp, time_t endTimestamp)
Fill a VecStat from a StatAccumulator to send to the DaemonStat.
void fillDaemonStat(Swarm::Stat &stat, time_t startTimestamp, time_t endTimestamp)
Fill the Stat with the current statistics of the daemon.
DaemonMainConfig & getMainConfig()
Get the main configuration of the swarm.
virtual void addCallMethod()
Do the addCallableMethod here.
bool getMessageToConfirm(Swarm::Message &message, size_t id) const
Get a message to confirm by id if it exists.
PLog p_log
Logger of the current Daemon.
virtual bool extraLoad(const ConfigNode *config)
Extra call on load for the current Daemon.
PLog & getLog()
Get the log of the current BaseDaemon.
void clearStat()
Clear all the statistics of the daemon.
void stop()
Stops the BaseDaemon.
void checkMessageTimeout(time_t currentTime)
Check if a message has reached the timeout.
size_t p_messageId
Id counter of the message of the current Daemon.
std::map< size_t, Swarm::Message > p_mapMessageToBeConfirmed
Map of messages which have to be confirmed by destination Daemon.
size_t getMessageId()
Get current message id.
void getCommunicationStat(const PString &destName, const PString &dataType, float latency, size_t nbBin, float histLowerBound, float histUpperBound)
Add a communication statistic.
bool p_isFullMockRecord
True if the daemon has to be executed in mock record mode for socket and clock.
BaseDaemon()
Default constructor of BaseDaemon.
StatAccumulator createNewStat(size_t nbBin, float histLowerBound, float histUpperBound)
Create a new clear computing statistic.
DaemonConfig p_config
Configuration of the current Daemon.
Swarm::AbstractDataFunction * getDataFunction(const Swarm::Data &data)
Get the data function associated with the given data.
const std::map< size_t, Swarm::Message > & getMapMessageToBeConfirmed() const
Get the map of message to be confirmed.
MapDaemonConfig p_mapDaemon
Map of the other Daemon of the Swarm.
void processConfirmedMessage(size_t id, time_t currentTime)
Process confirmed message.
DaemonMainConfig p_mainConfig
Main configuration of the Daemon.
virtual ~BaseDaemon()
Destructor of BaseDaemon.
bool parseArgument(int argc, char **argv)
Parse arguments given to the BaseDaemon with command line.
bool processData(const Swarm::Data &data)
Process given data with the proper method.
Describe a Daemon of the Swarm.
Basic Data exchanged in the swarm.
const PString & getType() const
Gets the type of the Data.
Message exchanged by Daemons.
const std::vector< PString > & getVecRecver() const
Gets the vecRecver of the Message.
const Swarm::Data & getData() const
Gets the data of the Message.
size_t getId() const
Gets the id of the Message.
const time_t & getSendTime() const
Gets the sendTime of the Message.
Accumulator of event occurrences to build swarm statistics over a time period.
float getHistUpperBound() const
Gets the histUpperBound of the StatAccumulator.
void setMin(float min)
Sets the min of the StatAccumulator.
float getSum() const
Gets the sum of the StatAccumulator.
void setVecHistogram(const std::vector< size_t > &vecHistogram)
Sets the vecHistogram of the StatAccumulator.
size_t getNbEvent() const
Gets the nbEvent of the StatAccumulator.
float getMax() const
Gets the max of the StatAccumulator.
size_t getNbEventAboveUpperBound() const
Gets the nbEventAboveUpperBound of the StatAccumulator.
float getMin() const
Gets the min of the StatAccumulator.
float getHistLowerBound() const
Gets the histLowerBound of the StatAccumulator.
void setHistUpperBound(float histUpperBound)
Sets the histUpperBound of the StatAccumulator.
void setMax(float max)
Sets the max of the StatAccumulator.
void setNbEventAboveUpperBound(size_t nbEventAboveUpperBound)
Sets the nbEventAboveUpperBound of the StatAccumulator.
void setHistLowerBound(float histLowerBound)
Sets the histLowerBound of the StatAccumulator.
void setNbEventBelowLowerBound(size_t nbEventBelowLowerBound)
Sets the nbEventBelowLowerBound of the StatAccumulator.
void setSum(float sum)
Sets the sum of the StatAccumulator.
size_t getNbEventBelowLowerBound() const
Gets the nbEventBelowLowerBound of the StatAccumulator.
const std::vector< size_t > & getVecHistogram() const
Gets the vecHistogram of the StatAccumulator.
void setNbEvent(size_t nbEvent)
Sets the nbEvent of the StatAccumulator.
const std::map< DaemonName, std::map< DataType, Swarm::VecStat > > & getMapStatCommunication() const
Gets the mapStatCommunication of the Stat.
const std::map< PString, Swarm::VecStat > & getMapStatComputing() const
Gets the mapStatComputing of the Stat.
General statistics in the swarm.
const std::vector< float > & getMax() const
Gets the max of the VecStat.
const std::vector< time_t > & getEndTimestamp() const
Gets the endTimestamp of the VecStat.
const std::vector< float > & getRate() const
Gets the rate of the VecStat.
const std::vector< time_t > & getStartTimestamp() const
Gets the startTimestamp of the VecStat.
const std::vector< float > & getAverage() const
Gets the average of the VecStat.
const std::vector< float > & getRateEventBelowLowerBound() const
Gets the rateEventBelowLowerBound of the VecStat.
const std::vector< float > & getRateEventAboveUpperBound() const
Gets the rateEventAboveUpperBound of the VecStat.
const std::vector< float > & getMin() const
Gets the min of the VecStat.
const std::vector< size_t > & getNbEvent() const
Gets the nbEvent of the VecStat.
const std::vector< std::vector< float > > & getVecRateQuantile() const
Gets the vecRateQuantile of the VecStat.
PString daemon_loadString(const ConfigNode &dico, const PString &attributeName)
Load a string value.
void daemon_load_config(PLog &log, ConfigNode &dico, const PString &inputConfig, ConfigFormat::ConfigFormat format)
Load the daemon config into a ConfigNode from a json string.
void daemon_read_configNode(Swarm::DaemonConfig &daemonConfig, MapDaemonConfig &mapDaemon, PLog &log, MapTimeout &mapTimeout, const ConfigNode *&extraConfigParam, const ConfigNode &dico, const PString &daemonName)
Read the ConfigNode to initialise current Daemon.
std::map< PString, Swarm::DaemonConfig > MapDaemonConfig
Main Daemon configuration which drives timeouts and flags of send and recv calls.