1
0
Fork 0

callback tokens to signal when the caller has gone away

This commit is contained in:
Kevin Matz 2021-09-11 09:44:16 -04:00
parent abc7b35b70
commit 8e50d9292d
8 changed files with 132 additions and 69 deletions

View File

@ -12,12 +12,12 @@ QSacnUniverse::QSacnUniverse(QObject *parent, std::shared_ptr<sACN::Universe> un
if (!universe)
return;
universe_->onData([this](DMX::Universe*) {
data_change_token = universe_->onData([this](DMX::Universe*) {
emit changed();
});
if (!universe->isEditable())
universe_->onSourceListChange([this]() {
list_change_token = universe_->onSourceListChange([this]() {
syncSources();
});
};

View File

@ -46,5 +46,8 @@ signals:
private:
std::shared_ptr<sACN::Universe> universe_;
QMap<sACN::DATA::data_header, QSacnUniverse*> sources_;
std::shared_ptr<void> data_change_token;
std::shared_ptr<void> list_change_token;
};
Q_DECLARE_METATYPE(QSacnUniverse*)

View File

@ -74,9 +74,9 @@ uint8_t Universe::slot(const uint16_t address)
*/
double Universe::rxRate()
{
rx_timeout_();
// updates per second
return rx_times_.size() / (rx_timeout_period_ / 1000.0);
rx_timeout_();
// updates per second
return rx_times_.size() / (rx_timeout_period_ / 1000.0);
}
@ -130,8 +130,7 @@ void Universe::setData(const std::vector<uint8_t>& data)
rx_timeout_(true);
// notify callbacks
for (const auto &cb : callbacks_)
cb(this);
doDataCallbacks();
}
@ -143,7 +142,8 @@ void Universe::setData(const std::vector<uint8_t>& data)
*/
void Universe::altSCdata(const std::vector<uint8_t> & data)
{
switch (data.front()) {
switch (data.front())
{
case E111_ASC_TEXT_ASCII:
break;
case E111_ASC_TEST:
@ -154,7 +154,7 @@ void Universe::altSCdata(const std::vector<uint8_t> & data)
break;
case E111_ASC_SIP:
break;
}
}
}
@ -165,14 +165,14 @@ void Universe::altSCdata(const std::vector<uint8_t> & data)
*/
void Universe::setValue(const uint16_t address, const uint8_t value)
{
if (address == 0)
return;
if (address > null_start_data.size() - 1)
return;
if (address == 0)
return;
if (address > null_start_data.size() - 1)
return;
null_start_mutex.lock();
null_start_data[address] = value;
null_start_mutex.unlock();
null_start_mutex.lock();
null_start_data[address] = value;
null_start_mutex.unlock();
}
@ -185,24 +185,50 @@ void Universe::setValue(const uint16_t address, const uint8_t value)
void Universe::setValue(const uint16_t start, const uint16_t footprint,
const uint8_t* data)
{
if (start == 0)
return;
if (start + footprint > null_start_data.size() - 1)
return;
if (start == 0)
return;
if (start + footprint > null_start_data.size() - 1)
return;
null_start_mutex.lock();
std::copy(data, data + footprint, null_start_data.begin() + start);
null_start_mutex.unlock();
null_start_mutex.lock();
std::copy(data, data + footprint, null_start_data.begin() + start);
null_start_mutex.unlock();
}
/**
* @brief Universe::onData
* @param callback
* @param cb
* @return
*/
void Universe::onData(const DataHandler callback)
std::shared_ptr<void> Universe::onData(const DataHandler cb)
{
callbacks_.push_back(callback);
// wrap the callback with a shared pointer
auto sp = std::make_shared<DataHandler>(std::move(cb));
// add callback to list (as a weak pointer)
callbacks_.push_back(sp);
// return token that caller must keep throughout it's scope
return sp;
}
/**
* @brief Universe::notifyCallers
*/
void Universe::doDataCallbacks()
{
for (auto it = callbacks_.begin(); it != callbacks_.end();)
{
if (auto sp = it->lock())
{ // if the caller is still holding the token
(*sp)(this);
++it;
}
else
{ // or remove the callback
it = callbacks_.erase(it);
}
}
}

View File

@ -28,6 +28,7 @@
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>
@ -35,8 +36,8 @@
namespace DMX {
class Universe; // forward declare the Univserse class
using DataHandler = std::function<void(Universe *)>;
using DimmerData = std::array<uint8_t, 513>;
using DataHandler = std::function<void(Universe *)>;
/**
* @brief The Universe class
@ -59,7 +60,7 @@ class Universe {
const uint8_t* data);
void setData (const std::vector<uint8_t> &);
void onData (const DataHandler);
std::shared_ptr<void> onData (const DataHandler);
virtual void altSCdata(const std::vector<uint8_t> &);
@ -67,7 +68,8 @@ class Universe {
DimmerData null_start_data; //!< NULL Start Code data
mutable std::mutex null_start_mutex; //!< memory protect Null Start data
std::vector<DataHandler> callbacks_; //!< list of calback functions
std::vector<std::weak_ptr<DataHandler>> callbacks_; //!< list of calback functions
void doDataCallbacks(); //!< execute valid callbacks
private:
std::queue<std::chrono::time_point<std::chrono::system_clock>> rx_times_;

View File

@ -102,10 +102,10 @@ bool ArbitratingUniverse::hasSourceUniverse(const DATA::data_header& src) const
std::shared_ptr<Universe> ArbitratingUniverse::addNewSource(const DATA::data_header &src)
{
sources_.emplace(src, std::make_shared<Universe>());
sources_.at(src)->onData(std::bind(&sACN::ArbitratingUniverse::dataChangedNotifier,
this, std::placeholders::_1));
for (const auto & cb : cb_sourceListChange)
cb();
auto token = sources_.at(src)->onData(std::bind(&sACN::ArbitratingUniverse::dataChangedNotifier,
this, std::placeholders::_1));
source_data_tokens.push_back(token);
doListChangeCallbacks();
return sources_.at(src);
}
@ -120,9 +120,10 @@ void ArbitratingUniverse::deleteSourceUniverse(const DATA::data_header& src)
if (!hasSourceUniverse(src))
return;
/// \todo also erase token for data change callbacks
sources_.erase(src);
for (const auto & cb : cb_sourceListChange)
cb();
doListChangeCallbacks();
}
@ -137,18 +138,43 @@ void ArbitratingUniverse::dataChangedNotifier(DMX::Universe* dmx)
if (!universe)
return;
if (sacn->provenance() == universe->provenance())
for (const auto &cb : callbacks_)
cb(this);
doDataCallbacks();
}
/**
* @brief ArbitratingUniverse::onSourceListChange
* @param cb
* @return
*/
void ArbitratingUniverse::onSourceListChange(std::function<void()> cb)
std::shared_ptr<void> ArbitratingUniverse::onSourceListChange(std::function<void()> cb)
{
cb_sourceListChange.push_back(cb);
// wrap the callback with a shared pointer
auto sp = std::make_shared<std::function<void()>>(std::move(cb));
// add callback to list (as a weak pointer)
cb_sourceListChange.push_back(sp);
// return token that caller must keep throughout it's scope
return sp;
}
/**
* @brief ArbitratingUniverse::doListChangeCallbacks
*/
void ArbitratingUniverse::doListChangeCallbacks()
{
for (auto it = cb_sourceListChange.begin(); it != cb_sourceListChange.end();)
{
if (auto sp = it->lock())
{ // if the caller is still holding the token
(*sp)();
++it;
}
else
{ // or remove the callback
it = cb_sourceListChange.erase(it);
}
}
}
@ -264,40 +290,39 @@ std::shared_ptr<Universe> ArbitratingUniverse::dominant_()
std::shared_ptr<Universe> ret = nullptr;
for(auto it = sources_.begin(); it != sources_.end(); )
{
auto universe = it->second;
{
auto universe = it->second;
if (!ret && HoldLastLook)
{ // anything is better than nothing
++it;
ret = universe;
continue;
}
if (!ret && HoldLastLook)
{ // anything is better than nothing
++it;
ret = universe;
continue;
}
auto age = universe->rxAge();
if (age > E131_NETWORK_DATA_LOSS_TIMEOUT)
{ // clean up zombie universes
it = sources_.erase(it);
sourceListChanged = true;
continue;
}
{ // clean up zombie universes
it = sources_.erase(it);
sourceListChanged = true;
continue;
}
if (age > DMX::E111_DATA_LOSS_TIMEOUT)
{
++it;
continue; // stale universes cannot be dominant
}
{
++it;
continue; // stale universes cannot be dominant
}
if (!ret || universe->provenance() > ret->provenance())
if (universe->provenance()->priority > ret->provenance()->priority)
ret = universe; // rank by provenance
++it;
}
}
if (sourceListChanged)
for (const auto & cb : cb_sourceListChange)
cb();
doListChangeCallbacks();
return ret;
}

View File

@ -85,19 +85,24 @@ public:
const std::vector<DATA::data_header> sources() const override;
std::shared_ptr<Universe> sourceUniverse(const DATA::data_header&) override;
std::shared_ptr<Universe> addNewSource(const DATA::data_header&) override;
void onSourceListChange(std::function<void()>) override;
std::shared_ptr<void> onSourceListChange(std::function<void()>) override;
// DMX::Universe Overrides:
uint8_t slot(const uint16_t) override;
double rxRate() override;
private:
std::unordered_map<DATA::data_header, std::shared_ptr<Universe>> sources_;
std::vector<std::function<void()>> cb_sourceListChange; //!< list of calback functions
bool HoldLastLook;
protected:
void doListChangeCallbacks();
std::shared_ptr<Universe> dominant_();
bool hasSourceUniverse(const DATA::data_header&) const;
private:
std::unordered_map<DATA::data_header, std::shared_ptr<Universe>> sources_;
std::vector<std::shared_ptr<void>> source_data_tokens; //!< source universe data change tokens
std::vector<std::weak_ptr<std::function<void()>>> cb_sourceListChange; //!< list of calback functions
bool HoldLastLook;
std::shared_ptr<Universe> dominant_();
bool hasSourceUniverse(const DATA::data_header&) const;
};
} // SACN namespace

View File

@ -189,9 +189,11 @@ const std::vector<DATA::data_header> Universe::sources() const
/**
* @brief Add a callback to be notified when the source list changes.
* @return
*/
void Universe::onSourceListChange(std::function<void()>)
std::shared_ptr<void> Universe::onSourceListChange(std::function<void()>)
{
return nullptr;
};

View File

@ -67,7 +67,7 @@ public:
virtual const std::vector<DATA::data_header> sources() const;
virtual std::shared_ptr<Universe> sourceUniverse(const DATA::data_header&);
virtual std::shared_ptr<Universe> addNewSource(const DATA::data_header&);
virtual void onSourceListChange(std::function<void()>);
virtual std::shared_ptr<void> onSourceListChange(std::function<void()>);
// DMX::Universe overrides
void setValue (const uint16_t address, const uint8_t value) override;