1
0
Fork 0

opportunistically perform maintenance tasks

This commit is contained in:
Kevin Matz 2022-12-10 11:15:02 -05:00
parent dbcde09616
commit 8ce86353c9
2 changed files with 27 additions and 25 deletions

View File

@ -102,13 +102,14 @@ std::shared_ptr<Universe> ArbitratingUniverse::addNewSource(const DATA::data_hea
{
{
std::unique_lock lk_ctl(mtx_control);
sources_.emplace(src, std::make_shared<Universe>());
auto data_token = sources_.at(src)->onDataChange(
std::bind(&sACN::ArbitratingUniverse::dataChangedNotifier, this, std::placeholders::_1));
source_data_tokens.push_back(data_token);
auto status_token = sources_.at(src)->onStatusChange(
std::bind(&sACN::ArbitratingUniverse::doStatusCallbacks, this));
source_status_tokens.push_back(status_token);
auto [itr, ok] = sources_.try_emplace(src, std::make_shared<Universe>());
if (!ok)
return nullptr;
auto univ = itr->second;
source_data_tokens.push_back(univ->onDataChange(
std::bind(&sACN::ArbitratingUniverse::dataChangedNotifier, this, std::placeholders::_1)));
source_status_tokens.push_back(univ->onStatusChange(
std::bind(&sACN::ArbitratingUniverse::doStatusCallbacks, this)));
}
doListChangeCallbacks();
@ -137,6 +138,7 @@ void ArbitratingUniverse::deleteSourceUniverse(const DATA::data_header& src)
sources_.erase(src);
}
doListChangeCallbacks();
doStatusCallbacks();
}
@ -284,6 +286,7 @@ uint8_t ArbitratingUniverse::slot(const uint16_t s) const
*/
double ArbitratingUniverse::rxRate()
{
purge_stale_sources_();
auto universe = dominant_();
if (!universe)
return 0.0;
@ -342,19 +345,18 @@ std::shared_ptr<Universe> ArbitratingUniverse::dominant_() const
/**
* @brief ArbitratingUniverse::purge_stale_sources_
* @param age_cache
*/
void ArbitratingUniverse::purge_stale_sources_(std::unordered_map<DATA::data_header,uint> *age_cache)
void ArbitratingUniverse::purge_stale_sources_()
{
std::unordered_map<DATA::data_header,uint> ages;
if (age_cache)
ages = *age_cache;
else
{
std::shared_lock lk_ctl(mtx_control);
for (const auto& [header, universe] : sources_)
ages.insert({header, universe->age()});
}
{
std::shared_lock lk_ctl(mtx_control);
for (const auto& [header, universe] : sources_)
{
universe->rxRate(); // DMX::Universe::rx_timeout for maintenance tasks
ages.insert({header, universe->age()});
}
}
// order the member universes by age
std::multimap<uint,DATA::data_header> by_age;
@ -376,15 +378,8 @@ void ArbitratingUniverse::purge_stale_sources_(std::unordered_map<DATA::data_hea
if (age < E131_NETWORK_DATA_LOSS_TIMEOUT)
break;
// erase the zombie universe
{
std::unique_lock lk_ctl(mtx_control);
sources_.erase(key);
}
deleteSourceUniverse(key);
}
// if zombies were erased, notify about the changes
if (sources_.size() != by_age.size())
doListChangeCallbacks();
}
} // namespace SACN

View File

@ -89,7 +89,11 @@ std::shared_ptr<Universe> Receiver::universe(const uint16_t num) const
{
if (!universes_.count(num))
return nullptr;
#ifdef RTTI_ENABLED
return std::dynamic_pointer_cast<Universe>(universes_.at(num));
#else
return std::static_pointer_cast<Universe>(universes_.at(num));
#endif
}
@ -322,6 +326,9 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame)
#endif
universe->setMetadata(metadata);
universe->DmpReceiver(block);
// do ArbitratingUniverse maintence tasks
universes_.at(metadata->universe)->rxRate();
}