From 466efd7477e01f0f438804496c7c65a3a5a99f86 Mon Sep 17 00:00:00 2001 From: Kevin Matz Date: Thu, 8 Dec 2022 19:09:39 -0500 Subject: [PATCH] move tx to a helper class --- protocol/esta/sacn/CMakeLists.txt | 2 + protocol/esta/sacn/universe.cpp | 116 ++----------------- protocol/esta/sacn/universe.h | 12 +- protocol/esta/sacn/universesender.cpp | 155 ++++++++++++++++++++++++++ protocol/esta/sacn/universesender.h | 88 +++++++++++++++ 5 files changed, 256 insertions(+), 117 deletions(-) create mode 100644 protocol/esta/sacn/universesender.cpp create mode 100644 protocol/esta/sacn/universesender.h diff --git a/protocol/esta/sacn/CMakeLists.txt b/protocol/esta/sacn/CMakeLists.txt index d857fc1..b62363f 100644 --- a/protocol/esta/sacn/CMakeLists.txt +++ b/protocol/esta/sacn/CMakeLists.txt @@ -20,6 +20,8 @@ target_sources(${PROJECT_NAME} sacn.h source.cpp universe.cpp + universesender.h + universesender.cpp ) target_link_libraries(${PROJECT_NAME} diff --git a/protocol/esta/sacn/universe.cpp b/protocol/esta/sacn/universe.cpp index 1ff858d..6afc062 100644 --- a/protocol/esta/sacn/universe.cpp +++ b/protocol/esta/sacn/universe.cpp @@ -38,11 +38,14 @@ Universe::Universe(Source* src) , source_(src) , active_data_slots_(1) // start code , sync_sequence_(0) - , tx_enable_(true) - , tx_worker_(&Universe::tx_loop_, this) + , sender_(nullptr) { destination.type = ACN::SDT::SDT_ADDR_NULL; + // create a sending thread + if (src) + sender_ = new UniverseSender(this); + // add ourself as a sender of our own DMP messages addDmpSender(std::bind(&Universe::dataFrameSender, this, std::placeholders::_1)); } @@ -54,14 +57,7 @@ Universe::Universe(Source* src) Universe::~Universe() { // shut down tx worker thread - if (tx_worker_.joinable()) - { - tx_control_mutex_.lock(); - tx_enable_ = false; - tx_control_mutex_.unlock(); - tx_request_.notify_all(); - tx_worker_.join(); - } + delete sender_; // delete sync data pointer delete sync_data_; @@ -255,7 +251,7 @@ void Universe::setValue (const uint16_t start, const uint16_t footprint, DMX::Universe::setValue(start, footprint, data); // request sACN message to be sent - tx_request_.notify_all(); + sender_->flush(); } @@ -395,102 +391,4 @@ void Universe::dataFrameSender(ACN::PDU::Message dmp) const } -/** - * @brief Universe::tx_loop_ - */ -void Universe::tx_loop_() -{ - // rx universes, by definition, don't need to tx. - if (!isEditable()) - return; - - /// \cite sACN 6.2.6 E1.31 Data Packet: Options - Stream_Terminated - /// Three packets containing this bit ... shall be sent by sources upon terminating - /// sourcing of a universe. - int terminated_resend = 3; - - /// > \cite sACN 6.6.1 Transmission Rate - /// > - /// > E1.31 sources shall not transmit packets for a given universe number - /// > at a rate which exceeds the maximum refresh rate specified in - /// > E1.11 \cite DMX - /// - /// > \cite DMX Table 6 - Timing Diagram Values - output of transmitting UART - /// > - /// > Minimum Update Time for 513 slots : 22.7ms = 22700µs - std::chrono::microseconds minimum_update_time(22700); - - /// > \cite sACN 6.6.2 Null START Code Transmission Requirements in E1.31 - /// > Data Packets - /// > - /// > 1. Three packets containing the non-changing Property Values - /// > (corresponding to DMX512-A slot data) shall be sent before the - /// > initiation of transmission suppression. - DMX::DimmerData last_null_data({0}); - unsigned int retransmission_count = 0; - - /// > 2. Thereafter, a single keep-alive packet shall be transmitted at - /// > intervals of between 800mS and 1000mS. - std::chrono::milliseconds keep_alive_interval(800); - - // allocate the control variables used by the loop - bool enable = true; // at least 1 loop - bool new_data; - std::chrono::nanoseconds elapsed; - std::chrono::microseconds sleep; - std::mutex mtx; // std::conditional_variable requires in the sleeping thread. Why?? - - while (enable || terminated_resend > 0) - { - // enforce strict minimum update times - elapsed = std::chrono::system_clock::now() - last_updated_; - if (elapsed < minimum_update_time) - std::this_thread::sleep_for(minimum_update_time - elapsed); - - // check for control permission to continue looping - tx_control_mutex_.lock(); - enable = tx_enable_; - tx_control_mutex_.unlock(); - - if (enable) - { - setStatus(DMX_ACTIVE); - null_start_mutex.lock(); - new_data = (null_start_data != last_null_data); - if (new_data) - { - last_null_data = null_start_data; - retransmission_count = 0; - } - null_start_mutex.unlock(); - if (++retransmission_count > 3) - { - retransmission_count = 3; // prevent counter from overflowing - sleep = keep_alive_interval; - } - else - sleep = minimum_update_time; - } - else - { - setStatus(sACN_TERMINATED); // note the changed operating status - metadata_->options.stream_terminated = true; // set the stream_terminated bit - --terminated_resend; // stream_terminated must be sent 3 times - sleep = minimum_update_time; // stay throttled up through the termination sequence - } - - // send the sACN message - sendDMP(as_DMP_()); - last_updated_ = std::chrono::system_clock::now(); - /// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number - /// > - /// > ... The sequence number for a universe shall be incremented by one for - /// > every packet sent on that universe... - metadata_->sequence_number++; - - // sleep before the next cycle - tx_request_.wait_for(mtx, sleep); - } -} - }; // namespace SACN diff --git a/protocol/esta/sacn/universe.h b/protocol/esta/sacn/universe.h index 77e8ea3..75e0760 100644 --- a/protocol/esta/sacn/universe.h +++ b/protocol/esta/sacn/universe.h @@ -27,11 +27,9 @@ #include "dmp/component.h" #include "../dmx/universe.h" #include "sacn.h" +#include "universesender.h" #include -#include -#include -#include #include namespace sACN { @@ -53,6 +51,8 @@ public: Universe(Source* = nullptr); ~Universe(); + friend class UniverseSender; + virtual std::shared_ptr metadata(); virtual void setMetadata(std::shared_ptr); @@ -130,12 +130,8 @@ private: * > the sequence number of an E1.31 Data Packet on that same universe. */ uint8_t sync_sequence_; - std::condition_variable_any tx_request_; - std::mutex tx_control_mutex_; - bool tx_enable_; - std::thread tx_worker_; - void tx_loop_(); + UniverseSender * sender_; ACN::PDU::Message as_DMP_() const; }; diff --git a/protocol/esta/sacn/universesender.cpp b/protocol/esta/sacn/universesender.cpp new file mode 100644 index 0000000..fa3620a --- /dev/null +++ b/protocol/esta/sacn/universesender.cpp @@ -0,0 +1,155 @@ +/* + universesender.cpp + + Copyright (c) 2020 Kevin Matz (kevin.matz@gmail.com) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#include "universesender.h" +#include "universe.h" + +namespace sACN { + +/** + * @brief UniverseSender::UniverseSender + * @param universe + */ +UniverseSender::UniverseSender(Universe * universe) + : mUniverse(universe) + , terminated_resend(3) + , minimum_update_time(22700) + , last_null_data({0}) + , retransmission_count(0) + , keep_alive_interval(800) + , enable_(true) + , worker_(&UniverseSender::loop_, this) +{ +} + + +/** + * @brief UniverseSender::~UniverseSender + */ +UniverseSender::~UniverseSender() +{ + if (isSending()) + { + kill(); + worker_.join(); + } +} + + +/** + * @brief UniverseSender::isSending + * @return + */ +bool UniverseSender::isSending() +{ + return (worker_.joinable()); +} + + +/** + * @brief UniverseSender::flush + */ +void UniverseSender::flush() +{ + request_.notify_all(); +} + + +/** + * @brief UniverseSender::kill + */ +void UniverseSender::kill() +{ + control_mutex_.lock(); + enable_ = false; + control_mutex_.unlock(); + flush(); +} + + +/** + * @brief UniverseSender::loop_ + */ +void UniverseSender::loop_() +{ + bool enabled = true; // at least 1 loop + bool new_data; + std::chrono::nanoseconds elapsed; + std::chrono::microseconds sleep; + std::mutex mtx; // std::conditional_variable requires in the sleeping thread. Why?? + + while (enabled || terminated_resend > 0) + { + // enforce strict minimum update times + elapsed = std::chrono::system_clock::now() - mUniverse->last_updated_; + if (elapsed < minimum_update_time) + std::this_thread::sleep_for(minimum_update_time - elapsed); + + // check for control permission to continue looping + control_mutex_.lock(); + enabled = enable_; + control_mutex_.unlock(); + + if (enable_) + { + mUniverse->setStatus(Universe::DMX_ACTIVE); + mUniverse->null_start_mutex.lock(); + new_data = (mUniverse->null_start_data != last_null_data); + if (new_data) + { + last_null_data = mUniverse->null_start_data; + retransmission_count = 0; + } + mUniverse->null_start_mutex.unlock(); + if (++retransmission_count > 3) + { + retransmission_count = 3; // prevent counter from overflowing + sleep = keep_alive_interval; + } + else + sleep = minimum_update_time; + } + else + { + mUniverse->setStatus(Universe::sACN_TERMINATED); // note the changed operating status + mUniverse->metadata_->options.stream_terminated = true; // set the stream_terminated bit + --terminated_resend; // stream_terminated must be sent 3 times + sleep = minimum_update_time; // stay throttled up through the termination sequence + } + + // send the sACN message + mUniverse->sendDMP(mUniverse->as_DMP_()); + mUniverse->last_updated_ = std::chrono::system_clock::now(); + /// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number + /// > + /// > ... The sequence number for a universe shall be incremented by one for + /// > every packet sent on that universe... + mUniverse->metadata_->sequence_number++; + + // sleep before the next cycle + request_.wait_for(mtx, sleep); + } +} + + +} // namespace sACN diff --git a/protocol/esta/sacn/universesender.h b/protocol/esta/sacn/universesender.h new file mode 100644 index 0000000..ea24aaa --- /dev/null +++ b/protocol/esta/sacn/universesender.h @@ -0,0 +1,88 @@ +/* + universesender.h + + Copyright (c) 2020 Kevin Matz (kevin.matz@gmail.com) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#pragma once + +#include "../dmx/universe.h" +#include +#include +#include + +namespace sACN { + +class Universe; + +/** + * @brief The UniverseSender class + */ +class UniverseSender +{ +public: + UniverseSender(Universe *); + ~UniverseSender(); + + bool isSending(); + + void flush(); + void kill(); + +private: + Universe * mUniverse; + + /// \cite sACN 6.2.6 E1.31 Data Packet: Options - Stream_Terminated + /// Three packets containing this bit ... shall be sent by sources upon terminating + /// sourcing of a universe. + int terminated_resend; + + /// > \cite sACN 6.6.1 Transmission Rate + /// > + /// > E1.31 sources shall not transmit packets for a given universe number + /// > at a rate which exceeds the maximum refresh rate specified in + /// > E1.11 \cite DMX + /// + /// > \cite DMX Table 6 - Timing Diagram Values - output of transmitting UART + /// > + /// > Minimum Update Time for 513 slots : 22.7ms = 22700µs + std::chrono::microseconds minimum_update_time; + + /// > \cite sACN 6.6.2 Null START Code Transmission Requirements in E1.31 + /// > Data Packets + /// > + /// > 1. Three packets containing the non-changing Property Values + /// > (corresponding to DMX512-A slot data) shall be sent before the + /// > initiation of transmission suppression. + DMX::DimmerData last_null_data; + unsigned int retransmission_count; + + /// > 2. Thereafter, a single keep-alive packet shall be transmitted at + /// > intervals of between 800mS and 1000mS. + std::chrono::milliseconds keep_alive_interval; + + bool enable_; + std::mutex control_mutex_; + std::condition_variable_any request_; + std::thread worker_; + void loop_(); +}; + +} // namespace sACN