1
0
Fork 0

shared lock for universe access control

This commit is contained in:
Kevin Matz 2022-12-09 11:18:27 -05:00
parent d11e136247
commit 7f8e9ce4ac
6 changed files with 123 additions and 46 deletions

View File

@ -46,6 +46,7 @@ Universe::Universe(int timeout_period)
*/
long Universe::age()
{
std::shared_lock lk_ctl(mtx_control);
if (!last_updated_.time_since_epoch().count())
return -1; // universe has never been seen
@ -66,6 +67,7 @@ long Universe::age()
double Universe::rxRate()
{
rx_timeout_();
std::shared_lock lk_ctl(mtx_control);
// updates per second
return rx_times_.size() / (rx_timeout_period_ / 1000.0);
}
@ -81,6 +83,7 @@ double Universe::rxRate()
uint8_t Universe::status()
{
rx_timeout_(); // many things may have happened since the last observation
std::shared_lock lk_ctl(mtx_control);
return status_;
}
@ -113,7 +116,10 @@ void Universe::setStatus(uint8_t val)
if (val == status_)
return;
status_ = val;
{
std::unique_lock lk_ctl(mtx_control);
status_ = val;
}
do_callbacks_(cb_statusChange);
}
@ -135,9 +141,11 @@ void Universe::setValue(const uint16_t start, const uint16_t footprint,
std::unique_lock lk_data(null_start_mutex);
std::copy_n(data, footprint, null_start_data.begin() + start);
}
{
std::unique_lock lk_ctl(mtx_control);
last_updated_ = std::chrono::system_clock::now();
}
setStatus(DMX_ACTIVE);
last_updated_ = std::chrono::system_clock::now();
do_callbacks_(cb_dataChange);
}
@ -214,6 +222,7 @@ void Universe::setAltData(const std::vector<uint8_t> & data)
*/
std::shared_ptr<void> Universe::onDataChange(const std::function<void(Universe*)> cb)
{
std::unique_lock lk_ctl(mtx_control);
// wrap the callback with a shared pointer
auto sp = std::make_shared<std::function<void(Universe*)>>(std::move(cb));
// add callback to list (as a weak pointer)
@ -230,6 +239,7 @@ std::shared_ptr<void> Universe::onDataChange(const std::function<void(Universe*)
*/
std::shared_ptr<void> Universe::onStatusChange(const std::function<void(Universe*)> cb)
{
std::unique_lock lk_ctl(mtx_control);
// wrap the callback with a shared pointer
auto sp = std::make_shared<std::function<void(Universe*)>>(std::move(cb));
// add callback to list (as a weak pointer)
@ -250,6 +260,7 @@ void Universe::rx_timeout_(bool add_now)
if (add_now)
{
setStatus(DMX_ACTIVE);
std::unique_lock lk_ctl(mtx_control);
last_updated_ = now;
rx_times_.push(now);
}
@ -271,7 +282,10 @@ void Universe::rx_timeout_(bool add_now)
if (age < rx_timeout_period_)
break;
else
rx_times_.pop();
{
std::unique_lock lk_ctl(mtx_control);
rx_times_.pop();
}
}
}

View File

@ -77,6 +77,7 @@ class Universe {
DimmerData null_start_data; //!< NULL Start Code data
mutable std::shared_mutex null_start_mutex; //!< memory protect Null Start data
mutable std::shared_mutex mtx_control; //!< thread protected access control
std::chrono::system_clock::time_point last_updated_; //!< time of the latest update
private:

View File

@ -52,6 +52,7 @@ ArbitratingUniverse::~ArbitratingUniverse()
*/
void ArbitratingUniverse::setHoldLastLook(const bool state)
{
std::unique_lock lk_ctl(mtx_control);
HoldLastLook = state;
}
@ -62,12 +63,14 @@ void ArbitratingUniverse::setHoldLastLook(const bool state)
*/
bool ArbitratingUniverse::getHoldLastLook() const
{
std::shared_lock lk_ctl(mtx_control);
return HoldLastLook;
}
const std::vector<DATA::data_header> ArbitratingUniverse::sources() const
{
std::shared_lock lk_ctl(mtx_control);
std::vector<DATA::data_header> keys;
for (const auto& [key, _] : sources_)
keys.push_back(key);
@ -82,6 +85,7 @@ const std::vector<DATA::data_header> ArbitratingUniverse::sources() const
*/
std::shared_ptr<Universe> ArbitratingUniverse::sourceUniverse(const DATA::data_header &src)
{
std::shared_lock lk_ctl(mtx_control);
if (!hasSourceUniverse(src))
return nullptr;
@ -96,23 +100,28 @@ std::shared_ptr<Universe> ArbitratingUniverse::sourceUniverse(const DATA::data_h
*/
bool ArbitratingUniverse::hasSourceUniverse(const DATA::data_header& src) const
{
std::shared_lock lk_ctl(mtx_control);
return (sources_.count(src));
}
std::shared_ptr<Universe> ArbitratingUniverse::addNewSource(const DATA::data_header &src)
{
sources_.emplace(src, std::make_shared<Universe>());
auto data_token = sources_.at(src)->onDataChange(std::bind(&sACN::ArbitratingUniverse::dataChangedNotifier,
this, std::placeholders::_1));
source_data_tokens.push_back(data_token);
doListChangeCallbacks();
{
std::unique_lock lk_ctl(mtx_control);
sources_.emplace(src, std::make_shared<Universe>());
auto data_token = sources_.at(src)->onDataChange(
std::bind(&sACN::ArbitratingUniverse::dataChangedNotifier, this, std::placeholders::_1));
source_data_tokens.push_back(data_token);
auto status_token = sources_.at(src)->onStatusChange(
std::bind(&sACN::ArbitratingUniverse::doStatusCallbacks, this));
source_status_tokens.push_back(status_token);
}
auto status_token = sources_.at(src)->onStatusChange(std::bind(&sACN::ArbitratingUniverse::doStatusCallbacks,
this));
source_status_tokens.push_back(status_token);
doListChangeCallbacks();
doStatusCallbacks();
std::shared_lock lk_ctl(mtx_control);
return sources_.at(src);
}
@ -123,13 +132,15 @@ std::shared_ptr<Universe> ArbitratingUniverse::addNewSource(const DATA::data_hea
*/
void ArbitratingUniverse::deleteSourceUniverse(const DATA::data_header& src)
{
if (!hasSourceUniverse(src))
return;
if (sources_.size() == 1 && HoldLastLook)
{
std::shared_lock lk_ctl(mtx_control);
if (!hasSourceUniverse(src))
return;
sources_.erase(src);
if (sources_.size() == 1 && HoldLastLook)
return;
std::unique_lock lk_wctl(lk_ctl);
sources_.erase(src);
}
doListChangeCallbacks();
}
@ -156,6 +167,7 @@ void ArbitratingUniverse::dataChangedNotifier(DMX::Universe* dmx)
*/
std::shared_ptr<void> ArbitratingUniverse::onSourceListChange(std::function<void()> cb)
{
std::unique_lock lk_ctl(mtx_control);
// wrap the callback with a shared pointer
auto sp = std::make_shared<std::function<void()>>(std::move(cb));
// add callback to list (as a weak pointer)
@ -170,6 +182,7 @@ std::shared_ptr<void> ArbitratingUniverse::onSourceListChange(std::function<void
*/
void ArbitratingUniverse::doListChangeCallbacks()
{
std::shared_lock lk_ctl(mtx_control);
for (auto it = cb_sourceListChange.begin(); it != cb_sourceListChange.end();)
{
if (auto sp = it->lock())
@ -178,8 +191,10 @@ void ArbitratingUniverse::doListChangeCallbacks()
++it;
}
else
// or remove the callback
{ // or remove the callback
std::unique_lock lk_wctl(lk_ctl);
it = cb_sourceListChange.erase(it);
}
}
}
@ -207,6 +222,7 @@ std::shared_ptr<DATA::data_header> ArbitratingUniverse::metadata()
if (universe)
return universe->metadata();
std::shared_lock lk_ctl(mtx_control);
auto metadata = std::make_shared<DATA::data_header>();
metadata->universe = expectedUniverse;
metadata->priority = 255; // invalid
@ -237,12 +253,14 @@ uint8_t ArbitratingUniverse::status()
*/
void ArbitratingUniverse::synchronize(uint8_t sequence_number)
{
std::shared_lock lk_ctl(mtx_control);
for ( auto& [_, uni] : sources_)
uni->synchronize(sequence_number);
}
bool ArbitratingUniverse::isEditable() const {
bool ArbitratingUniverse::isEditable() const
{
return false;
}
@ -301,25 +319,31 @@ void ArbitratingUniverse::rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu> mess
std::shared_ptr<Universe> ArbitratingUniverse::dominant_()
{
if (sources_.empty())
return nullptr;
return nullptr;
// cache the age of each universe
std::unordered_map<DATA::data_header,uint> ages;
for (const auto& [header, universe] : sources_)
ages.insert({header, universe->age()});
{
std::shared_lock lk_ctl(mtx_control);
for (const auto& [header, universe] : sources_)
ages.insert({header, universe->age()});
}
purge_stale_sources_(&ages);
// order universe into a two dimentional container; priority then age
std::map<uint,std::multimap<uint,std::shared_ptr<Universe>>> by_priority;
for (const auto& [header, universe] : sources_)
{
auto age = ages.at(header);
auto priority = header.priority;
if (!by_priority.count(priority))
by_priority.emplace(priority, std::multimap<uint,std::shared_ptr<Universe>>({{age,universe}}));
else
by_priority.at(priority).insert({age, universe});
std::shared_lock lk_ctl(mtx_control);
for (const auto& [header, universe] : sources_)
{
auto age = ages.at(header);
auto priority = header.priority;
if (!by_priority.count(priority))
by_priority.emplace(priority, std::multimap<uint,std::shared_ptr<Universe>>({{age,universe}}));
else
by_priority.at(priority).insert({age, universe});
}
}
// freshest universe at the hightest priority
@ -337,13 +361,19 @@ void ArbitratingUniverse::purge_stale_sources_(std::unordered_map<DATA::data_hea
if (age_cache)
ages = *age_cache;
else
{
std::shared_lock lk_ctl(mtx_control);
for (const auto& [header, universe] : sources_)
ages.insert({header, universe->age()});
}
// order the member universes by age
std::multimap<uint,DATA::data_header> by_age;
for (const auto& [header, _] : sources_)
{
std::shared_lock lk_ctl(mtx_control);
for (const auto& [header, _] : sources_)
by_age.insert({ages.at(header), header});
}
// clean up stale universes, oldest first
for(auto it = by_age.crbegin(); it != by_age.crend(); it++)
@ -357,7 +387,10 @@ void ArbitratingUniverse::purge_stale_sources_(std::unordered_map<DATA::data_hea
if (age < E131_NETWORK_DATA_LOSS_TIMEOUT)
break;
// erase the zombie universe
sources_.erase(key);
{
std::unique_lock lk_ctl(mtx_control);
sources_.erase(key);
}
}
// if zombies were erased, notify about the changes

View File

@ -107,8 +107,10 @@ void Universe::rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
/// > DMX512-A [DMX] Slots (including the START Code slot).
if (range.count < 1 || range.count > null_start_data.size())
return;
active_data_slots_ = range.address + range.count;
{
std::unique_lock lk_ctl(mtx_control);
active_data_slots_ = range.address + range.count;
}
/// > \cite sACN 7.7 Property Values (DMX512-A Data)
/// >
/// > The DMP Layer's Property values field is used to encode the
@ -133,6 +135,7 @@ void Universe::rxDmpSubscribe(ACN::PDU::Message<ACN::DMP::Pdu> message)
*/
void Universe::setMetadata(std::shared_ptr<DATA::data_header> metadata)
{
std::unique_lock lk_ctl(mtx_control);
metadata_ = metadata;
}
@ -143,6 +146,7 @@ void Universe::setMetadata(std::shared_ptr<DATA::data_header> metadata)
*/
bool Universe::isSyncronized()
{
std::shared_lock lk_ctl(mtx_control);
return !sync_data_;
};
@ -153,6 +157,7 @@ bool Universe::isSyncronized()
*/
bool Universe::isEditable() const
{
std::shared_lock lk_ctl(mtx_control);
return sender_;
}
@ -163,6 +168,7 @@ bool Universe::isEditable() const
*/
const std::vector<DATA::data_header> Universe::sources() const
{
std::shared_lock lk_ctl(mtx_control);
std::vector<DATA::data_header> keys = {*metadata_};
return keys;
}
@ -187,6 +193,7 @@ std::shared_ptr<void> Universe::onSourceListChange(std::function<void()> callbac
*/
std::shared_ptr<Universe> Universe::sourceUniverse(const DATA::data_header& src)
{
std::shared_lock lk_ctl(mtx_control);
if (src == *metadata_)
return shared_from_this();
@ -210,6 +217,7 @@ std::shared_ptr<Universe> Universe::addNewSource(const DATA::data_header&)
*/
uint16_t Universe::activeSlots()
{
std::shared_lock lk_ctl(mtx_control);
return active_data_slots_;
}
@ -226,7 +234,10 @@ void Universe::setValue (const uint16_t start, const uint16_t footprint,
// set active_data_slots to at least end of footprint
if (start + footprint > active_data_slots_)
active_data_slots_ = start + footprint;
{
std::unique_lock lk_ctl(mtx_control);
active_data_slots_ = start + footprint;
}
// get a copy of the current values
uint8_t og[footprint];
@ -262,6 +273,7 @@ void Universe::setStatus(uint8_t status)
*/
std::shared_ptr<DATA::data_header> Universe::metadata()
{
std::shared_lock lk_ctl(mtx_control);
return metadata_;
}
@ -288,7 +300,10 @@ void Universe::synchronize(uint8_t sequence_number)
int8_t dif = b - a;
if (dif <= 0 && dif > -20)
return;
sync_sequence_ = sequence_number;
{
std::unique_lock lk_ctl(mtx_control);
sync_sequence_ = sequence_number;
}
DMX::Universe::setData(*sync_data_);
resetSynchronization();
}
@ -299,6 +314,7 @@ void Universe::synchronize(uint8_t sequence_number)
*/
void Universe::resetSynchronization()
{
std::unique_lock lk_ctl(mtx_control);
delete sync_data_;
sync_data_ = nullptr;
}
@ -311,6 +327,7 @@ void Universe::resetSynchronization()
void Universe::setSyncData(const std::vector<uint8_t> & data)
{
resetSynchronization();
std::unique_lock lk_ctl(mtx_control);
sync_data_ = new std::vector<uint8_t>(data);
}

View File

@ -90,6 +90,7 @@ UniverseSender::~UniverseSender()
*/
bool UniverseSender::isSending()
{
std::shared_lock lk_ctl(mtx_control);
return (worker_.joinable());
}
@ -101,6 +102,7 @@ bool UniverseSender::isSending()
*/
void UniverseSender::flush()
{
std::shared_lock lk_ctl(mtx_control);
request_.notify_all();
}
@ -111,7 +113,7 @@ void UniverseSender::flush()
void UniverseSender::kill()
{
{
std::unique_lock lk_ctl(mtx_control_);
std::unique_lock lk_ctl(mtx_control);
enable_ = false;
}
flush();
@ -138,7 +140,7 @@ void UniverseSender::loop_()
// check for control permission to continue looping
{
std::shared_lock lk_ctl(mtx_control_);
std::shared_lock lk_ctl(mtx_control);
enabled = enable_;
}
@ -164,19 +166,24 @@ void UniverseSender::loop_()
else
{
mUniverse->setStatus(Universe::sACN_TERMINATED); // note the changed operating status
std::unique_lock lk_univ(mUniverse->mtx_control);
mUniverse->metadata_->options.stream_terminated = true; // set the stream_terminated bit
std::unique_lock lk_ctl(mtx_control);
--terminated_resend; // stream_terminated must be sent 3 times
sleep = minimum_update_time; // stay throttled up through the termination sequence
}
// send the sACN message
mUniverse->sendDMP(dmp_);
mUniverse->last_updated_ = std::chrono::system_clock::now();
/// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number
/// >
/// > ... The sequence number for a universe shall be incremented by one for
/// > every packet sent on that universe...
mUniverse->metadata_->sequence_number++;
{
std::unique_lock lk_ctl(mUniverse->mtx_control);
mUniverse->last_updated_ = std::chrono::system_clock::now();
/// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number
/// >
/// > ... The sequence number for a universe shall be incremented by one for
/// > every packet sent on that universe...
mUniverse->metadata_->sequence_number++;
}
// sleep before the next cycle
request_.wait_for(lk_thread, sleep);
@ -189,6 +196,8 @@ void UniverseSender::loop_()
*/
void UniverseSender::update_dmp_()
{
std::unique_lock lk_ctl(mtx_control);
// header segment
auto addrtyp = std::static_pointer_cast<ACN::DMP::address_type>(dmp_->header());
@ -220,6 +229,9 @@ void UniverseSender::update_dmp_()
*/
void UniverseSender::dataFrameSender(ACN::PDU::Message<ACN::DMP::Pdu> dmp) const
{
std::shared_lock lk_ctl(mtx_control);
std::shared_lock lk_univ(mUniverse->mtx_control);
// sACN Framing Layer
frame_->setHeader(mUniverse->metadata_);
frame_->setData(dmp);

View File

@ -82,7 +82,7 @@ private:
std::chrono::milliseconds keep_alive_interval;
bool enable_;
mutable std::shared_mutex mtx_control_;
mutable std::shared_mutex mtx_control;
mutable std::mutex mtx_thread_;
std::condition_variable_any request_;
std::thread worker_;