425 lines
13 KiB
C++
425 lines
13 KiB
C++
/*
|
|
receiver.cpp
|
|
|
|
Copyright (c) 2020 Kevin Matz (kevin.matz@gmail.com)
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in all
|
|
copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
SOFTWARE.
|
|
*/
|
|
|
|
#include "config.h"
|
|
#include "pdu.h"
|
|
#include "receiver.h"
|
|
|
|
namespace sACN {
|
|
|
|
/**
|
|
* @brief Receiver::Receiver
|
|
* @param cid
|
|
* @param fctn
|
|
*
|
|
* Constructor. Register RLP vector callbacks.
|
|
*/
|
|
Receiver::Receiver(UUID::uuid cid, std::string fctn)
|
|
: Component(cid, fctn)
|
|
, discovery_enabled_(false)
|
|
{
|
|
RlpRegisterVector(VECTOR_ROOT_E131_DATA, std::bind(&Receiver::dataReceiver,
|
|
this, std::placeholders::_1));
|
|
RlpRegisterVector(VECTOR_ROOT_E131_EXTENDED, std::bind(&Receiver::extendedReceiver,
|
|
this, std::placeholders::_1));
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::~Receiver
|
|
*/
|
|
Receiver::~Receiver()
|
|
{
|
|
for (auto& [num, _] : universes_)
|
|
Receiver::unsubscribe(num);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::subscribe
|
|
* @param num
|
|
*/
|
|
void Receiver::subscribe(const uint16_t num)
|
|
{
|
|
if (num == sACN::E131_DISCOVERY_UNIVERSE || universes_.count(num))
|
|
return;
|
|
universes_.emplace(num, std::make_shared<UniverseArbitrator>());
|
|
universes_.at(num)->expectedUniverse = num;
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::unsubscribe
|
|
* @param num
|
|
*/
|
|
void Receiver::unsubscribe(const uint16_t num)
|
|
{
|
|
if (universes_.count(num))
|
|
universes_.erase(num);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::universe
|
|
* @param num
|
|
* @return
|
|
*/
|
|
std::shared_ptr<Universe> Receiver::universe(const uint16_t num) const
|
|
{
|
|
if (!universes_.count(num))
|
|
return nullptr;
|
|
#ifdef RTTI_ENABLED
|
|
return std::dynamic_pointer_cast<Universe>(universes_.at(num));
|
|
#else
|
|
return std::static_pointer_cast<Universe>(universes_.at(num));
|
|
#endif
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::discoveryStart
|
|
*/
|
|
void Receiver::discoveryStart()
|
|
{
|
|
subscribe(sACN::E131_DISCOVERY_UNIVERSE);
|
|
discovery_enabled_ = true;
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::discoveryStop
|
|
*/
|
|
void Receiver::discoveryStop()
|
|
{
|
|
unsubscribe(sACN::E131_DISCOVERY_UNIVERSE);
|
|
discovery_enabled_ = false;
|
|
discovered.clear();
|
|
for (const auto& cb : discoveryCallbacks_)
|
|
cb();
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::onDiscovered
|
|
* @param cb something that wants to know about available universes.
|
|
*/
|
|
void Receiver::onDiscovered(const std::function<void()> cb)
|
|
{
|
|
discoveryCallbacks_.push_back(cb);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::dataReceiver - dispatcher of RLP DATA PDU
|
|
* @param root a shared pointer to the PDU
|
|
*
|
|
* Receive VECTOR_ROOT_E131_DATA vector'd packets.
|
|
*/
|
|
void Receiver::dataReceiver(ACN::PDU::Message<ACN::RLP::Pdu> root)
|
|
{
|
|
root->createDataBlock<DATA::Pdu>();
|
|
// a PDU::Block is guaranteed to have been instantiated, but if the input
|
|
// stream failed, it will not list any PDU. OK to loop without checking
|
|
// the state of the stream.
|
|
#ifdef RTTI_ENABLED
|
|
auto block = std::dynamic_pointer_cast<ACN::PDU::Block<DATA::Pdu>>(root->data());
|
|
#else
|
|
auto block = std::static_pointer_cast<ACN::PDU::Block<DATA::Pdu>>(root->data());
|
|
#endif
|
|
for(auto const &frame : *block->pdu)
|
|
{
|
|
/// > \cite sACN 6.2.1 E1.31 Data Packet: Vector
|
|
/// >
|
|
/// > Sources sending an E1.31 Data Packet shall set the E1.31 Layer's
|
|
/// > Vector to VECTOR_E131_DATA_PACKET. This value indicates that the
|
|
/// > E1.31 framing layer is wrapping a DMP PDU.
|
|
switch(frame->vector()) {
|
|
case VECTOR_E131_DATA_PACKET:
|
|
dataFrameHandler(frame);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::extendedReceiver - dispatcher of RLP EXTENDED PDU
|
|
* @param root a shared pointer to the PDU
|
|
*
|
|
* Receive VECTOR_ROOT_E131_EXTENDED vector'd packets.
|
|
*/
|
|
void Receiver::extendedReceiver(ACN::PDU::Message<ACN::RLP::Pdu> root)
|
|
{
|
|
root->createDataBlock<EXTENDED::Pdu>();
|
|
// a PDU::Block is guaranteed to have been instantiated, but if the input
|
|
// stream failed, it will not list any PDU. OK to loop without checking
|
|
// the state of the stream.
|
|
#ifdef RTTI_ENABLED
|
|
auto block = std::dynamic_pointer_cast<ACN::PDU::Block<EXTENDED::Pdu>>(root->data());
|
|
#else
|
|
auto block = std::static_pointer_cast<ACN::PDU::Block<EXTENDED::Pdu>>(root->data());
|
|
#endif
|
|
for(auto const &frame : *block->pdu)
|
|
{
|
|
switch(frame->vector()) {
|
|
/// \cite sACN 6.3 E1.31 Synchronization Packet Framing Layer
|
|
case VECTOR_E131_EXTENDED_SYNCHRONIZATION:
|
|
syncFrameHandler(frame);
|
|
break;
|
|
/// \cite sACN 6.4 E1.31 Universe Discovery Packet Framing Layer
|
|
case VECTOR_E131_EXTENDED_DISCOVERY:
|
|
if (discovery_enabled_)
|
|
discoveryFrameHandler(frame);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::dataFrameHandler
|
|
* @param frame
|
|
*
|
|
* Receive `VECTOR_ROOT_E131_DATA -> VECTOR_E131_DATA_PACKET` vector'd packets.
|
|
*
|
|
* Merging will be based on frame header. PDU data will be read as a block of
|
|
* DMP PDUs and passed to the universe.
|
|
*/
|
|
void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame)
|
|
{
|
|
#ifdef RTTI_ENABLED
|
|
auto metadata = std::dynamic_pointer_cast<DATA::data_header>(frame->header());
|
|
#else
|
|
auto metadata = std::static_pointer_cast<DATA::data_header>(frame->header());
|
|
#endif
|
|
|
|
if (!universes_.count(metadata->universe))
|
|
return; // not subscribed to this universe
|
|
|
|
auto universe = universes_.at(metadata->universe)->sourceUniverse(*metadata);
|
|
if (!universe)
|
|
universe = universes_.at(metadata->universe)->addNewSource(*metadata);
|
|
|
|
/// > \cite sACN 6.2.6 E1.31 Data Packet: Options
|
|
/// >
|
|
/// > Preview_Data: Bit 7 (most significant bit)
|
|
/// > This bit, when set to 1, indicates that the data in this packet is
|
|
/// > intended for use in visualization or media server preview applications
|
|
/// > and shall not be used to generate live output.
|
|
/// \bug Preview_Data flag causes the frame to be ignored.
|
|
if (metadata->options.preview_data)
|
|
return;
|
|
|
|
/// > \cite sACN 6.2.6 E1.31 Data Packet: Options
|
|
/// >
|
|
/// > Stream_Terminated: Bit 6
|
|
/// >
|
|
/// > allow E1.31 sources to terminate transmission of a stream or of
|
|
/// > universe synchronization without waiting for a timeout to occur.
|
|
if (metadata->options.stream_terminated)
|
|
{
|
|
/// Remove the terminated universe as sources.
|
|
/// The universe may keep itself, in a terminated state, for hold-last-look.
|
|
universes_[metadata->universe]->deleteSourceUniverse(*metadata);
|
|
/// > Any property values in an E1.31 Data Packet containing this bit
|
|
/// > shall be ignored.
|
|
return;
|
|
}
|
|
|
|
/// > \cite sACN 6.2.4.1 Synchronization Address Usage in an E1.31 Data Packet
|
|
/// >
|
|
/// > a value of 0 in the Synchronization Address indicates that the universe
|
|
/// > data is not synchronized.
|
|
if (metadata->sync_address == 0 ||
|
|
/// > \cite sACN 6.2.6 E1.31 Data Packet: Options Force_Synchronization
|
|
/// >
|
|
/// > This bit indicates whether to lock or revert to an unsynchronized
|
|
/// > state when synchronization is lost. When set to 0, components that
|
|
/// > had been operating in a synchronized state shall not update with
|
|
/// > any new packets until synchronization resumes. When set to 1, once
|
|
/// > synchronization has been lost, components that had been operating
|
|
/// > in a synchronized state need not wait for a new E1.31
|
|
/// > Synchronization Packet in order to update to the next E1.31 Data
|
|
/// > Packet.
|
|
(universe->isSyncronized() &&
|
|
metadata->options.force_synchronization &&
|
|
universe->rxRate() == 0))
|
|
{
|
|
universe->resetSynchronization();
|
|
}
|
|
else
|
|
{
|
|
subscribe(metadata->sync_address);
|
|
#ifdef RTTI_ENABLED
|
|
auto block = std::dynamic_pointer_cast<ACN::PDU::Block<ACN::DMP::Pdu>>(frame->data());
|
|
#else
|
|
auto block = std::static_pointer_cast<ACN::PDU::Block<ACN::DMP::Pdu>>(frame->data());
|
|
#endif
|
|
if (!block->pdu->empty())
|
|
{
|
|
#ifdef RTTI_ENABLED
|
|
auto dmp = std::dynamic_pointer_cast<ACN::DMP::Pdu>(block->pdu->front());
|
|
auto list = std::dynamic_pointer_cast<ACN::DMP::address_pair_list>(dmp->data());
|
|
#else
|
|
auto dmp = std::static_pointer_cast<ACN::DMP::Pdu>(block->pdu->front());
|
|
auto list = std::static_pointer_cast<ACN::DMP::address_pair_list>(dmp->data());
|
|
#endif
|
|
if (!list->properties.empty())
|
|
{
|
|
const auto & [_, data] = list->properties.front();
|
|
universe->setMetadata(metadata);
|
|
universe->setSyncData(data);
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
|
|
/// > \cite sACN 6.7.2 Sequence Numbering
|
|
/// >
|
|
/// > Having first received a packet with sequence number A,
|
|
/// > a second packet with sequence number B arrives.
|
|
/// > If, using signed 8-bit binary arithmetic, B - A
|
|
/// > is less than or equal to 0, but greater than -20,
|
|
/// > then the packet containing sequence number B shall be deemed out of
|
|
/// > sequence and discarded.
|
|
auto a = universe->metadata()->sequence_number;
|
|
auto b = metadata->sequence_number;
|
|
int dif = b - a;
|
|
if (dif <= 0 && dif > -20)
|
|
return; // too far out of sequence
|
|
|
|
// PDU data will be a block of DMP
|
|
#ifdef RTTI_ENABLED
|
|
auto block = std::dynamic_pointer_cast<ACN::PDU::Block<ACN::DMP::Pdu>>(frame->data());
|
|
#else
|
|
auto block = std::static_pointer_cast<ACN::PDU::Block<ACN::DMP::Pdu>>(frame->data());
|
|
#endif
|
|
universe->setMetadata(metadata);
|
|
universe->DmpReceiver(block);
|
|
|
|
// do ArbitratingUniverse maintence tasks
|
|
universes_.at(metadata->universe)->rxRate();
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::syncFrameHandler
|
|
* @param frame
|
|
*/
|
|
void Receiver::syncFrameHandler(ACN::PDU::Message<EXTENDED::Pdu> frame)
|
|
{
|
|
#ifdef RTTI_ENABLED
|
|
auto header = std::dynamic_pointer_cast<EXTENDED::sync_header>(frame->header());
|
|
#else
|
|
auto header = std::static_pointer_cast<EXTENDED::sync_header>(frame->header());
|
|
#endif
|
|
universes_.at(header->sync_address)->synchronize(header->sequence_number);
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::discoveryFrameHandler
|
|
* @param frame
|
|
*/
|
|
void Receiver::discoveryFrameHandler(ACN::PDU::Message<EXTENDED::Pdu> frame) {
|
|
#ifdef RTTI_ENABLED
|
|
auto block = std::dynamic_pointer_cast<ACN::PDU::Block<EXTENDED::DISCOVERY::Pdu>>(frame->data());
|
|
#else
|
|
auto block = std::static_pointer_cast<ACN::PDU::Block<EXTENDED::DISCOVERY::Pdu>>(frame->data());
|
|
#endif
|
|
for(auto const &pdu : *block->pdu)
|
|
{
|
|
/// > \cite sACN 8 Universe Discovery Layer
|
|
/// >
|
|
/// > Universe Discovery data only appears in E1.31 Universe Discovery
|
|
/// > Packets and shall not be included in E1.31 Data Packets or E1.31
|
|
/// > Synchronization Packets.
|
|
switch(pdu->vector()) {
|
|
case VECTOR_UNIVERSE_DISCOVERY_UNIVERSE_LIST:
|
|
discoveryListHanlder(pdu);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @brief Receiver::discoveryListHanlder
|
|
* @param pdu
|
|
*/
|
|
void Receiver::discoveryListHanlder(ACN::PDU::Message<EXTENDED::DISCOVERY::Pdu> pdu) {
|
|
#ifdef RTTI_ENABLED
|
|
auto rlpHeader = std::dynamic_pointer_cast<ACN::RLP::rlp_header>
|
|
(pdu->parent()->parent()->header());
|
|
auto frameHeader = std::dynamic_pointer_cast<EXTENDED::discovery_header>
|
|
(pdu->parent()->header());
|
|
auto header = std::dynamic_pointer_cast<EXTENDED::DISCOVERY::discovery_list_header>
|
|
(pdu->header());
|
|
auto data = std::dynamic_pointer_cast<EXTENDED::DISCOVERY::discovery_list_data>
|
|
(pdu->data());
|
|
#else
|
|
auto rlpHeader = std::static_pointer_cast<ACN::RLP::rlp_header>
|
|
(pdu->parent()->parent()->header());
|
|
auto frameHeader = std::static_pointer_cast<EXTENDED::discovery_header>
|
|
(pdu->parent()->header());
|
|
auto header = std::static_pointer_cast<EXTENDED::DISCOVERY::discovery_list_header>
|
|
(pdu->header());
|
|
auto data = std::static_pointer_cast<EXTENDED::DISCOVERY::discovery_list_data>
|
|
(pdu->data());
|
|
#endif
|
|
|
|
// on the first page:
|
|
if (header->page == 0)
|
|
// wipe all known discoveries from this CID
|
|
for (auto disc = discovered.begin(); disc != discovered.end();)
|
|
{
|
|
if (disc->get()->source == rlpHeader->cid)
|
|
disc = discovered.erase(disc);
|
|
else
|
|
++disc;
|
|
}
|
|
|
|
for (auto& found : data->found)
|
|
{
|
|
found->source = rlpHeader->cid;
|
|
found->description = frameHeader->source_name;
|
|
discovered.insert(found);
|
|
}
|
|
|
|
// on the last page:
|
|
if (header->page == header->last_page)
|
|
// notify that changes are complete
|
|
for (const auto& cb : discoveryCallbacks_)
|
|
cb();
|
|
}
|
|
|
|
|
|
}; // SACN
|