254 lines
7.0 KiB
C++
254 lines
7.0 KiB
C++
/*
|
|
universesender.cpp
|
|
|
|
Copyright (c) 2020 Kevin Matz (kevin.matz@gmail.com)
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all
|
|
copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
SOFTWARE.
|
|
*/
|
|
|
|
#include "config.h"
|
|
#include "universesender.h"
|
|
#include "universe.h"
|
|
#include "source.h"
|
|
|
|
#include <shared_mutex>
|
|
|
|
namespace sACN {
|
|
|
|
/**
|
|
* @brief UniverseSender::UniverseSender
|
|
* @param source
|
|
* @param universe
|
|
*/
|
|
UniverseSender::UniverseSender(Source * source, Universe * universe)
|
|
: mSource(source)
|
|
, mUniverse(universe)
|
|
, terminated_resend(3)
|
|
, minimum_update_time(22700)
|
|
, retransmission_count(0)
|
|
, keep_alive_interval(800)
|
|
, enabled_(true)
|
|
, last_data_({0})
|
|
, dmp_(std::make_shared<ACN::DMP::Pdu>())
|
|
, frame_(std::make_shared<DATA::Pdu>())
|
|
{
|
|
// add ourself as a sender for DMP messages
|
|
mUniverse->addDmpSender(std::bind(&UniverseSender::dataFrameSender, this,
|
|
std::placeholders::_1));
|
|
|
|
// header segment
|
|
auto addrtyp = std::make_shared<ACN::DMP::address_type>();
|
|
addrtyp->address_length = ACN::DMP::TWO;
|
|
addrtyp->x_reserved = 0;
|
|
addrtyp->data_type = ACN::DMP::ARRAY;
|
|
addrtyp->relative = false;
|
|
addrtyp->z_reserved = true; // buy why? Its in the standard...
|
|
|
|
// DMP layer
|
|
dmp_->setVector(ACN::DMP::SET_PROPERTY);
|
|
dmp_->setHeader(addrtyp);
|
|
|
|
// sACN framing layer
|
|
frame_->setVector(VECTOR_E131_DATA_PACKET);
|
|
|
|
// start the thread after setting up the PDU
|
|
worker_ = std::thread(&UniverseSender::loop_, this);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief UniverseSender::~UniverseSender
|
|
*/
|
|
UniverseSender::~UniverseSender()
|
|
{
|
|
if (isSending())
|
|
{
|
|
kill();
|
|
worker_.join();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief UniverseSender::isSending
|
|
* @return
|
|
*/
|
|
bool UniverseSender::isSending() const
|
|
{
|
|
std::shared_lock lk_ctl(mtx_control);
|
|
return (worker_.joinable());
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Wake up and send a frame of sACN.
|
|
*
|
|
*
|
|
*/
|
|
void UniverseSender::flush()
|
|
{
|
|
std::shared_lock lk_ctl(mtx_control);
|
|
request_.notify_all();
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief UniverseSender::kill
|
|
*/
|
|
void UniverseSender::kill()
|
|
{
|
|
{
|
|
std::unique_lock lk_ctl(mtx_control);
|
|
enabled_ = false;
|
|
}
|
|
flush();
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief UniverseSender::loop_
|
|
*/
|
|
void UniverseSender::loop_()
|
|
{
|
|
bool new_data;
|
|
std::chrono::nanoseconds elapsed;
|
|
std::chrono::microseconds sleep;
|
|
std::unique_lock lk_thread(mtx_thread_);
|
|
|
|
update_dmp_(); // initial data segment
|
|
|
|
while (enabled_ || terminated_resend > 0)
|
|
{
|
|
// enforce strict minimum update times
|
|
elapsed = std::chrono::system_clock::now() - mUniverse->last_updated_;
|
|
if (elapsed < minimum_update_time)
|
|
std::this_thread::sleep_for(minimum_update_time - elapsed);
|
|
|
|
if (enabled_)
|
|
{
|
|
mUniverse->setStatus(Universe::DMX_ACTIVE);
|
|
std::shared_lock lk_data(mUniverse->mtx_data);
|
|
new_data = (mUniverse->null_start_data != last_data_);
|
|
if (new_data)
|
|
{
|
|
last_data_ = mUniverse->null_start_data;
|
|
retransmission_count = 0;
|
|
update_dmp_();
|
|
}
|
|
if (++retransmission_count > 3)
|
|
{
|
|
retransmission_count = 3; // prevent counter from overflowing
|
|
sleep = keep_alive_interval;
|
|
}
|
|
else
|
|
sleep = minimum_update_time;
|
|
}
|
|
else
|
|
{
|
|
mUniverse->setStatus(Universe::sACN_TERMINATED); // note the changed operating status
|
|
std::unique_lock lk_univ(mUniverse->mtx_control);
|
|
mUniverse->metadata_->options.stream_terminated = true; // set the stream_terminated bit
|
|
std::unique_lock lk_ctl(mtx_control);
|
|
--terminated_resend; // stream_terminated must be sent 3 times
|
|
sleep = minimum_update_time; // stay throttled up through the termination sequence
|
|
}
|
|
|
|
// send the sACN message
|
|
mUniverse->sendDMP(dmp_);
|
|
{
|
|
std::unique_lock lk_ctl(mUniverse->mtx_control);
|
|
mUniverse->last_updated_ = std::chrono::system_clock::now();
|
|
/// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number
|
|
/// >
|
|
/// > ... The sequence number for a universe shall be incremented by one for
|
|
/// > every packet sent on that universe...
|
|
mUniverse->metadata_->sequence_number++;
|
|
}
|
|
|
|
// sleep before the next cycle
|
|
request_.wait_for(lk_thread, sleep);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief UniverseSender::update_dmp_
|
|
*/
|
|
void UniverseSender::update_dmp_()
|
|
{
|
|
std::unique_lock lk_ctl(mtx_control);
|
|
|
|
// header segment
|
|
#ifdef RTTI_ENABLED
|
|
auto addrtyp = std::dynamic_pointer_cast<ACN::DMP::address_type>(dmp_->header());
|
|
#else
|
|
auto addrtyp = std::static_pointer_cast<ACN::DMP::address_type>(dmp_->header());
|
|
#endif
|
|
|
|
// property range
|
|
ACN::DMP::Range pr(*addrtyp);
|
|
pr.address = 0;
|
|
pr.incriment = 1;
|
|
pr.count = mUniverse->activeSlots() < mUniverse->null_start_data.size()
|
|
? mUniverse->activeSlots()
|
|
: mUniverse->null_start_data.size();
|
|
|
|
// property data
|
|
std::vector<octet> pd;
|
|
{
|
|
std::shared_lock lk_data(mUniverse->mtx_data);
|
|
std::copy_n(mUniverse->null_start_data.begin(), pr.count, std::back_inserter(pd));
|
|
}
|
|
|
|
// data segment
|
|
auto addrlst = std::make_shared<ACN::DMP::address_pair_list>(*addrtyp);
|
|
addrlst->properties.emplace_back(ACN::DMP::address_data_pair(pr, pd));
|
|
dmp_->setData(addrlst);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Universe::dataFrameSender
|
|
* @param dmp
|
|
*/
|
|
void UniverseSender::dataFrameSender(ACN::PDU::Message<ACN::DMP::Pdu> dmp) const
|
|
{
|
|
std::shared_lock lk_ctl(mtx_control);
|
|
std::shared_lock lk_univ(mUniverse->mtx_control);
|
|
|
|
// sACN Framing Layer
|
|
frame_->setHeader(mUniverse->metadata_);
|
|
frame_->setData(dmp);
|
|
|
|
// send
|
|
switch (mUniverse->destination.type)
|
|
{
|
|
case ACN::SDT::SDT_ADDR_NULL:
|
|
mSource->rlpSendUdp(VECTOR_ROOT_E131_DATA, frame_,
|
|
IPv4MulticastAddress(mUniverse->metadata_->universe));
|
|
mSource->rlpSendUdp(VECTOR_ROOT_E131_DATA, frame_,
|
|
IPv6MulticastAddress(mUniverse->metadata_->universe));
|
|
break;
|
|
default:
|
|
mSource->rlpSendUdp(VECTOR_ROOT_E131_DATA, frame_, mUniverse->destination);
|
|
}
|
|
}
|
|
|
|
} // namespace sACN
|