diff --git a/example/sACN Explorer/multiversewindow.cpp b/example/sACN Explorer/multiversewindow.cpp index 380c395..6ecd6bb 100644 --- a/example/sACN Explorer/multiversewindow.cpp +++ b/example/sACN Explorer/multiversewindow.cpp @@ -130,9 +130,9 @@ MultiverseWindow::MultiverseWindow(QWidget *parent, QSacnNode *node) auto data = selected.data(Qt::EditRole); auto univ = data.value(); if (state) - univ->setMergeMode(sACN::Universe::MERGE_HTP); + univ->setMergeMode(sACN::UniverseArbitrator::MERGE_HTP); else - univ->setMergeMode(sACN::Universe::MERGE_LTP); + univ->setMergeMode(sACN::UniverseArbitrator::MERGE_LTP); }); connect(ui->actionAbout, &QAction::triggered, this, [this](){ @@ -208,10 +208,10 @@ void MultiverseWindow::selectionChanged(const QModelIndex ¤t, ui->actionInspect->setEnabled(true); ui->actionUniverseHoldLastLook->setChecked(univ->getHoldLastLook()); switch (univ->getMergeMode()){ - case sACN::Universe::MERGE_HTP: + case sACN::UniverseArbitrator::MERGE_HTP: ui->actionMergeModeHTP->setChecked(true); break; - case sACN::Universe::MERGE_LTP: + case sACN::UniverseArbitrator::MERGE_LTP: ui->actionMergeModeLTP->setChecked(true); break; default: diff --git a/platform/qt/qsacnuniverse.cpp b/platform/qt/qsacnuniverse.cpp index 3eec3d2..5ee296e 100644 --- a/platform/qt/qsacnuniverse.cpp +++ b/platform/qt/qsacnuniverse.cpp @@ -16,8 +16,8 @@ QSacnUniverse::QSacnUniverse(QObject *parent, std::shared_ptr un data_change_token = universe_->onDataChange([this](DMX::Universe*){emit dataChanged();}); status_change_token = universe_->onStatusChange([this](DMX::Universe*){emit statusChanged();}); - if (!universe->isEditable()) - list_change_token = universe_->onSourceListChange([this](DMX::Universe*){syncSources();}); + if (universe->arbitrator()) + list_change_token = universe_->arbitrator()->onSourceListChange([this](DMX::Universe*){syncSources();}); // set the status watchdog to update the status if the universe // isn't showing frequent activity @@ -176,7 +176,7 @@ uint16_t QSacnUniverse::activeSlots() const */ bool QSacnUniverse::getHoldLastLook() const { - return universe_ ? universe_->getHoldLastLook() : false; + return universe_ && universe_->arbitrator() ? universe_->arbitrator()->getHoldLastLook() : false; } @@ -184,9 +184,9 @@ bool QSacnUniverse::getHoldLastLook() const * @brief QSacnUniverse::getMergeMode * @return */ -sACN::Universe::MergeMode QSacnUniverse::getMergeMode() const +sACN::UniverseArbitrator::MergeMode QSacnUniverse::getMergeMode() const { - return universe_ ? universe_->getMergeMode() : sACN::Universe::MERGE_OTHER; + return universe_ && universe_->arbitrator() ? universe_->arbitrator()->getMergeMode() : sACN::UniverseArbitrator::MERGE_OTHER; } @@ -196,11 +196,8 @@ sACN::Universe::MergeMode QSacnUniverse::getMergeMode() const */ void QSacnUniverse::setHoldLastLook(bool state) { - if (isEditable()) - return; - - if (universe_) - universe_->setHoldLastLook(state); + if (isEditable() && universe_ && universe_->arbitrator()) + universe_->arbitrator()->setHoldLastLook(state); } @@ -208,27 +205,10 @@ void QSacnUniverse::setHoldLastLook(bool state) * @brief QSacnUniverse::setMergeMode * @param mode */ -void QSacnUniverse::setMergeMode(const sACN::Universe::MergeMode mode) +void QSacnUniverse::setMergeMode(const sACN::UniverseArbitrator::MergeMode mode) { - if (isEditable()) - return; - - if (universe_) - universe_->setMergeMode(mode); -} - - -/** - * @brief QSacnUniverse::setOptions - * @param o - */ -void QSacnUniverse::setOptions(sACN::DATA::data_options o) -{ - if (!isEditable()) - return; - - universe_->metadata()->options = o; - emit dataChanged(); + if (isEditable() && universe_ && universe_->arbitrator()) + universe_->arbitrator()->setMergeMode(mode); } @@ -304,7 +284,7 @@ void QSacnUniverse::syncSources() newSources.insert(metadata, sources_.take(metadata)); else { // make a new QSacnUniverse to add to the new list - auto universe = universe_->sourceUniverse(metadata); + auto universe = universe_->arbitrator()->sourceUniverse(metadata); newSources.insert(metadata, new QSacnUniverse(this, universe)); } } diff --git a/platform/qt/qsacnuniverse.h b/platform/qt/qsacnuniverse.h index e9867c3..3b2f35d 100644 --- a/platform/qt/qsacnuniverse.h +++ b/platform/qt/qsacnuniverse.h @@ -35,12 +35,11 @@ public: const QList sources() const; QSacnUniverse* sourceUniverse(const sACN::DATA::data_header&) const; bool getHoldLastLook() const; - sACN::Universe::MergeMode getMergeMode() const; + sACN::UniverseArbitrator::MergeMode getMergeMode() const; public slots: void setHoldLastLook(bool state); - void setMergeMode(const sACN::Universe::MergeMode); - void setOptions(sACN::DATA::data_options o); + void setMergeMode(const sACN::UniverseArbitrator::MergeMode); void setPriority(uint8_t p); void setSyncAddress(uint16_t a); void setValue (const uint16_t addr, const uint8_t level); diff --git a/protocol/esta/sacn/receiver.cpp b/protocol/esta/sacn/receiver.cpp index 591072e..134ca4c 100644 --- a/protocol/esta/sacn/receiver.cpp +++ b/protocol/esta/sacn/receiver.cpp @@ -25,6 +25,8 @@ #include "config.h" #include "pdu.h" #include "receiver.h" +#include "sacn.h" +#include "universearbitrator.h" namespace sACN { @@ -64,8 +66,11 @@ void Receiver::subscribe(const uint16_t num) { if (num == sACN::E131_DISCOVERY_UNIVERSE || universes_.count(num)) return; - universes_.emplace(num, std::make_shared()); - universes_.at(num)->expectedUniverse = num; + const auto [it, ok] = universes_.emplace(num, std::make_shared(nullptr, this)); + if (!ok) + return; + auto univ = it->second; + univ->metadata()->universe = num; } @@ -221,9 +226,9 @@ void Receiver::dataFrameHandler(ACN::PDU::Message frame) if (!universes_.count(metadata->universe)) return; // not subscribed to this universe - auto universe = universes_.at(metadata->universe)->sourceUniverse(*metadata); + auto universe = universes_.at(metadata->universe)->arbitrator()->sourceUniverse(*metadata); if (!universe) - universe = universes_.at(metadata->universe)->addNewSource(*metadata); + universe = universes_.at(metadata->universe)->arbitrator()->addNewSource(*metadata); /// > \cite sACN 6.2.6 E1.31 Data Packet: Options /// > @@ -245,7 +250,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message frame) { /// Remove the terminated universe as sources. /// The universe may keep itself, in a terminated state, for hold-last-look. - universes_[metadata->universe]->deleteSourceUniverse(*metadata); + universes_.at(metadata->universe)->arbitrator()->deleteSourceUniverse(*metadata); /// > Any property values in an E1.31 Data Packet containing this bit /// > shall be ignored. return; @@ -323,7 +328,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message frame) universe->DmpReceiver(block); // do ArbitratingUniverse maintence tasks - universes_.at(metadata->universe)->refresh(); + universes_.at(metadata->universe)->arbitrator()->refresh(); } diff --git a/protocol/esta/sacn/receiver.h b/protocol/esta/sacn/receiver.h index bbcf36c..cc61a00 100644 --- a/protocol/esta/sacn/receiver.h +++ b/protocol/esta/sacn/receiver.h @@ -26,7 +26,7 @@ #include "rlp/component.h" #include "data.h" #include "extended.h" -#include "universearbitrator.h" +#include "universe.h" #include #include @@ -96,7 +96,7 @@ protected: void discoveryListHanlder(ACN::PDU::Message); private: - std::unordered_map> universes_; + std::unordered_map> universes_; std::vector> discoveryCallbacks_; bool discovery_enabled_; }; diff --git a/protocol/esta/sacn/universe.cpp b/protocol/esta/sacn/universe.cpp index 6284f8f..398fcd5 100644 --- a/protocol/esta/sacn/universe.cpp +++ b/protocol/esta/sacn/universe.cpp @@ -30,12 +30,13 @@ namespace sACN { /** * @brief Universe::Universe * @param src + * @param rsv */ -Universe::Universe(Source* src) +Universe::Universe(Source* src, Receiver* rsv) : DMX::Universe(E131_NETWORK_DATA_LOSS_TIMEOUT) , ACN::DMP::Component(UUID::uuid(), "OpenLCP sACN Universe") + , arbitrator_(rsv ? new UniverseArbitrator(this) : nullptr) , sender_(src ? new UniverseSender(src, this) : nullptr) - , mergeMode_(MERGE_HTP) , metadata_(std::make_shared()) , sync_data_(nullptr) , active_data_slots_(1) // start code @@ -50,22 +51,134 @@ Universe::Universe(Source* src) */ Universe::~Universe() { - // shut down tx worker thread + delete arbitrator_; delete sender_; - - // delete sync data pointer delete sync_data_; } +/** + * @brief Universe::age + * @return + */ +long Universe::age() const +{ + if (arbitrator_) + return arbitrator_->age(); + + return DMX::Universe::age(); +} + + +/** + * @brief Universe::rxRate + * @return + */ +double Universe::rxRate() +{ + if (arbitrator_) + return arbitrator_->rxRate(); + + return DMX::Universe::rxRate(); +} + + +/** + * @brief Universe::status + * @return + */ +uint8_t Universe::status() const +{ + if (arbitrator_) + return arbitrator_->status(); + + return DMX::Universe::status(); +} + + +/** + * @brief Universe::slot + * @param address + * @return + */ +uint8_t Universe::slot(const uint16_t address) const +{ + if (arbitrator_) + return arbitrator_->slot(address); + + return DMX::Universe::slot(address); +} + + +/** + * @brief Universe::setStatus + * @param status + */ +void Universe::setStatus(uint8_t status) +{ + if (status == RX_TIMEOUT) + status = sACN_TERMINATED; + + DMX::Universe::setStatus(status); +} + + +/** + * @brief Universe::setValue + * @param start + * @param footprint + * @param data + */ +void Universe::setValue (const uint16_t start, const uint16_t footprint, + const uint8_t* data) +{ + if (!isEditable()) + return; + + if (arbitrator_) + return; + + // start and footprint valid? + if (start < 1 || start + footprint > null_start_data.size()) + return; + + // set active_data_slots to at least end of footprint + if (start + footprint > active_data_slots_) + { + std::unique_lock lk_ctl(mtx_control); + active_data_slots_ = start + footprint; + } + + // get a copy of the current values + uint8_t og[footprint]; + { + std::unique_lock lk_data(mtx_data); + std::copy_n(std::begin(null_start_data) + start, footprint, og); + } + + // data not changed! + if (memcmp(data, og, footprint) == 0) + return; + + // set the values + DMX::Universe::setValue(start, footprint, data); + + // request sACN message to be sent + sender_->flush(); +} + + /** * @brief Universe::metadata * @return */ std::shared_ptr Universe::metadata() const { - std::shared_lock lk_ctl(mtx_control); - return metadata_; + if (arbitrator_) + return arbitrator_->metadata(); + + std::shared_lock lk_ctl(mtx_control); + return metadata_; } @@ -75,8 +188,21 @@ std::shared_ptr Universe::metadata() const */ void Universe::setMetadata(std::shared_ptr metadata) { + if (arbitrator_) + return; + +// bool update = false; +// if (metadata_->source_name != metadata->source_name || +// metadata_->priority != metadata->priority) +// update = true; + + { std::unique_lock lk_ctl(mtx_control); metadata_ = metadata; + } + +// if (update) +// doStatusCallbacks(); } @@ -86,8 +212,11 @@ void Universe::setMetadata(std::shared_ptr metadata) */ bool Universe::isSyncronized() const { - std::shared_lock lk_ctl(mtx_control); - return !sync_data_; + if (arbitrator_) + return true; + + std::shared_lock lk_ctl(mtx_control); + return !sync_data_; }; @@ -97,6 +226,8 @@ bool Universe::isSyncronized() const */ void Universe::synchronize(uint8_t sequence_number) { + if (arbitrator_) + return; if (!sync_data_) return; int8_t dif; @@ -131,9 +262,9 @@ void Universe::synchronize(uint8_t sequence_number) */ void Universe::resetSynchronization() { - std::unique_lock lk_ctl(mtx_control); - delete sync_data_; - sync_data_ = nullptr; + std::unique_lock lk_ctl(mtx_control); + delete sync_data_; + sync_data_ = nullptr; } @@ -143,9 +274,12 @@ void Universe::resetSynchronization() */ void Universe::setSyncData(const std::vector & data) { - resetSynchronization(); - std::unique_lock lk_ctl(mtx_control); - sync_data_ = new std::vector(data); + if (arbitrator_) + return; + + resetSynchronization(); + std::unique_lock lk_ctl(mtx_control); + sync_data_ = new std::vector(data); } @@ -155,8 +289,8 @@ void Universe::setSyncData(const std::vector & data) */ bool Universe::isEditable() const { - std::shared_lock lk_ctl(mtx_control); - return sender_; + std::shared_lock lk_ctl(mtx_control); + return sender_; } @@ -166,8 +300,22 @@ bool Universe::isEditable() const */ uint16_t Universe::activeSlots() const { - std::shared_lock lk_ctl(mtx_control); - return active_data_slots_; + if (arbitrator_) + return arbitrator_->activeSlots(); + + std::shared_lock lk_ctl(mtx_control); + return active_data_slots_; +} + + +/** + * @brief Universe::hasSources + * @return + */ +bool Universe::hasSources() const +{ + std::shared_lock lk_ctl(mtx_control); + return arbitrator_; } @@ -177,119 +325,20 @@ uint16_t Universe::activeSlots() const */ const std::vector Universe::sources() const { - return std::vector(); + if (arbitrator_) + return arbitrator_->sources(); + + return std::vector(); } /** - * @brief sourceUniverse - * @param src + * @brief Universe::arbitrator * @return */ -std::shared_ptr Universe::sourceUniverse(const DATA::data_header& src) +UniverseArbitrator* Universe::arbitrator() const { - std::shared_lock lk_ctl(mtx_control); - if (src == *metadata_) - return shared_from_this(); - - return nullptr; -} - - -/** - * @brief Add a callback to be notified when the source list changes. - * @param callback - * @return - */ -std::shared_ptr Universe::onSourceListChange(const std::function callback) -{ - (void)callback; - return nullptr; -}; - - -/** - * @brief Universe::setHoldLastLook - * @param state - */ -void Universe::setHoldLastLook(const bool state) -{ - (void)state; -} - - -/** - * @brief Universe::getHoldLastLook - * @return - */ -bool Universe::getHoldLastLook() const -{ - return false; -} - - -/** - * @brief Universe::setMergeMode - * @param mode - */ -void Universe::setMergeMode(const MergeMode mode) -{ - mergeMode_ = mode; -} - - -/** - * @brief Universe::getMergeMode - * @return - */ -Universe::MergeMode Universe::getMergeMode() const -{ - return mergeMode_; -} - - -void Universe::setValue (const uint16_t start, const uint16_t footprint, - const uint8_t* data) -{ - if (!isEditable()) - return; - - // start and footprint valid? - if (start < 1 || start + footprint > null_start_data.size()) - return; - - // set active_data_slots to at least end of footprint - if (start + footprint > active_data_slots_) - { - std::unique_lock lk_ctl(mtx_control); - active_data_slots_ = start + footprint; - } - - // get a copy of the current values - uint8_t og[footprint]; - { - std::unique_lock lk_data(mtx_data); - std::copy_n(std::begin(null_start_data) + start, footprint, og); - } - - // data not changed! - if (memcmp(data, og, footprint) == 0) - return; - - // set the values - DMX::Universe::setValue(start, footprint, data); - - // request sACN message to be sent - sender_->flush(); -} - - -void Universe::setStatus(uint8_t status) -{ - if (status == RX_TIMEOUT) - status = sACN_TERMINATED; - - DMX::Universe::setStatus(status); + return arbitrator_; } @@ -299,7 +348,7 @@ void Universe::setStatus(uint8_t status) */ void Universe::rxDmpGetProperty(ACN::PDU::Message message) { - (void)message; + (void)message; } @@ -309,6 +358,9 @@ void Universe::rxDmpGetProperty(ACN::PDU::Message message) */ void Universe::rxDmpSetProperty(ACN::PDU::Message message) { + if (arbitrator_) + return; + // only act on the first property pair in the data if (!message->data()) return; diff --git a/protocol/esta/sacn/universe.h b/protocol/esta/sacn/universe.h index 2e2c3da..a9f369a 100644 --- a/protocol/esta/sacn/universe.h +++ b/protocol/esta/sacn/universe.h @@ -27,6 +27,7 @@ #include "dmp/component.h" #include "../dmx/universe.h" #include "sacn.h" +#include "universearbitrator.h" #include "universesender.h" #include @@ -34,6 +35,10 @@ namespace sACN { +class Receiver; +class Source; + + /** * @brief \cite sACN 3.2 Universe * @@ -46,47 +51,42 @@ class Universe , public std::enable_shared_from_this { public: - explicit Universe(Source* = nullptr); + explicit Universe(Source* = nullptr, Receiver* = nullptr); virtual ~Universe(); + /// Merging and Arbitration capabilities of the unverse + /// are implimented in the helper class sACN::UniverseSender + friend class UniverseArbitrator; /// The Tx capabilities of the universe are impimented /// in the helper class sACN::UniverseSender. friend class UniverseSender; + // DMX::Universe overrides + virtual long age() const override; + virtual double rxRate() override; + virtual uint8_t status() const override; + virtual uint8_t slot(const uint16_t) const override; + + virtual void setStatus(uint8_t) override; + virtual void setValue (const uint16_t start, const uint16_t footprint, + const uint8_t* data) override; + + // metadata virtual std::shared_ptr metadata() const; virtual void setMetadata(std::shared_ptr); + // synchronization virtual bool isSyncronized() const; virtual void synchronize(uint8_t = 0); virtual void resetSynchronization(); virtual void setSyncData(const std::vector &); + virtual bool isEditable() const; - virtual uint16_t activeSlots() const; - - // api for poly-source universes - virtual bool hasSources() const {return false;} //!< is not a poly-source universe @return + virtual bool hasSources() const; virtual const std::vector sources() const; - virtual std::shared_ptr sourceUniverse(const DATA::data_header&); - virtual std::shared_ptr onSourceListChange(const std::function); - virtual void setHoldLastLook(const bool); - virtual bool getHoldLastLook() const; - /** - * @brief The MergeMode enum - */ - enum MergeMode { - MERGE_OTHER, - MERGE_HTP, - MERGE_LTP - }; - virtual void setMergeMode(const MergeMode); - virtual MergeMode getMergeMode() const; - - // DMX::Universe overrides - virtual void setValue (const uint16_t start, const uint16_t footprint, - const uint8_t* data) override; - virtual void setStatus(uint8_t) override; + UniverseArbitrator* arbitrator() const; /** * @brief destination IP address @@ -113,8 +113,8 @@ protected: virtual void rxDmpSubscribe(ACN::PDU::Message) override; private: + UniverseArbitrator * arbitrator_; UniverseSender * sender_; - MergeMode mergeMode_; std::shared_ptr metadata_; std::vector * sync_data_; diff --git a/protocol/esta/sacn/universearbitrator.cpp b/protocol/esta/sacn/universearbitrator.cpp index 8c94085..cd62e4e 100644 --- a/protocol/esta/sacn/universearbitrator.cpp +++ b/protocol/esta/sacn/universearbitrator.cpp @@ -22,6 +22,7 @@ SOFTWARE. */ +#include "universe.h" #include "universearbitrator.h" #include "universemerger.h" #include @@ -31,10 +32,10 @@ namespace sACN { /** * @brief UniverseArbitrator::UniverseArbitrator + * @param universe */ -UniverseArbitrator::UniverseArbitrator() - : sACN::Universe() - , expectedUniverse(0) +UniverseArbitrator::UniverseArbitrator(sACN::Universe * universe) + : m_universe(universe) , hold_last_look_(true) , m_dominant(std::weak_ptr()) { @@ -52,161 +53,15 @@ void UniverseArbitrator::refresh() /** - * @brief UniverseArbitrator::setHoldLastLook - * @param state - */ -void UniverseArbitrator::setHoldLastLook(const bool state) -{ - std::unique_lock lk_ctl(mtx_control); - hold_last_look_ = state; -} - - -/** - * @brief UniverseArbitrator::getHoldLastLook + * @brief UniverseArbitrator::age * @return */ -bool UniverseArbitrator::getHoldLastLook() const +long UniverseArbitrator::age() const { - std::shared_lock lk_ctl(mtx_control); - return hold_last_look_; -} - - -/** - * @brief UniverseArbitrator::sources - * @return - */ -const std::vector UniverseArbitrator::sources() const -{ - std::shared_lock lk_ctl(mtx_control); - std::vector keys; - for (const auto& [key, _] : sources_) - keys.push_back(key); - return keys; -} - - -/** - * @brief UniverseArbitrator::sourceUniverse - * @param src - * @return - */ -std::shared_ptr UniverseArbitrator::sourceUniverse(const DATA::data_header &src) -{ - std::shared_lock lk_ctl(mtx_control); - if (!hasSourceUniverse(src)) - return nullptr; - - return sources_.at(src); -} - - -/** - * @brief UniverseArbitrator::hasSourceUniverse - * @param src - * @return - */ -bool UniverseArbitrator::hasSourceUniverse(const DATA::data_header& src) const -{ - std::shared_lock lk_ctl(mtx_control); - return (sources_.count(src)); -} - - -/** - * @brief UniverseArbitrator::addNewSource - * @param src - * @return - */ -std::shared_ptr UniverseArbitrator::addNewSource(const DATA::data_header &src) -{ - std::shared_ptr univ; - { - std::unique_lock lk_ctl(mtx_control); - auto [itr, ok] = sources_.try_emplace(src, std::make_shared()); - if (!ok) - return nullptr; - univ = itr->second; - } - doListChangeCallbacks(); - return univ; -} - - -/** - * @brief UniverseArbitrator::deleteSourceUniverse - * @param src - */ -void UniverseArbitrator::deleteSourceUniverse(const DATA::data_header& src) -{ - if (!hasSourceUniverse(src)) - return; - - if (sources_.size() > 1 || !hold_last_look_) - { - std::unique_lock lk_ctl(mtx_control); - sources_.erase(src); - } - else - { - std::shared_lock lk_ctl(mtx_control); - /// Set the universe's status to TERMINATED. - sources_.at(src)->setStatus(Universe::sACN_TERMINATED); - /// Resetting the sequencing on terminated universes results in faster reaquisition - /// from the same source without waiting for the sequence to realign. - sources_.at(src)->metadata()->sequence_number = 0; - } - find_dominant_(); - doListChangeCallbacks(); -} - - -/** - * @brief UniverseArbitrator::onSourceListChange - * @param cb - * @return - */ -std::shared_ptr UniverseArbitrator::onSourceListChange(const std::function cb) -{ - std::unique_lock lk_ctl(mtx_control); - // wrap the callback with a shared pointer - auto sp = std::make_shared>(std::move(cb)); - // add callback to list (as a weak pointer) - cb_sourceListChange.push_back(sp); - // return token that caller must keep throughout it's scope - return sp; -} - - -/** - * @brief UniverseArbitrator::isSyncronized - * @return - */ -bool UniverseArbitrator::isSyncronized() const -{ - auto universe = m_dominant.lock(); - if (!universe) - return false; - return universe->isSyncronized(); -} - - -/** - * @brief UniverseArbitrator::metadata - * @return - */ -std::shared_ptr UniverseArbitrator::metadata() const -{ - auto universe = m_dominant.lock(); - if (universe) - return universe->metadata(); - - std::shared_lock lk_ctl(mtx_control); - auto metadata = std::make_shared(); - metadata->universe = expectedUniverse; - metadata->priority = 255; // invalid - return metadata; + auto universe = m_dominant.lock(); + if (!universe) + return 0; + return universe->age(); } @@ -216,45 +71,10 @@ std::shared_ptr UniverseArbitrator::metadata() const */ uint8_t UniverseArbitrator::status() const { - auto universe = m_dominant.lock(); - if (!universe) - return Universe::DMX_NULL; - return universe->status(); -} - - -/** - * @brief UniverseArbitrator::synchronize - * @param sequence_number - */ -void UniverseArbitrator::synchronize(uint8_t sequence_number) -{ - std::shared_lock lk_ctl(mtx_control); - for ( auto& [_, uni] : sources_) - uni->synchronize(sequence_number); -} - - -/** - * @brief UniverseArbitrator::isEditable - * @return - */ -bool UniverseArbitrator::isEditable() const -{ - return false; -} - - -/** - * @brief UniverseArbitrator::activeSlots - * @return - */ -uint16_t UniverseArbitrator::activeSlots() const -{ - auto universe = m_dominant.lock(); - if (!universe) - return 0; - return universe->activeSlots(); + auto universe = m_dominant.lock(); + if (!universe) + return Universe::DMX_NULL; + return universe->status(); } @@ -265,10 +85,10 @@ uint16_t UniverseArbitrator::activeSlots() const */ uint8_t UniverseArbitrator::slot(const uint16_t address) const { - auto universe = m_dominant.lock(); - if (!universe) - return 0; - return universe->slot(address); + auto universe = m_dominant.lock(); + if (!universe) + return 0; + return universe->slot(address); } @@ -278,24 +98,210 @@ uint8_t UniverseArbitrator::slot(const uint16_t address) const */ double UniverseArbitrator::rxRate() { - refresh(); - auto universe = m_dominant.lock(); - if (!universe) - return 0.0; - return universe->rxRate(); + refresh(); + auto universe = m_dominant.lock(); + if (!universe) + return 0.0; + return universe->rxRate(); } /** - * @brief UniverseArbitrator::rxDmpSetProperty - * @param message + * @brief UniverseArbitrator::metadata + * @return */ -void UniverseArbitrator::rxDmpSetProperty(ACN::PDU::Message message) +std::shared_ptr UniverseArbitrator::metadata() const { - (void)message; + auto universe = m_dominant.lock(); + if (universe) + return universe->metadata(); + + m_universe->metadata_->priority = 255; // invalid + return m_universe->metadata_; } +/** + * @brief UniverseArbitrator::isEditable + * @return + */ +bool UniverseArbitrator::isEditable() const +{ + return false; +} + + +/** + * @brief UniverseArbitrator::activeSlots + * @return + */ +uint16_t UniverseArbitrator::activeSlots() const +{ + auto universe = m_dominant.lock(); + if (!universe) + return 0; + return universe->activeSlots(); +} + + +/** + * @brief UniverseArbitrator::sources + * @return + */ +const std::vector UniverseArbitrator::sources() const +{ + std::shared_lock lk_ctl(mtx_arbitrate); + std::vector keys; + for (const auto& [key, _] : sources_) + keys.push_back(key); + return keys; +} + + +/** + * @brief UniverseArbitrator::hasSourceUniverse + * @param src + * @return + */ +bool UniverseArbitrator::hasSourceUniverse(const DATA::data_header& src) const +{ + std::shared_lock lk_ctl(mtx_arbitrate); + return (sources_.count(src)); +} + + +/** + * @brief UniverseArbitrator::sourceUniverse + * @param src + * @return + */ +std::shared_ptr UniverseArbitrator::sourceUniverse(const DATA::data_header &src) +{ + std::shared_lock lk_ctl(mtx_arbitrate); + if (!hasSourceUniverse(src)) + return nullptr; + + return sources_.at(src); +} + + +/** + * @brief UniverseArbitrator::addNewSource + * @param src + * @return + */ +std::shared_ptr UniverseArbitrator::addNewSource(const DATA::data_header &src) +{ + std::shared_ptr univ; + { + std::unique_lock lk_ctl(mtx_arbitrate); + auto [itr, ok] = sources_.emplace(src, std::make_shared()); + univ = itr->second; + if (!ok) + return univ; + // source_tokens_.push_back(univ->onStatusChange([this](DMX::Universe*) { + // refresh(); + // })); + } + return univ; +} + + +/** + * @brief UniverseArbitrator::deleteSourceUniverse + * @param src + */ +void UniverseArbitrator::deleteSourceUniverse(const DATA::data_header& src) +{ + if (!hasSourceUniverse(src)) + return; + + if (sources_.size() > 1 || !hold_last_look_) + { + std::unique_lock lk_ctl(mtx_arbitrate); + sources_.erase(src); + } + else + { + std::shared_lock lk_ctl(mtx_arbitrate); + /// Set the universe's status to TERMINATED. + sources_.at(src)->setStatus(Universe::sACN_TERMINATED); + /// Resetting the sequencing on terminated universes results in faster reaquisition + /// from the same source without waiting for the sequence to realign. + sources_.at(src)->metadata()->sequence_number = 0; + } + find_dominant_(); + doListChangeCallbacks(); +} + + +/** + * @brief UniverseArbitrator::onSourceListChange + * @param cb + * @return + */ +std::shared_ptr UniverseArbitrator::onSourceListChange(const std::function cb) +{ + std::unique_lock lk_ctl(mtx_arbitrate); + // wrap the callback with a shared pointer + auto sp = std::make_shared>(std::move(cb)); + // add callback to list (as a weak pointer) + cb_sourceListChange.push_back(sp); + // return token that caller must keep throughout it's scope + return sp; +} + + +/** + * @brief UniverseArbitrator::setHoldLastLook + * @param state + */ +void UniverseArbitrator::setHoldLastLook(const bool state) +{ + std::unique_lock lk_ctl(mtx_arbitrate); + hold_last_look_ = state; +} + + +/** + * @brief UniverseArbitrator::getHoldLastLook + * @return + */ +bool UniverseArbitrator::getHoldLastLook() const +{ + std::shared_lock lk_ctl(mtx_arbitrate); + return hold_last_look_; +} + + +/** + * @brief UniverseArbitrator::setMergeMode + * @param mode + */ +void UniverseArbitrator::setMergeMode(const MergeMode mode) +{ + merge_mode_ = mode; +} + + +/** + * @brief UniverseArbitrator::getMergeMode + * @return + */ +UniverseArbitrator::MergeMode UniverseArbitrator::getMergeMode() const +{ + return merge_mode_; +} + + +/** + * @brief UniverseArbitrator::doSourceListChange + */ +void UniverseArbitrator::doListChangeCallbacks() +{ + m_universe->do_callbacks_(cb_sourceListChange); +} + /** * @brief UniverseArbitrator::find_dominant_ @@ -311,7 +317,7 @@ void UniverseArbitrator::find_dominant_() // cache the age of each universe std::unordered_map ages; { - std::shared_lock lk_ctl(mtx_control); + std::shared_lock lk_ctl(mtx_arbitrate); for (const auto& [header, universe] : sources_) ages.insert({header, universe->age()}); } @@ -320,7 +326,7 @@ void UniverseArbitrator::find_dominant_() std::map>> by_priority; { - std::shared_lock lk_ctl(mtx_control); + std::shared_lock lk_ctl(mtx_arbitrate); for (const auto& [header, universe] : sources_) { auto age = ages.at(header); @@ -339,16 +345,16 @@ void UniverseArbitrator::find_dominant_() if (universe != m_dominant.lock()) { { - std::unique_lock lk_ctl(mtx_control); + std::unique_lock lk_ctl(mtx_arbitrate); m_dominant = universe; } cb_tokens_.clear(); - cb_tokens_.push_back(universe->onDataChange([this](DMX::Universe*){doDataCallbacks();})); - cb_tokens_.push_back(universe->onStatusChange([this](DMX::Universe*){doStatusCallbacks();})); + cb_tokens_.push_back(universe->onDataChange([this](DMX::Universe*){m_universe->doDataCallbacks();})); + cb_tokens_.push_back(universe->onStatusChange([this](DMX::Universe*){m_universe->doStatusCallbacks();})); doListChangeCallbacks(); - doDataCallbacks(); - doStatusCallbacks(); + m_universe->doDataCallbacks(); + m_universe->doStatusCallbacks(); } return; } @@ -376,18 +382,18 @@ void UniverseArbitrator::find_dominant_() #endif merged->clear(); } - merged->Universe::metadata()->universe = expectedUniverse; + merged->Universe::metadata()->universe = m_universe->metadata_->universe; merged->Universe::metadata()->priority = by_priority.crbegin()->first; for (const auto & [_, universe] : by_priority.crbegin()->second) merged->addSource(universe); m_dominant = merged; cb_tokens_.clear(); - cb_tokens_.push_back(merged->onDataChange([this](DMX::Universe*){doDataCallbacks();})); - cb_tokens_.push_back(merged->onStatusChange([this](DMX::Universe*){doStatusCallbacks();})); + cb_tokens_.push_back(merged->onDataChange([this](DMX::Universe*){m_universe->doDataCallbacks();})); + cb_tokens_.push_back(merged->onStatusChange([this](DMX::Universe*){m_universe->doStatusCallbacks();})); - doDataCallbacks(); - doStatusCallbacks(); + m_universe->doDataCallbacks(); + m_universe->doStatusCallbacks(); doListChangeCallbacks(); } @@ -399,7 +405,7 @@ void UniverseArbitrator::purge_stale_sources_() { std::unordered_map ages; { - std::shared_lock lk_ctl(mtx_control); + std::shared_lock lk_ctl(mtx_arbitrate); for (const auto& [header, universe] : sources_) { universe->rxRate(); // DMX::Universe::rx_timeout for maintenance tasks @@ -410,7 +416,7 @@ void UniverseArbitrator::purge_stale_sources_() // order the member universes by age std::multimap by_age; { - std::shared_lock lk_ctl(mtx_control); + std::shared_lock lk_ctl(mtx_arbitrate); for (const auto& [header, _] : sources_) by_age.insert({ages.at(header), header}); } diff --git a/protocol/esta/sacn/universearbitrator.h b/protocol/esta/sacn/universearbitrator.h index b9f1faa..bbf66b3 100644 --- a/protocol/esta/sacn/universearbitrator.h +++ b/protocol/esta/sacn/universearbitrator.h @@ -23,13 +23,19 @@ */ #pragma once -#include "universe.h" - +#include "data.h" #include #include +#include + +namespace DMX { + class Universe; +} namespace sACN { +class Universe; + /** * @brief Priority based selection of multiple source universes * @@ -52,55 +58,64 @@ namespace sACN { * dominant source. */ class UniverseArbitrator - : public sACN::Universe { public: - explicit UniverseArbitrator(); + explicit UniverseArbitrator(sACN::Universe*); - uint16_t expectedUniverse; ///< Expected universe number void refresh(); - // Source universes: - void deleteSourceUniverse(const DATA::data_header&); + + // DMX::Universe API: + long age() const; + uint8_t status() const; + uint8_t slot(const uint16_t) const; + double rxRate(); + + // sACN::Universe API + std::shared_ptr metadata() const; + bool isEditable() const; + uint16_t activeSlots() const; + const std::vector sources() const; + + // Source universes + bool hasSourceUniverse(const DATA::data_header&) const; + std::shared_ptr sourceUniverse(const DATA::data_header&); std::shared_ptr addNewSource(const DATA::data_header&); + void deleteSourceUniverse(const DATA::data_header&); + std::shared_ptr onSourceListChange(const std::function); - // DMX::Universe overrides: - virtual uint8_t status() const override; - uint8_t slot(const uint16_t) const override; - double rxRate() override; + // hold-last-look + void setHoldLastLook(const bool); + bool getHoldLastLook() const; - // sACN::Universe Overrides - std::shared_ptr metadata() const override; - void setMetadata(std::shared_ptr) override {}; - bool isSyncronized() const override; - void synchronize(uint8_t = 0) override; - bool isEditable() const override; - uint16_t activeSlots() const override; - // api for poly-source universes - virtual bool hasSources() const override {return true;} //!< is a poly-source universe @return - const std::vector sources() const override; - std::shared_ptr sourceUniverse(const DATA::data_header&) override; - std::shared_ptr onSourceListChange(const std::function) override; - virtual void setHoldLastLook(const bool) override; - virtual bool getHoldLastLook() const override; + // merge-mode + /** + * @brief The MergeMode enum + */ + enum MergeMode { + MERGE_OTHER, + MERGE_HTP, + MERGE_LTP + }; + void setMergeMode(const MergeMode); + MergeMode getMergeMode() const; protected: - inline void doListChangeCallbacks() {do_callbacks_(cb_sourceListChange);} //!< List Change Callbacks - - // ACN::DMP::Component overrides - virtual void rxDmpSetProperty(ACN::PDU::Message) override; + inline void doListChangeCallbacks(); private: - std::unordered_map> sources_; - std::vector> cb_tokens_; //!< source universe callback tokens - std::vector>> cb_sourceListChange; //!< list of calback functions - - bool hold_last_look_; - - std::weak_ptr m_dominant; void find_dominant_(); void purge_stale_sources_(); - bool hasSourceUniverse(const DATA::data_header&) const; + + sACN::Universe * m_universe; + std::unordered_map> sources_; + std::vector> cb_tokens_; //!< dominant universe callback tokens + std::vector> source_tokens_; //!< source universe status changes + std::vector>> cb_sourceListChange; //!< list of calback functions + bool hold_last_look_; + MergeMode merge_mode_; + std::weak_ptr m_dominant; + mutable std::shared_mutex mtx_arbitrate; }; } // SACN namespace