1
0
Fork 0

universe arbitration is a helper not a subclass

This commit is contained in:
Kevin Matz 2022-12-12 13:39:10 -05:00
parent 64e6ee1c0a
commit 7dfb0a81d7
9 changed files with 512 additions and 455 deletions

View File

@ -130,9 +130,9 @@ MultiverseWindow::MultiverseWindow(QWidget *parent, QSacnNode *node)
auto data = selected.data(Qt::EditRole);
auto univ = data.value<QSacnUniverse*>();
if (state)
univ->setMergeMode(sACN::Universe::MERGE_HTP);
univ->setMergeMode(sACN::UniverseArbitrator::MERGE_HTP);
else
univ->setMergeMode(sACN::Universe::MERGE_LTP);
univ->setMergeMode(sACN::UniverseArbitrator::MERGE_LTP);
});
connect(ui->actionAbout, &QAction::triggered,
this, [this](){
@ -208,10 +208,10 @@ void MultiverseWindow::selectionChanged(const QModelIndex &current,
ui->actionInspect->setEnabled(true);
ui->actionUniverseHoldLastLook->setChecked(univ->getHoldLastLook());
switch (univ->getMergeMode()){
case sACN::Universe::MERGE_HTP:
case sACN::UniverseArbitrator::MERGE_HTP:
ui->actionMergeModeHTP->setChecked(true);
break;
case sACN::Universe::MERGE_LTP:
case sACN::UniverseArbitrator::MERGE_LTP:
ui->actionMergeModeLTP->setChecked(true);
break;
default:

View File

@ -16,8 +16,8 @@ QSacnUniverse::QSacnUniverse(QObject *parent, std::shared_ptr<sACN::Universe> un
data_change_token = universe_->onDataChange([this](DMX::Universe*){emit dataChanged();});
status_change_token = universe_->onStatusChange([this](DMX::Universe*){emit statusChanged();});
if (!universe->isEditable())
list_change_token = universe_->onSourceListChange([this](DMX::Universe*){syncSources();});
if (universe->arbitrator())
list_change_token = universe_->arbitrator()->onSourceListChange([this](DMX::Universe*){syncSources();});
// set the status watchdog to update the status if the universe
// isn't showing frequent activity
@ -176,7 +176,7 @@ uint16_t QSacnUniverse::activeSlots() const
*/
bool QSacnUniverse::getHoldLastLook() const
{
return universe_ ? universe_->getHoldLastLook() : false;
return universe_ && universe_->arbitrator() ? universe_->arbitrator()->getHoldLastLook() : false;
}
@ -184,9 +184,9 @@ bool QSacnUniverse::getHoldLastLook() const
* @brief QSacnUniverse::getMergeMode
* @return
*/
sACN::Universe::MergeMode QSacnUniverse::getMergeMode() const
sACN::UniverseArbitrator::MergeMode QSacnUniverse::getMergeMode() const
{
return universe_ ? universe_->getMergeMode() : sACN::Universe::MERGE_OTHER;
return universe_ && universe_->arbitrator() ? universe_->arbitrator()->getMergeMode() : sACN::UniverseArbitrator::MERGE_OTHER;
}
@ -196,11 +196,8 @@ sACN::Universe::MergeMode QSacnUniverse::getMergeMode() const
*/
void QSacnUniverse::setHoldLastLook(bool state)
{
if (isEditable())
return;
if (universe_)
universe_->setHoldLastLook(state);
if (isEditable() && universe_ && universe_->arbitrator())
universe_->arbitrator()->setHoldLastLook(state);
}
@ -208,27 +205,10 @@ void QSacnUniverse::setHoldLastLook(bool state)
* @brief QSacnUniverse::setMergeMode
* @param mode
*/
void QSacnUniverse::setMergeMode(const sACN::Universe::MergeMode mode)
void QSacnUniverse::setMergeMode(const sACN::UniverseArbitrator::MergeMode mode)
{
if (isEditable())
return;
if (universe_)
universe_->setMergeMode(mode);
}
/**
* @brief QSacnUniverse::setOptions
* @param o
*/
void QSacnUniverse::setOptions(sACN::DATA::data_options o)
{
if (!isEditable())
return;
universe_->metadata()->options = o;
emit dataChanged();
if (isEditable() && universe_ && universe_->arbitrator())
universe_->arbitrator()->setMergeMode(mode);
}
@ -304,7 +284,7 @@ void QSacnUniverse::syncSources()
newSources.insert(metadata, sources_.take(metadata));
else
{ // make a new QSacnUniverse to add to the new list
auto universe = universe_->sourceUniverse(metadata);
auto universe = universe_->arbitrator()->sourceUniverse(metadata);
newSources.insert(metadata, new QSacnUniverse(this, universe));
}
}

View File

@ -35,12 +35,11 @@ public:
const QList<sACN::DATA::data_header> sources() const;
QSacnUniverse* sourceUniverse(const sACN::DATA::data_header&) const;
bool getHoldLastLook() const;
sACN::Universe::MergeMode getMergeMode() const;
sACN::UniverseArbitrator::MergeMode getMergeMode() const;
public slots:
void setHoldLastLook(bool state);
void setMergeMode(const sACN::Universe::MergeMode);
void setOptions(sACN::DATA::data_options o);
void setMergeMode(const sACN::UniverseArbitrator::MergeMode);
void setPriority(uint8_t p);
void setSyncAddress(uint16_t a);
void setValue (const uint16_t addr, const uint8_t level);

View File

@ -25,6 +25,8 @@
#include "config.h"
#include "pdu.h"
#include "receiver.h"
#include "sacn.h"
#include "universearbitrator.h"
namespace sACN {
@ -64,8 +66,11 @@ void Receiver::subscribe(const uint16_t num)
{
if (num == sACN::E131_DISCOVERY_UNIVERSE || universes_.count(num))
return;
universes_.emplace(num, std::make_shared<UniverseArbitrator>());
universes_.at(num)->expectedUniverse = num;
const auto [it, ok] = universes_.emplace(num, std::make_shared<Universe>(nullptr, this));
if (!ok)
return;
auto univ = it->second;
univ->metadata()->universe = num;
}
@ -221,9 +226,9 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame)
if (!universes_.count(metadata->universe))
return; // not subscribed to this universe
auto universe = universes_.at(metadata->universe)->sourceUniverse(*metadata);
auto universe = universes_.at(metadata->universe)->arbitrator()->sourceUniverse(*metadata);
if (!universe)
universe = universes_.at(metadata->universe)->addNewSource(*metadata);
universe = universes_.at(metadata->universe)->arbitrator()->addNewSource(*metadata);
/// > \cite sACN 6.2.6 E1.31 Data Packet: Options
/// >
@ -245,7 +250,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame)
{
/// Remove the terminated universe as sources.
/// The universe may keep itself, in a terminated state, for hold-last-look.
universes_[metadata->universe]->deleteSourceUniverse(*metadata);
universes_.at(metadata->universe)->arbitrator()->deleteSourceUniverse(*metadata);
/// > Any property values in an E1.31 Data Packet containing this bit
/// > shall be ignored.
return;
@ -323,7 +328,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame)
universe->DmpReceiver(block);
// do ArbitratingUniverse maintence tasks
universes_.at(metadata->universe)->refresh();
universes_.at(metadata->universe)->arbitrator()->refresh();
}

View File

@ -26,7 +26,7 @@
#include "rlp/component.h"
#include "data.h"
#include "extended.h"
#include "universearbitrator.h"
#include "universe.h"
#include <unordered_map>
#include <set>
@ -96,7 +96,7 @@ protected:
void discoveryListHanlder(ACN::PDU::Message<EXTENDED::DISCOVERY::Pdu>);
private:
std::unordered_map<uint16_t, std::shared_ptr<UniverseArbitrator>> universes_;
std::unordered_map<uint16_t, std::shared_ptr<Universe>> universes_;
std::vector<std::function<void()>> discoveryCallbacks_;
bool discovery_enabled_;
};

View File

@ -30,12 +30,13 @@ namespace sACN {
/**
* @brief Universe::Universe
* @param src
* @param rsv
*/
Universe::Universe(Source* src)
Universe::Universe(Source* src, Receiver* rsv)
: DMX::Universe(E131_NETWORK_DATA_LOSS_TIMEOUT)
, ACN::DMP::Component(UUID::uuid(), "OpenLCP sACN Universe")
, arbitrator_(rsv ? new UniverseArbitrator(this) : nullptr)
, sender_(src ? new UniverseSender(src, this) : nullptr)
, mergeMode_(MERGE_HTP)
, metadata_(std::make_shared<DATA::data_header>())
, sync_data_(nullptr)
, active_data_slots_(1) // start code
@ -50,22 +51,134 @@ Universe::Universe(Source* src)
*/
Universe::~Universe()
{
// shut down tx worker thread
delete arbitrator_;
delete sender_;
// delete sync data pointer
delete sync_data_;
}
/**
* @brief Universe::age
* @return
*/
long Universe::age() const
{
if (arbitrator_)
return arbitrator_->age();
return DMX::Universe::age();
}
/**
* @brief Universe::rxRate
* @return
*/
double Universe::rxRate()
{
if (arbitrator_)
return arbitrator_->rxRate();
return DMX::Universe::rxRate();
}
/**
* @brief Universe::status
* @return
*/
uint8_t Universe::status() const
{
if (arbitrator_)
return arbitrator_->status();
return DMX::Universe::status();
}
/**
* @brief Universe::slot
* @param address
* @return
*/
uint8_t Universe::slot(const uint16_t address) const
{
if (arbitrator_)
return arbitrator_->slot(address);
return DMX::Universe::slot(address);
}
/**
* @brief Universe::setStatus
* @param status
*/
void Universe::setStatus(uint8_t status)
{
if (status == RX_TIMEOUT)
status = sACN_TERMINATED;
DMX::Universe::setStatus(status);
}
/**
* @brief Universe::setValue
* @param start
* @param footprint
* @param data
*/
void Universe::setValue (const uint16_t start, const uint16_t footprint,
const uint8_t* data)
{
if (!isEditable())
return;
if (arbitrator_)
return;
// start and footprint valid?
if (start < 1 || start + footprint > null_start_data.size())
return;
// set active_data_slots to at least end of footprint
if (start + footprint > active_data_slots_)
{
std::unique_lock lk_ctl(mtx_control);
active_data_slots_ = start + footprint;
}
// get a copy of the current values
uint8_t og[footprint];
{
std::unique_lock lk_data(mtx_data);
std::copy_n(std::begin(null_start_data) + start, footprint, og);
}
// data not changed!
if (memcmp(data, og, footprint) == 0)
return;
// set the values
DMX::Universe::setValue(start, footprint, data);
// request sACN message to be sent
sender_->flush();
}
/**
* @brief Universe::metadata
* @return
*/
std::shared_ptr<DATA::data_header> Universe::metadata() const
{
std::shared_lock lk_ctl(mtx_control);
return metadata_;
if (arbitrator_)
return arbitrator_->metadata();
std::shared_lock lk_ctl(mtx_control);
return metadata_;
}
@ -75,8 +188,21 @@ std::shared_ptr<DATA::data_header> Universe::metadata() const
*/
void Universe::setMetadata(std::shared_ptr<DATA::data_header> metadata)
{
if (arbitrator_)
return;
// bool update = false;
// if (metadata_->source_name != metadata->source_name ||
// metadata_->priority != metadata->priority)
// update = true;
{
std::unique_lock lk_ctl(mtx_control);
metadata_ = metadata;
}
// if (update)
// doStatusCallbacks();
}
@ -86,8 +212,11 @@ void Universe::setMetadata(std::shared_ptr<DATA::data_header> metadata)
*/
bool Universe::isSyncronized() const
{
std::shared_lock lk_ctl(mtx_control);
return !sync_data_;
if (arbitrator_)
return true;
std::shared_lock lk_ctl(mtx_control);
return !sync_data_;
};
@ -97,6 +226,8 @@ bool Universe::isSyncronized() const
*/
void Universe::synchronize(uint8_t sequence_number)
{
if (arbitrator_)
return;
if (!sync_data_)
return;
int8_t dif;
@ -131,9 +262,9 @@ void Universe::synchronize(uint8_t sequence_number)
*/
void Universe::resetSynchronization()
{
std::unique_lock lk_ctl(mtx_control);
delete sync_data_;
sync_data_ = nullptr;
std::unique_lock lk_ctl(mtx_control);
delete sync_data_;
sync_data_ = nullptr;
}
@ -143,9 +274,12 @@ void Universe::resetSynchronization()
*/
void Universe::setSyncData(const std::vector<uint8_t> & data)
{
resetSynchronization();
std::unique_lock lk_ctl(mtx_control);
sync_data_ = new std::vector<uint8_t>(data);
if (arbitrator_)
return;
resetSynchronization();
std::unique_lock lk_ctl(mtx_control);
sync_data_ = new std::vector<uint8_t>(data);
}
@ -155,8 +289,8 @@ void Universe::setSyncData(const std::vector<uint8_t> & data)
*/
bool Universe::isEditable() const
{
std::shared_lock lk_ctl(mtx_control);
return sender_;
std::shared_lock lk_ctl(mtx_control);
return sender_;
}
@ -166,8 +300,22 @@ bool Universe::isEditable() const
*/
uint16_t Universe::activeSlots() const
{
std::shared_lock lk_ctl(mtx_control);
return active_data_slots_;
if (arbitrator_)
return arbitrator_->activeSlots();
std::shared_lock lk_ctl(mtx_control);
return active_data_slots_;
}
/**
* @brief Universe::hasSources
* @return
*/
bool Universe::hasSources() const
{
std::shared_lock lk_ctl(mtx_control);
return arbitrator_;
}
@ -177,119 +325,20 @@ uint16_t Universe::activeSlots() const
*/
const std::vector<DATA::data_header> Universe::sources() const
{
return std::vector<DATA::data_header>();
if (arbitrator_)
return arbitrator_->sources();
return std::vector<DATA::data_header>();
}
/**
* @brief sourceUniverse
* @param src
* @brief Universe::arbitrator
* @return
*/
std::shared_ptr<Universe> Universe::sourceUniverse(const DATA::data_header& src)
UniverseArbitrator* Universe::arbitrator() const
{
std::shared_lock lk_ctl(mtx_control);
if (src == *metadata_)
return shared_from_this();
return nullptr;
}
/**
* @brief Add a callback to be notified when the source list changes.
* @param callback
* @return
*/
std::shared_ptr<void> Universe::onSourceListChange(const std::function<void(DMX::Universe*)> callback)
{
(void)callback;
return nullptr;
};
/**
* @brief Universe::setHoldLastLook
* @param state
*/
void Universe::setHoldLastLook(const bool state)
{
(void)state;
}
/**
* @brief Universe::getHoldLastLook
* @return
*/
bool Universe::getHoldLastLook() const
{
return false;
}
/**
* @brief Universe::setMergeMode
* @param mode
*/
void Universe::setMergeMode(const MergeMode mode)
{
mergeMode_ = mode;
}
/**
* @brief Universe::getMergeMode
* @return
*/
Universe::MergeMode Universe::getMergeMode() const
{
return mergeMode_;
}
void Universe::setValue (const uint16_t start, const uint16_t footprint,
const uint8_t* data)
{
if (!isEditable())
return;
// start and footprint valid?
if (start < 1 || start + footprint > null_start_data.size())
return;
// set active_data_slots to at least end of footprint
if (start + footprint > active_data_slots_)
{
std::unique_lock lk_ctl(mtx_control);
active_data_slots_ = start + footprint;
}
// get a copy of the current values
uint8_t og[footprint];
{
std::unique_lock lk_data(mtx_data);
std::copy_n(std::begin(null_start_data) + start, footprint, og);
}
// data not changed!
if (memcmp(data, og, footprint) == 0)
return;
// set the values
DMX::Universe::setValue(start, footprint, data);
// request sACN message to be sent
sender_->flush();
}
void Universe::setStatus(uint8_t status)
{
if (status == RX_TIMEOUT)
status = sACN_TERMINATED;
DMX::Universe::setStatus(status);
return arbitrator_;
}
@ -299,7 +348,7 @@ void Universe::setStatus(uint8_t status)
*/
void Universe::rxDmpGetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
{
(void)message;
(void)message;
}
@ -309,6 +358,9 @@ void Universe::rxDmpGetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
*/
void Universe::rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
{
if (arbitrator_)
return;
// only act on the first property pair in the data
if (!message->data())
return;

View File

@ -27,6 +27,7 @@
#include "dmp/component.h"
#include "../dmx/universe.h"
#include "sacn.h"
#include "universearbitrator.h"
#include "universesender.h"
#include <cstdint>
@ -34,6 +35,10 @@
namespace sACN {
class Receiver;
class Source;
/**
* @brief \cite sACN 3.2 Universe
*
@ -46,47 +51,42 @@ class Universe
, public std::enable_shared_from_this<Universe>
{
public:
explicit Universe(Source* = nullptr);
explicit Universe(Source* = nullptr, Receiver* = nullptr);
virtual ~Universe();
/// Merging and Arbitration capabilities of the unverse
/// are implimented in the helper class sACN::UniverseSender
friend class UniverseArbitrator;
/// The Tx capabilities of the universe are impimented
/// in the helper class sACN::UniverseSender.
friend class UniverseSender;
// DMX::Universe overrides
virtual long age() const override;
virtual double rxRate() override;
virtual uint8_t status() const override;
virtual uint8_t slot(const uint16_t) const override;
virtual void setStatus(uint8_t) override;
virtual void setValue (const uint16_t start, const uint16_t footprint,
const uint8_t* data) override;
// metadata
virtual std::shared_ptr<DATA::data_header> metadata() const;
virtual void setMetadata(std::shared_ptr<DATA::data_header>);
// synchronization
virtual bool isSyncronized() const;
virtual void synchronize(uint8_t = 0);
virtual void resetSynchronization();
virtual void setSyncData(const std::vector<uint8_t> &);
virtual bool isEditable() const;
virtual uint16_t activeSlots() const;
// api for poly-source universes
virtual bool hasSources() const {return false;} //!< is not a poly-source universe @return
virtual bool hasSources() const;
virtual const std::vector<DATA::data_header> sources() const;
virtual std::shared_ptr<Universe> sourceUniverse(const DATA::data_header&);
virtual std::shared_ptr<void> onSourceListChange(const std::function<void(DMX::Universe*)>);
virtual void setHoldLastLook(const bool);
virtual bool getHoldLastLook() const;
/**
* @brief The MergeMode enum
*/
enum MergeMode {
MERGE_OTHER,
MERGE_HTP,
MERGE_LTP
};
virtual void setMergeMode(const MergeMode);
virtual MergeMode getMergeMode() const;
// DMX::Universe overrides
virtual void setValue (const uint16_t start, const uint16_t footprint,
const uint8_t* data) override;
virtual void setStatus(uint8_t) override;
UniverseArbitrator* arbitrator() const;
/**
* @brief destination IP address
@ -113,8 +113,8 @@ protected:
virtual void rxDmpSubscribe(ACN::PDU::Message<ACN::DMP::Pdu>) override;
private:
UniverseArbitrator * arbitrator_;
UniverseSender * sender_;
MergeMode mergeMode_;
std::shared_ptr<DATA::data_header> metadata_;
std::vector<uint8_t> * sync_data_;

View File

@ -22,6 +22,7 @@
SOFTWARE.
*/
#include "universe.h"
#include "universearbitrator.h"
#include "universemerger.h"
#include <map>
@ -31,10 +32,10 @@ namespace sACN {
/**
* @brief UniverseArbitrator::UniverseArbitrator
* @param universe
*/
UniverseArbitrator::UniverseArbitrator()
: sACN::Universe()
, expectedUniverse(0)
UniverseArbitrator::UniverseArbitrator(sACN::Universe * universe)
: m_universe(universe)
, hold_last_look_(true)
, m_dominant(std::weak_ptr<Universe>())
{
@ -52,161 +53,15 @@ void UniverseArbitrator::refresh()
/**
* @brief UniverseArbitrator::setHoldLastLook
* @param state
*/
void UniverseArbitrator::setHoldLastLook(const bool state)
{
std::unique_lock lk_ctl(mtx_control);
hold_last_look_ = state;
}
/**
* @brief UniverseArbitrator::getHoldLastLook
* @brief UniverseArbitrator::age
* @return
*/
bool UniverseArbitrator::getHoldLastLook() const
long UniverseArbitrator::age() const
{
std::shared_lock lk_ctl(mtx_control);
return hold_last_look_;
}
/**
* @brief UniverseArbitrator::sources
* @return
*/
const std::vector<DATA::data_header> UniverseArbitrator::sources() const
{
std::shared_lock lk_ctl(mtx_control);
std::vector<DATA::data_header> keys;
for (const auto& [key, _] : sources_)
keys.push_back(key);
return keys;
}
/**
* @brief UniverseArbitrator::sourceUniverse
* @param src
* @return
*/
std::shared_ptr<Universe> UniverseArbitrator::sourceUniverse(const DATA::data_header &src)
{
std::shared_lock lk_ctl(mtx_control);
if (!hasSourceUniverse(src))
return nullptr;
return sources_.at(src);
}
/**
* @brief UniverseArbitrator::hasSourceUniverse
* @param src
* @return
*/
bool UniverseArbitrator::hasSourceUniverse(const DATA::data_header& src) const
{
std::shared_lock lk_ctl(mtx_control);
return (sources_.count(src));
}
/**
* @brief UniverseArbitrator::addNewSource
* @param src
* @return
*/
std::shared_ptr<Universe> UniverseArbitrator::addNewSource(const DATA::data_header &src)
{
std::shared_ptr<Universe> univ;
{
std::unique_lock lk_ctl(mtx_control);
auto [itr, ok] = sources_.try_emplace(src, std::make_shared<Universe>());
if (!ok)
return nullptr;
univ = itr->second;
}
doListChangeCallbacks();
return univ;
}
/**
* @brief UniverseArbitrator::deleteSourceUniverse
* @param src
*/
void UniverseArbitrator::deleteSourceUniverse(const DATA::data_header& src)
{
if (!hasSourceUniverse(src))
return;
if (sources_.size() > 1 || !hold_last_look_)
{
std::unique_lock lk_ctl(mtx_control);
sources_.erase(src);
}
else
{
std::shared_lock lk_ctl(mtx_control);
/// Set the universe's status to TERMINATED.
sources_.at(src)->setStatus(Universe::sACN_TERMINATED);
/// Resetting the sequencing on terminated universes results in faster reaquisition
/// from the same source without waiting for the sequence to realign.
sources_.at(src)->metadata()->sequence_number = 0;
}
find_dominant_();
doListChangeCallbacks();
}
/**
* @brief UniverseArbitrator::onSourceListChange
* @param cb
* @return
*/
std::shared_ptr<void> UniverseArbitrator::onSourceListChange(const std::function<void(DMX::Universe*)> cb)
{
std::unique_lock lk_ctl(mtx_control);
// wrap the callback with a shared pointer
auto sp = std::make_shared<std::function<void(DMX::Universe*)>>(std::move(cb));
// add callback to list (as a weak pointer)
cb_sourceListChange.push_back(sp);
// return token that caller must keep throughout it's scope
return sp;
}
/**
* @brief UniverseArbitrator::isSyncronized
* @return
*/
bool UniverseArbitrator::isSyncronized() const
{
auto universe = m_dominant.lock();
if (!universe)
return false;
return universe->isSyncronized();
}
/**
* @brief UniverseArbitrator::metadata
* @return
*/
std::shared_ptr<DATA::data_header> UniverseArbitrator::metadata() const
{
auto universe = m_dominant.lock();
if (universe)
return universe->metadata();
std::shared_lock lk_ctl(mtx_control);
auto metadata = std::make_shared<DATA::data_header>();
metadata->universe = expectedUniverse;
metadata->priority = 255; // invalid
return metadata;
auto universe = m_dominant.lock();
if (!universe)
return 0;
return universe->age();
}
@ -216,45 +71,10 @@ std::shared_ptr<DATA::data_header> UniverseArbitrator::metadata() const
*/
uint8_t UniverseArbitrator::status() const
{
auto universe = m_dominant.lock();
if (!universe)
return Universe::DMX_NULL;
return universe->status();
}
/**
* @brief UniverseArbitrator::synchronize
* @param sequence_number
*/
void UniverseArbitrator::synchronize(uint8_t sequence_number)
{
std::shared_lock lk_ctl(mtx_control);
for ( auto& [_, uni] : sources_)
uni->synchronize(sequence_number);
}
/**
* @brief UniverseArbitrator::isEditable
* @return
*/
bool UniverseArbitrator::isEditable() const
{
return false;
}
/**
* @brief UniverseArbitrator::activeSlots
* @return
*/
uint16_t UniverseArbitrator::activeSlots() const
{
auto universe = m_dominant.lock();
if (!universe)
return 0;
return universe->activeSlots();
auto universe = m_dominant.lock();
if (!universe)
return Universe::DMX_NULL;
return universe->status();
}
@ -265,10 +85,10 @@ uint16_t UniverseArbitrator::activeSlots() const
*/
uint8_t UniverseArbitrator::slot(const uint16_t address) const
{
auto universe = m_dominant.lock();
if (!universe)
return 0;
return universe->slot(address);
auto universe = m_dominant.lock();
if (!universe)
return 0;
return universe->slot(address);
}
@ -278,24 +98,210 @@ uint8_t UniverseArbitrator::slot(const uint16_t address) const
*/
double UniverseArbitrator::rxRate()
{
refresh();
auto universe = m_dominant.lock();
if (!universe)
return 0.0;
return universe->rxRate();
refresh();
auto universe = m_dominant.lock();
if (!universe)
return 0.0;
return universe->rxRate();
}
/**
* @brief UniverseArbitrator::rxDmpSetProperty
* @param message
* @brief UniverseArbitrator::metadata
* @return
*/
void UniverseArbitrator::rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
std::shared_ptr<DATA::data_header> UniverseArbitrator::metadata() const
{
(void)message;
auto universe = m_dominant.lock();
if (universe)
return universe->metadata();
m_universe->metadata_->priority = 255; // invalid
return m_universe->metadata_;
}
/**
* @brief UniverseArbitrator::isEditable
* @return
*/
bool UniverseArbitrator::isEditable() const
{
return false;
}
/**
* @brief UniverseArbitrator::activeSlots
* @return
*/
uint16_t UniverseArbitrator::activeSlots() const
{
auto universe = m_dominant.lock();
if (!universe)
return 0;
return universe->activeSlots();
}
/**
* @brief UniverseArbitrator::sources
* @return
*/
const std::vector<DATA::data_header> UniverseArbitrator::sources() const
{
std::shared_lock lk_ctl(mtx_arbitrate);
std::vector<DATA::data_header> keys;
for (const auto& [key, _] : sources_)
keys.push_back(key);
return keys;
}
/**
* @brief UniverseArbitrator::hasSourceUniverse
* @param src
* @return
*/
bool UniverseArbitrator::hasSourceUniverse(const DATA::data_header& src) const
{
std::shared_lock lk_ctl(mtx_arbitrate);
return (sources_.count(src));
}
/**
* @brief UniverseArbitrator::sourceUniverse
* @param src
* @return
*/
std::shared_ptr<Universe> UniverseArbitrator::sourceUniverse(const DATA::data_header &src)
{
std::shared_lock lk_ctl(mtx_arbitrate);
if (!hasSourceUniverse(src))
return nullptr;
return sources_.at(src);
}
/**
* @brief UniverseArbitrator::addNewSource
* @param src
* @return
*/
std::shared_ptr<Universe> UniverseArbitrator::addNewSource(const DATA::data_header &src)
{
std::shared_ptr<Universe> univ;
{
std::unique_lock lk_ctl(mtx_arbitrate);
auto [itr, ok] = sources_.emplace(src, std::make_shared<Universe>());
univ = itr->second;
if (!ok)
return univ;
// source_tokens_.push_back(univ->onStatusChange([this](DMX::Universe*) {
// refresh();
// }));
}
return univ;
}
/**
* @brief UniverseArbitrator::deleteSourceUniverse
* @param src
*/
void UniverseArbitrator::deleteSourceUniverse(const DATA::data_header& src)
{
if (!hasSourceUniverse(src))
return;
if (sources_.size() > 1 || !hold_last_look_)
{
std::unique_lock lk_ctl(mtx_arbitrate);
sources_.erase(src);
}
else
{
std::shared_lock lk_ctl(mtx_arbitrate);
/// Set the universe's status to TERMINATED.
sources_.at(src)->setStatus(Universe::sACN_TERMINATED);
/// Resetting the sequencing on terminated universes results in faster reaquisition
/// from the same source without waiting for the sequence to realign.
sources_.at(src)->metadata()->sequence_number = 0;
}
find_dominant_();
doListChangeCallbacks();
}
/**
* @brief UniverseArbitrator::onSourceListChange
* @param cb
* @return
*/
std::shared_ptr<void> UniverseArbitrator::onSourceListChange(const std::function<void(DMX::Universe*)> cb)
{
std::unique_lock lk_ctl(mtx_arbitrate);
// wrap the callback with a shared pointer
auto sp = std::make_shared<std::function<void(DMX::Universe*)>>(std::move(cb));
// add callback to list (as a weak pointer)
cb_sourceListChange.push_back(sp);
// return token that caller must keep throughout it's scope
return sp;
}
/**
* @brief UniverseArbitrator::setHoldLastLook
* @param state
*/
void UniverseArbitrator::setHoldLastLook(const bool state)
{
std::unique_lock lk_ctl(mtx_arbitrate);
hold_last_look_ = state;
}
/**
* @brief UniverseArbitrator::getHoldLastLook
* @return
*/
bool UniverseArbitrator::getHoldLastLook() const
{
std::shared_lock lk_ctl(mtx_arbitrate);
return hold_last_look_;
}
/**
* @brief UniverseArbitrator::setMergeMode
* @param mode
*/
void UniverseArbitrator::setMergeMode(const MergeMode mode)
{
merge_mode_ = mode;
}
/**
* @brief UniverseArbitrator::getMergeMode
* @return
*/
UniverseArbitrator::MergeMode UniverseArbitrator::getMergeMode() const
{
return merge_mode_;
}
/**
* @brief UniverseArbitrator::doSourceListChange
*/
void UniverseArbitrator::doListChangeCallbacks()
{
m_universe->do_callbacks_(cb_sourceListChange);
}
/**
* @brief UniverseArbitrator::find_dominant_
@ -311,7 +317,7 @@ void UniverseArbitrator::find_dominant_()
// cache the age of each universe
std::unordered_map<DATA::data_header,uint> ages;
{
std::shared_lock lk_ctl(mtx_control);
std::shared_lock lk_ctl(mtx_arbitrate);
for (const auto& [header, universe] : sources_)
ages.insert({header, universe->age()});
}
@ -320,7 +326,7 @@ void UniverseArbitrator::find_dominant_()
std::map<uint,std::multimap<uint,std::shared_ptr<Universe>>> by_priority;
{
std::shared_lock lk_ctl(mtx_control);
std::shared_lock lk_ctl(mtx_arbitrate);
for (const auto& [header, universe] : sources_)
{
auto age = ages.at(header);
@ -339,16 +345,16 @@ void UniverseArbitrator::find_dominant_()
if (universe != m_dominant.lock())
{
{
std::unique_lock lk_ctl(mtx_control);
std::unique_lock lk_ctl(mtx_arbitrate);
m_dominant = universe;
}
cb_tokens_.clear();
cb_tokens_.push_back(universe->onDataChange([this](DMX::Universe*){doDataCallbacks();}));
cb_tokens_.push_back(universe->onStatusChange([this](DMX::Universe*){doStatusCallbacks();}));
cb_tokens_.push_back(universe->onDataChange([this](DMX::Universe*){m_universe->doDataCallbacks();}));
cb_tokens_.push_back(universe->onStatusChange([this](DMX::Universe*){m_universe->doStatusCallbacks();}));
doListChangeCallbacks();
doDataCallbacks();
doStatusCallbacks();
m_universe->doDataCallbacks();
m_universe->doStatusCallbacks();
}
return;
}
@ -376,18 +382,18 @@ void UniverseArbitrator::find_dominant_()
#endif
merged->clear();
}
merged->Universe::metadata()->universe = expectedUniverse;
merged->Universe::metadata()->universe = m_universe->metadata_->universe;
merged->Universe::metadata()->priority = by_priority.crbegin()->first;
for (const auto & [_, universe] : by_priority.crbegin()->second)
merged->addSource(universe);
m_dominant = merged;
cb_tokens_.clear();
cb_tokens_.push_back(merged->onDataChange([this](DMX::Universe*){doDataCallbacks();}));
cb_tokens_.push_back(merged->onStatusChange([this](DMX::Universe*){doStatusCallbacks();}));
cb_tokens_.push_back(merged->onDataChange([this](DMX::Universe*){m_universe->doDataCallbacks();}));
cb_tokens_.push_back(merged->onStatusChange([this](DMX::Universe*){m_universe->doStatusCallbacks();}));
doDataCallbacks();
doStatusCallbacks();
m_universe->doDataCallbacks();
m_universe->doStatusCallbacks();
doListChangeCallbacks();
}
@ -399,7 +405,7 @@ void UniverseArbitrator::purge_stale_sources_()
{
std::unordered_map<DATA::data_header,uint> ages;
{
std::shared_lock lk_ctl(mtx_control);
std::shared_lock lk_ctl(mtx_arbitrate);
for (const auto& [header, universe] : sources_)
{
universe->rxRate(); // DMX::Universe::rx_timeout for maintenance tasks
@ -410,7 +416,7 @@ void UniverseArbitrator::purge_stale_sources_()
// order the member universes by age
std::multimap<uint,DATA::data_header> by_age;
{
std::shared_lock lk_ctl(mtx_control);
std::shared_lock lk_ctl(mtx_arbitrate);
for (const auto& [header, _] : sources_)
by_age.insert({ages.at(header), header});
}

View File

@ -23,13 +23,19 @@
*/
#pragma once
#include "universe.h"
#include "data.h"
#include <memory>
#include <unordered_map>
#include <shared_mutex>
namespace DMX {
class Universe;
}
namespace sACN {
class Universe;
/**
* @brief Priority based selection of multiple source universes
*
@ -52,55 +58,64 @@ namespace sACN {
* dominant source.
*/
class UniverseArbitrator
: public sACN::Universe
{
public:
explicit UniverseArbitrator();
explicit UniverseArbitrator(sACN::Universe*);
uint16_t expectedUniverse; ///< Expected universe number
void refresh();
// Source universes:
void deleteSourceUniverse(const DATA::data_header&);
// DMX::Universe API:
long age() const;
uint8_t status() const;
uint8_t slot(const uint16_t) const;
double rxRate();
// sACN::Universe API
std::shared_ptr<DATA::data_header> metadata() const;
bool isEditable() const;
uint16_t activeSlots() const;
const std::vector<DATA::data_header> sources() const;
// Source universes
bool hasSourceUniverse(const DATA::data_header&) const;
std::shared_ptr<Universe> sourceUniverse(const DATA::data_header&);
std::shared_ptr<Universe> addNewSource(const DATA::data_header&);
void deleteSourceUniverse(const DATA::data_header&);
std::shared_ptr<void> onSourceListChange(const std::function<void(DMX::Universe*)>);
// DMX::Universe overrides:
virtual uint8_t status() const override;
uint8_t slot(const uint16_t) const override;
double rxRate() override;
// hold-last-look
void setHoldLastLook(const bool);
bool getHoldLastLook() const;
// sACN::Universe Overrides
std::shared_ptr<DATA::data_header> metadata() const override;
void setMetadata(std::shared_ptr<DATA::data_header>) override {};
bool isSyncronized() const override;
void synchronize(uint8_t = 0) override;
bool isEditable() const override;
uint16_t activeSlots() const override;
// api for poly-source universes
virtual bool hasSources() const override {return true;} //!< is a poly-source universe @return
const std::vector<DATA::data_header> sources() const override;
std::shared_ptr<Universe> sourceUniverse(const DATA::data_header&) override;
std::shared_ptr<void> onSourceListChange(const std::function<void(DMX::Universe*)>) override;
virtual void setHoldLastLook(const bool) override;
virtual bool getHoldLastLook() const override;
// merge-mode
/**
* @brief The MergeMode enum
*/
enum MergeMode {
MERGE_OTHER,
MERGE_HTP,
MERGE_LTP
};
void setMergeMode(const MergeMode);
MergeMode getMergeMode() const;
protected:
inline void doListChangeCallbacks() {do_callbacks_(cb_sourceListChange);} //!< List Change Callbacks
// ACN::DMP::Component overrides
virtual void rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu>) override;
inline void doListChangeCallbacks();
private:
std::unordered_map<DATA::data_header, std::shared_ptr<Universe>> sources_;
std::vector<std::shared_ptr<void>> cb_tokens_; //!< source universe callback tokens
std::vector<std::weak_ptr<const std::function<void(DMX::Universe*)>>> cb_sourceListChange; //!< list of calback functions
bool hold_last_look_;
std::weak_ptr<Universe> m_dominant;
void find_dominant_();
void purge_stale_sources_();
bool hasSourceUniverse(const DATA::data_header&) const;
sACN::Universe * m_universe;
std::unordered_map<DATA::data_header, std::shared_ptr<Universe>> sources_;
std::vector<std::shared_ptr<void>> cb_tokens_; //!< dominant universe callback tokens
std::vector<std::shared_ptr<void>> source_tokens_; //!< source universe status changes
std::vector<std::weak_ptr<const std::function<void(DMX::Universe*)>>> cb_sourceListChange; //!< list of calback functions
bool hold_last_look_;
MergeMode merge_mode_;
std::weak_ptr<Universe> m_dominant;
mutable std::shared_mutex mtx_arbitrate;
};
} // SACN namespace