From d67b3b8f40950d2ae62eadb77bb47455eae7f4a2 Mon Sep 17 00:00:00 2001 From: Kevin Matz Date: Thu, 8 Dec 2022 18:05:33 -0500 Subject: [PATCH] adapt to the DMP rx/tx mechanism --- protocol/esta/sacn/arbitratinguniverse.cpp | 42 +++-- protocol/esta/sacn/arbitratinguniverse.h | 4 +- protocol/esta/sacn/receiver.cpp | 22 +-- protocol/esta/sacn/universe.cpp | 170 +++++++++++---------- protocol/esta/sacn/universe.h | 12 +- 5 files changed, 119 insertions(+), 131 deletions(-) diff --git a/protocol/esta/sacn/arbitratinguniverse.cpp b/protocol/esta/sacn/arbitratinguniverse.cpp index 8514ae6..d2d0f24 100644 --- a/protocol/esta/sacn/arbitratinguniverse.cpp +++ b/protocol/esta/sacn/arbitratinguniverse.cpp @@ -198,38 +198,19 @@ bool ArbitratingUniverse::isSyncronized() /** - * @brief ArbitratingUniverse::provenance + * @brief ArbitratingUniverse::metadata * @return */ -std::shared_ptr ArbitratingUniverse::provenance() +std::shared_ptr ArbitratingUniverse::metadata() { auto universe = dominant_(); if (universe) return universe->metadata(); - auto prov = std::make_shared(); - prov->universe = expectedUniverse; - prov->priority = 255; // invalid - return prov; -} - - -/** - * @brief ArbitratingUniverse::set - * @param pdu - * @param src - */ -void ArbitratingUniverse::set(std::shared_ptr pdu, - std::shared_ptr src) -{ - if (expectedUniverse != 0 && - src->universe != expectedUniverse) - return; - - if (!sources_.count(*src)) - addNewSource(*src); - - sources_.at(*src)->set(pdu, src); + auto metadata = std::make_shared(); + metadata->universe = expectedUniverse; + metadata->priority = 255; // invalid + return metadata; } @@ -302,6 +283,17 @@ double ArbitratingUniverse::rxRate() } +/** + * @brief ArbitratingUniverse::rxDmpSetProperty + * @param message + */ +void ArbitratingUniverse::rxDmpSetProperty(ACN::PDU::Message message) +{ + (void)message; +} + + + /** * @brief MergeProxyUniverse::dominant_ * @return diff --git a/protocol/esta/sacn/arbitratinguniverse.h b/protocol/esta/sacn/arbitratinguniverse.h index 0402831..de2bc35 100644 --- a/protocol/esta/sacn/arbitratinguniverse.h +++ b/protocol/esta/sacn/arbitratinguniverse.h @@ -68,7 +68,6 @@ public: void dataChangedNotifier(DMX::Universe* universe); // SACN::Universe overrides: - void set(std::shared_ptr, std::shared_ptr) override; virtual uint8_t status() override; std::shared_ptr metadata() override; @@ -94,6 +93,9 @@ public: protected: void doListChangeCallbacks(); + // ACN::DMP::Component overrides + virtual void rxDmpSetProperty(ACN::PDU::Message) override; + private: std::unordered_map> sources_; std::vector> source_data_tokens; //!< source universe data change tokens diff --git a/protocol/esta/sacn/receiver.cpp b/protocol/esta/sacn/receiver.cpp index f2df133..2ea719c 100644 --- a/protocol/esta/sacn/receiver.cpp +++ b/protocol/esta/sacn/receiver.cpp @@ -202,7 +202,8 @@ void Receiver::dataFrameHandler(ACN::PDU::Message frame) { auto metadata = std::static_pointer_cast(frame->header()); if (!universes_.count(metadata->universe)) - return; + return; // not subscribed to this universe + auto universe = universes_.at(metadata->universe)->sourceUniverse(*metadata); if (!universe) universe = universes_.at(metadata->universe)->addNewSource(*metadata); @@ -213,7 +214,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message frame) { /// > This bit, when set to 1, indicates that the data in this packet is /// > intended for use in visualization or media server preview applications /// > and shall not be used to generate live output. - /// \bug Preview_Data flag is ignored + /// \bug Preview_Data flag causes the frame to be ignored. if (metadata->options.preview_data) return; @@ -294,22 +295,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message frame) { // PDU data will be a block of DMP auto block = std::static_pointer_cast>(frame->data()); universe->setMetadata(metadata); - for (const auto & dmp : *block->pdu) - { - /// > \cite sACN 7.2 DMP Layer: Vector - /// > - /// > The DMP Layer's Vector shall be set to VECTOR_DMP_SET_PROPERTY, - /// > which indicates a DMP Set Property message by sources. Receivers - /// > shall discard the packet if the received value is not - /// > VECTOR_DMP_SET_PROPERTY. - switch(dmp->vector()) { - case ACN::DMP::SET_PROPERTY: - universe->set(dmp, metadata); - break; - default: - break; - } - } + universe->DmpReceiver(block); } diff --git a/protocol/esta/sacn/universe.cpp b/protocol/esta/sacn/universe.cpp index 9e3e534..1ff858d 100644 --- a/protocol/esta/sacn/universe.cpp +++ b/protocol/esta/sacn/universe.cpp @@ -42,6 +42,9 @@ Universe::Universe(Source* src) , tx_worker_(&Universe::tx_loop_, this) { destination.type = ACN::SDT::SDT_ADDR_NULL; + + // add ourself as a sender of our own DMP messages + addDmpSender(std::bind(&Universe::dataFrameSender, this, std::placeholders::_1)); } @@ -66,29 +69,36 @@ Universe::~Universe() /** - * @brief Universe::set - * @param dmp - * @param source + * @brief Universe::rxDmpGetProperty + * @param message */ -void Universe::set(ACN::PDU::Message dmp, - std::shared_ptr source) +void Universe::rxDmpGetProperty(ACN::PDU::Message message) { + (void)message; +} + + +/** + * @brief Universe::rxDmpSetProperty + * @param message + */ +void Universe::rxDmpSetProperty(ACN::PDU::Message message) +{ + // only act on the first property pair in the data + if (!message->data()) + return; + auto set_data = std::static_pointer_cast(message->data()); + if (set_data->properties.empty()) + return; + const auto& [range, data] = set_data->properties.front(); + /// > \cite sACN 7.3 Address Type and Data Type /// > /// > Sources shall set the DMP Layer's Address Type and Data Type to 0xa1. /// > Receivers shall discard the packet if the received value is not 0xa1. - if (!dmp->header()) - return; - auto type = std::static_pointer_cast(dmp->header()); - if (type->relative) return; - if (type->data_type != ACN::DMP::ARRAY) return; - if (type->address_length != ACN::DMP::TWO) return; - - // only act on the first property pair in the data - if (!dmp->data()) - return; - auto set_data = std::static_pointer_cast(dmp->data()); - const auto& [range, data] = set_data->properties.front(); + if (range.isRelative()) return; + if (range.type() != ACN::DMP::ARRAY) return; + if (range.elementLength() != ACN::DMP::TWO) return; /// > \cite sACN 7.4 First Property Address /// > @@ -108,49 +118,28 @@ void Universe::set(ACN::PDU::Message dmp, /// > /// > The DMP Layer's Property Value Count is used to encode the number of /// > DMX512-A [DMX] Slots (including the START Code slot). - if (range.count < 1 || range.count > 513) + if (range.count < 1 || range.count > null_start_data.size()) return; - active_data_slots_ = range.address + range.count; - setProvenance(source); - if (/// > \cite sACN 6.2.4.1 If a receiver is presented with an E1.31 Data - /// > - /// > Packet containing a Synchronization Address of 0, it shall discard - /// > any data waiting to be processed and immediately act on that Data - /// > Packet. - (source->sync_address == 0) || - /// > \cite sACN 6.2.6 E1.31 Data Packet: Options Force_Synchronization - /// > - /// > This bit indicates whether to lock or revert to an unsynchronized - /// > state when synchronization is lost. When set to 0, components that - /// > had been operating in a synchronized state shall not update with - /// > any new packets until synchronization resumes. When set to 1, once - /// > synchronization has been lost, components that had been operating - /// > in a synchronized state need not wait for a new E1.31 - /// > Synchronization Packet in order to update to the next E1.31 Data - /// > Packet. - (isSyncronized() && source->options.force_synchronization && rxRate() == 0)) - { - if (sync_data_) - { - delete sync_data_; - sync_data_ = nullptr; - } - /// > \cite sACN 7.7 Property Values (DMX512-A Data) - /// > - /// > The DMP Layer's Property values field is used to encode the - /// > DMX512-A [DMX] START Code and data. - DMX::Universe::setData(data); - } - else - sync_data_ = new std::vector(data); + /// > \cite sACN 7.7 Property Values (DMX512-A Data) + /// > + /// > The DMP Layer's Property values field is used to encode the + /// > DMX512-A [DMX] START Code and data. + DMX::Universe::setData(data); } /** + * @brief Universe::rxDmpGetProperty + * @param message */ +void Universe::rxDmpSubscribe(ACN::PDU::Message message) { + (void)message; +} + + /** * @brief Universe::setMetadata * @param metadata @@ -332,49 +321,59 @@ void Universe::resetSynchronization() /** * @brief Universe::setSyncData * @param data - * @brief Universe::sACNsend */ void Universe::setSyncData(const std::vector & data) -void Universe::sACNsend() const { resetSynchronization(); sync_data_ = new std::vector(data); } - /// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number - /// > - /// > ... The sequence number for a universe shall be incremented by one for - /// > every packet sent on that universe... - metadata_->sequence_number++; - // header - auto addrtyp = std::make_shared(); - addrtyp->address_length = ACN::DMP::TWO; - addrtyp->x_reserved = 0; - addrtyp->data_type = ACN::DMP::ARRAY; - addrtyp->relative = false; - addrtyp->z_reserved = true; // buy why? Its in the standard... - // property range - ACN::DMP::Range pr(*addrtyp); - pr.address = 0; - pr.incriment = 1; - pr.count = (active_data_slots_ <= 513 ? active_data_slots_ : 513); +/** + * @brief Universe::as_DMP_ + * @return + */ +ACN::PDU::Message Universe::as_DMP_() const +{ + // header + auto addrtyp = std::make_shared(); + addrtyp->address_length = ACN::DMP::TWO; + addrtyp->x_reserved = 0; + addrtyp->data_type = ACN::DMP::ARRAY; + addrtyp->relative = false; + addrtyp->z_reserved = true; // buy why? Its in the standard... - // property data - std::vector pd; - std::copy(null_start_data.begin(), null_start_data.begin() + pr.count, - std::back_inserter(pd)); + // property range + ACN::DMP::Range pr(*addrtyp); + pr.address = 0; + pr.incriment = 1; + pr.count = active_data_slots_ < null_start_data.size() ? active_data_slots_ : null_start_data.size(); - // data segment - auto addrlst = std::make_shared(*addrtyp); - addrlst->properties.emplace_back(ACN::DMP::address_data_pair(pr, pd)); + // property data + std::vector pd; + std::copy_n(null_start_data.begin(), pr.count, std::back_inserter(pd)); - // DMP layer - auto dmp = std::make_shared(); - dmp->setVector(ACN::DMP::SET_PROPERTY); - dmp->setHeader(addrtyp); - dmp->setData(addrlst); + // data segment + auto addrlst = std::make_shared(*addrtyp); + addrlst->properties.emplace_back(ACN::DMP::address_data_pair(pr, pd)); + // DMP layer + auto dmp = std::make_shared(); + dmp->setVector(ACN::DMP::SET_PROPERTY); + dmp->setHeader(addrtyp); + dmp->setData(addrlst); + + return dmp; +} + + + +/** + * @brief Universe::dataFrameSender + * @param dmp + */ +void Universe::dataFrameSender(ACN::PDU::Message dmp) const +{ // sACN Framing Layer auto frame = std::make_shared(); frame->setVector(VECTOR_E131_DATA_PACKET); @@ -481,8 +480,13 @@ void Universe::tx_loop_() } // send the sACN message - sACNsend(); + sendDMP(as_DMP_()); last_updated_ = std::chrono::system_clock::now(); + /// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number + /// > + /// > ... The sequence number for a universe shall be incremented by one for + /// > every packet sent on that universe... + metadata_->sequence_number++; // sleep before the next cycle tx_request_.wait_for(mtx, sleep); diff --git a/protocol/esta/sacn/universe.h b/protocol/esta/sacn/universe.h index 7126670..77e8ea3 100644 --- a/protocol/esta/sacn/universe.h +++ b/protocol/esta/sacn/universe.h @@ -53,8 +53,6 @@ public: Universe(Source* = nullptr); ~Universe(); - virtual void set(ACN::PDU::Message, std::shared_ptr); - virtual std::shared_ptr metadata(); virtual void setMetadata(std::shared_ptr); @@ -97,7 +95,12 @@ public: }; protected: - void sACNsend() const; + void dataFrameSender(ACN::PDU::Message) const; + + // ACN::DMP::Component overrides + virtual void rxDmpGetProperty(ACN::PDU::Message) override; + virtual void rxDmpSetProperty(ACN::PDU::Message) override; + virtual void rxDmpSubscribe(ACN::PDU::Message) override; private: std::shared_ptr metadata_; @@ -127,12 +130,13 @@ private: * > the sequence number of an E1.31 Data Packet on that same universe. */ uint8_t sync_sequence_; - std::condition_variable_any tx_request_; std::mutex tx_control_mutex_; bool tx_enable_; std::thread tx_worker_; void tx_loop_(); + + ACN::PDU::Message as_DMP_() const; }; } // SACN namespace