1
0
Fork 0

purge zombie sources

This commit is contained in:
Kevin Matz 2021-09-10 12:48:33 -04:00
parent 220a70d51b
commit 6f6ec334ca
6 changed files with 54 additions and 23 deletions

View File

@ -50,7 +50,7 @@ Universe::~Universe()
* @param address
* @return
*/
uint8_t Universe::slot(const uint16_t address) const
uint8_t Universe::slot(const uint16_t address)
{
if (address == 0)
return 0;

View File

@ -50,7 +50,7 @@ class Universe {
Universe (int timeout_period = E111_DATA_LOSS_TIMEOUT);
virtual ~Universe ();
virtual uint8_t slot (const uint16_t) const;
virtual uint8_t slot (const uint16_t);
virtual double rxRate();
virtual uint rxAge();

View File

@ -135,7 +135,7 @@ void ArbitratingUniverse::onSourceListChange(std::function<void()> cb)
* @brief MergeProxyUniverse::isSyncronized
* @return
*/
bool ArbitratingUniverse::isSyncronized() const
bool ArbitratingUniverse::isSyncronized()
{
auto universe = dominant_();
if (!universe)
@ -148,7 +148,7 @@ bool ArbitratingUniverse::isSyncronized() const
* @brief MergeProxyUniverse::provenance
* @return
*/
std::shared_ptr<DATA::data_header> ArbitratingUniverse::provenance() const
std::shared_ptr<DATA::data_header> ArbitratingUniverse::provenance()
{
auto universe = dominant_();
if (universe)
@ -197,7 +197,7 @@ bool ArbitratingUniverse::isEditable() const {
}
uint16_t ArbitratingUniverse::activeSlots() const
uint16_t ArbitratingUniverse::activeSlots()
{
auto universe = dominant_();
if (!universe)
@ -211,7 +211,7 @@ uint16_t ArbitratingUniverse::activeSlots() const
* @param s
* @return
*/
uint8_t ArbitratingUniverse::slot(const uint16_t s) const
uint8_t ArbitratingUniverse::slot(const uint16_t s)
{
auto universe = dominant_();
if (!universe)
@ -237,16 +237,47 @@ double ArbitratingUniverse::rxRate()
* @brief MergeProxyUniverse::dominant_
* @return
*/
std::shared_ptr<Universe> ArbitratingUniverse::dominant_() const
std::shared_ptr<Universe> ArbitratingUniverse::dominant_()
{
bool sourceListChanged = false;
std::shared_ptr<Universe> ret = nullptr;
for (auto& [_, uni] : sources_)
for(auto it = sources_.begin(); it != sources_.end(); )
{
if (uni->rxRate() < (DMX::E111_DATA_LOSS_TIMEOUT / 1000.0))
continue; // stale universes cannot be dominant
if (!ret || uni->provenance() > ret->provenance())
ret = uni;
auto universe = it->second;
if (!ret)
{ // anything is better than nothing!
++it;
ret = universe;
continue;
}
auto age = universe->rxAge();
if (age > E131_NETWORK_DATA_LOSS_TIMEOUT)
{ // clean up zombie universes
it = sources_.erase(it);
sourceListChanged = true;
continue;
}
if (age > DMX::E111_DATA_LOSS_TIMEOUT)
{
++it;
continue; // stale universes cannot be dominant
}
if (!ret || universe->provenance() > ret->provenance())
ret = universe; // rank by provenance
++it;
}
if (sourceListChanged)
for (const auto & cb : cb_sourceListChange)
cb();
return ret;
}

View File

@ -68,15 +68,15 @@ public:
// SACN::Universe overrides:
void set(std::shared_ptr<ACN::DMP::Pdu>, std::shared_ptr<DATA::data_header>) override;
std::shared_ptr<DATA::data_header> provenance() const override;
std::shared_ptr<DATA::data_header> provenance() override;
void setProvenance(std::shared_ptr<DATA::data_header>) override {};
bool isSyncronized() const override;
bool isSyncronized() override;
void synchronize(uint8_t = 0) override;
bool isEditable() const override;
uint16_t activeSlots() const override;
uint16_t activeSlots() override;
// api for poly-source universes
const std::vector<DATA::data_header> sources() const override;
@ -85,14 +85,14 @@ public:
void onSourceListChange(std::function<void()>) override;
// DMX::Universe Overrides:
uint8_t slot(const uint16_t) const override;
uint8_t slot(const uint16_t) override;
double rxRate() override;
private:
std::unordered_map<DATA::data_header, std::shared_ptr<Universe>> sources_;
std::vector<std::function<void()>> cb_sourceListChange; //!< list of calback functions
std::shared_ptr<Universe> dominant_() const;
std::shared_ptr<Universe> dominant_();
bool hasSourceUniverse(const DATA::data_header&) const;
};

View File

@ -159,7 +159,7 @@ void Universe::setProvenance(std::shared_ptr<DATA::data_header> source)
* @brief Universe::isSyncronized
* @return
*/
bool Universe::isSyncronized() const
bool Universe::isSyncronized()
{
return (!sync_data_);
};
@ -223,7 +223,7 @@ std::shared_ptr<Universe> Universe::addNewSource(const DATA::data_header&)
* @brief Universe::activeSlots
* @return
*/
uint16_t Universe::activeSlots() const
uint16_t Universe::activeSlots()
{
return active_data_slots;
}
@ -291,7 +291,7 @@ void Universe::setValue (const uint16_t start, const uint16_t footprint,
* @brief Universe::source
* @return
*/
std::shared_ptr<DATA::data_header> Universe::provenance() const
std::shared_ptr<DATA::data_header> Universe::provenance()
{
return provenance_;
}

View File

@ -53,15 +53,15 @@ public:
virtual void set(ACN::PDU::Message<ACN::DMP::Pdu>, std::shared_ptr<DATA::data_header>);
virtual std::shared_ptr<DATA::data_header> provenance() const;
virtual std::shared_ptr<DATA::data_header> provenance();
virtual void setProvenance(std::shared_ptr<DATA::data_header>);
virtual bool isSyncronized() const;
virtual bool isSyncronized();
virtual void synchronize(uint8_t = 0);
virtual bool isEditable() const;
virtual uint16_t activeSlots() const;
virtual uint16_t activeSlots();
// api for poly-source universes
virtual const std::vector<DATA::data_header> sources() const;