1
0
Fork 0

adapt to the DMP rx/tx mechanism

This commit is contained in:
Kevin Matz 2022-12-08 18:05:33 -05:00
parent f9896ff5f3
commit d67b3b8f40
5 changed files with 119 additions and 131 deletions

View File

@ -198,38 +198,19 @@ bool ArbitratingUniverse::isSyncronized()
/**
* @brief ArbitratingUniverse::provenance
* @brief ArbitratingUniverse::metadata
* @return
*/
std::shared_ptr<DATA::data_header> ArbitratingUniverse::provenance()
std::shared_ptr<DATA::data_header> ArbitratingUniverse::metadata()
{
auto universe = dominant_();
if (universe)
return universe->metadata();
auto prov = std::make_shared<DATA::data_header>();
prov->universe = expectedUniverse;
prov->priority = 255; // invalid
return prov;
}
/**
* @brief ArbitratingUniverse::set
* @param pdu
* @param src
*/
void ArbitratingUniverse::set(std::shared_ptr<ACN::DMP::Pdu> pdu,
std::shared_ptr<DATA::data_header> src)
{
if (expectedUniverse != 0 &&
src->universe != expectedUniverse)
return;
if (!sources_.count(*src))
addNewSource(*src);
sources_.at(*src)->set(pdu, src);
auto metadata = std::make_shared<DATA::data_header>();
metadata->universe = expectedUniverse;
metadata->priority = 255; // invalid
return metadata;
}
@ -302,6 +283,17 @@ double ArbitratingUniverse::rxRate()
}
/**
* @brief ArbitratingUniverse::rxDmpSetProperty
* @param message
*/
void ArbitratingUniverse::rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
{
(void)message;
}
/**
* @brief MergeProxyUniverse::dominant_
* @return

View File

@ -68,7 +68,6 @@ public:
void dataChangedNotifier(DMX::Universe* universe);
// SACN::Universe overrides:
void set(std::shared_ptr<ACN::DMP::Pdu>, std::shared_ptr<DATA::data_header>) override;
virtual uint8_t status() override;
std::shared_ptr<DATA::data_header> metadata() override;
@ -94,6 +93,9 @@ public:
protected:
void doListChangeCallbacks();
// ACN::DMP::Component overrides
virtual void rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu>) override;
private:
std::unordered_map<DATA::data_header, std::shared_ptr<Universe>> sources_;
std::vector<std::shared_ptr<void>> source_data_tokens; //!< source universe data change tokens

View File

@ -202,7 +202,8 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame) {
auto metadata = std::static_pointer_cast<DATA::data_header>(frame->header());
if (!universes_.count(metadata->universe))
return;
return; // not subscribed to this universe
auto universe = universes_.at(metadata->universe)->sourceUniverse(*metadata);
if (!universe)
universe = universes_.at(metadata->universe)->addNewSource(*metadata);
@ -213,7 +214,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame) {
/// > This bit, when set to 1, indicates that the data in this packet is
/// > intended for use in visualization or media server preview applications
/// > and shall not be used to generate live output.
/// \bug Preview_Data flag is ignored
/// \bug Preview_Data flag causes the frame to be ignored.
if (metadata->options.preview_data)
return;
@ -294,22 +295,7 @@ void Receiver::dataFrameHandler(ACN::PDU::Message<DATA::Pdu> frame) {
// PDU data will be a block of DMP
auto block = std::static_pointer_cast<ACN::PDU::Block<ACN::DMP::Pdu>>(frame->data());
universe->setMetadata(metadata);
for (const auto & dmp : *block->pdu)
{
/// > \cite sACN 7.2 DMP Layer: Vector
/// >
/// > The DMP Layer's Vector shall be set to VECTOR_DMP_SET_PROPERTY,
/// > which indicates a DMP Set Property message by sources. Receivers
/// > shall discard the packet if the received value is not
/// > VECTOR_DMP_SET_PROPERTY.
switch(dmp->vector()) {
case ACN::DMP::SET_PROPERTY:
universe->set(dmp, metadata);
break;
default:
break;
}
}
universe->DmpReceiver(block);
}

View File

@ -42,6 +42,9 @@ Universe::Universe(Source* src)
, tx_worker_(&Universe::tx_loop_, this)
{
destination.type = ACN::SDT::SDT_ADDR_NULL;
// add ourself as a sender of our own DMP messages
addDmpSender(std::bind(&Universe::dataFrameSender, this, std::placeholders::_1));
}
@ -66,29 +69,36 @@ Universe::~Universe()
/**
* @brief Universe::set
* @param dmp
* @param source
* @brief Universe::rxDmpGetProperty
* @param message
*/
void Universe::set(ACN::PDU::Message<ACN::DMP::Pdu> dmp,
std::shared_ptr<DATA::data_header> source)
void Universe::rxDmpGetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
{
(void)message;
}
/**
* @brief Universe::rxDmpSetProperty
* @param message
*/
void Universe::rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu> message)
{
// only act on the first property pair in the data
if (!message->data())
return;
auto set_data = std::static_pointer_cast<ACN::DMP::address_pair_list>(message->data());
if (set_data->properties.empty())
return;
const auto& [range, data] = set_data->properties.front();
/// > \cite sACN 7.3 Address Type and Data Type
/// >
/// > Sources shall set the DMP Layer's Address Type and Data Type to 0xa1.
/// > Receivers shall discard the packet if the received value is not 0xa1.
if (!dmp->header())
return;
auto type = std::static_pointer_cast<ACN::DMP::address_type>(dmp->header());
if (type->relative) return;
if (type->data_type != ACN::DMP::ARRAY) return;
if (type->address_length != ACN::DMP::TWO) return;
// only act on the first property pair in the data
if (!dmp->data())
return;
auto set_data = std::static_pointer_cast<ACN::DMP::address_pair_list>(dmp->data());
const auto& [range, data] = set_data->properties.front();
if (range.isRelative()) return;
if (range.type() != ACN::DMP::ARRAY) return;
if (range.elementLength() != ACN::DMP::TWO) return;
/// > \cite sACN 7.4 First Property Address
/// >
@ -108,49 +118,28 @@ void Universe::set(ACN::PDU::Message<ACN::DMP::Pdu> dmp,
/// >
/// > The DMP Layer's Property Value Count is used to encode the number of
/// > DMX512-A [DMX] Slots (including the START Code slot).
if (range.count < 1 || range.count > 513)
if (range.count < 1 || range.count > null_start_data.size())
return;
active_data_slots_ = range.address + range.count;
setProvenance(source);
if (/// > \cite sACN 6.2.4.1 If a receiver is presented with an E1.31 Data
/// >
/// > Packet containing a Synchronization Address of 0, it shall discard
/// > any data waiting to be processed and immediately act on that Data
/// > Packet.
(source->sync_address == 0) ||
/// > \cite sACN 6.2.6 E1.31 Data Packet: Options Force_Synchronization
/// >
/// > This bit indicates whether to lock or revert to an unsynchronized
/// > state when synchronization is lost. When set to 0, components that
/// > had been operating in a synchronized state shall not update with
/// > any new packets until synchronization resumes. When set to 1, once
/// > synchronization has been lost, components that had been operating
/// > in a synchronized state need not wait for a new E1.31
/// > Synchronization Packet in order to update to the next E1.31 Data
/// > Packet.
(isSyncronized() && source->options.force_synchronization && rxRate() == 0))
{
if (sync_data_)
{
delete sync_data_;
sync_data_ = nullptr;
}
/// > \cite sACN 7.7 Property Values (DMX512-A Data)
/// >
/// > The DMP Layer's Property values field is used to encode the
/// > DMX512-A [DMX] START Code and data.
DMX::Universe::setData(data);
}
else
sync_data_ = new std::vector<uint8_t>(data);
/// > \cite sACN 7.7 Property Values (DMX512-A Data)
/// >
/// > The DMP Layer's Property values field is used to encode the
/// > DMX512-A [DMX] START Code and data.
DMX::Universe::setData(data);
}
/**
* @brief Universe::rxDmpGetProperty
* @param message
*/
void Universe::rxDmpSubscribe(ACN::PDU::Message<ACN::DMP::Pdu> message)
{
(void)message;
}
/**
* @brief Universe::setMetadata
* @param metadata
@ -332,49 +321,59 @@ void Universe::resetSynchronization()
/**
* @brief Universe::setSyncData
* @param data
* @brief Universe::sACNsend
*/
void Universe::setSyncData(const std::vector<uint8_t> & data)
void Universe::sACNsend() const
{
resetSynchronization();
sync_data_ = new std::vector<uint8_t>(data);
}
/// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number
/// >
/// > ... The sequence number for a universe shall be incremented by one for
/// > every packet sent on that universe...
metadata_->sequence_number++;
// header
auto addrtyp = std::make_shared<ACN::DMP::address_type>();
addrtyp->address_length = ACN::DMP::TWO;
addrtyp->x_reserved = 0;
addrtyp->data_type = ACN::DMP::ARRAY;
addrtyp->relative = false;
addrtyp->z_reserved = true; // buy why? Its in the standard...
// property range
ACN::DMP::Range pr(*addrtyp);
pr.address = 0;
pr.incriment = 1;
pr.count = (active_data_slots_ <= 513 ? active_data_slots_ : 513);
/**
* @brief Universe::as_DMP_
* @return
*/
ACN::PDU::Message<ACN::DMP::Pdu> Universe::as_DMP_() const
{
// header
auto addrtyp = std::make_shared<ACN::DMP::address_type>();
addrtyp->address_length = ACN::DMP::TWO;
addrtyp->x_reserved = 0;
addrtyp->data_type = ACN::DMP::ARRAY;
addrtyp->relative = false;
addrtyp->z_reserved = true; // buy why? Its in the standard...
// property data
std::vector<octet> pd;
std::copy(null_start_data.begin(), null_start_data.begin() + pr.count,
std::back_inserter(pd));
// property range
ACN::DMP::Range pr(*addrtyp);
pr.address = 0;
pr.incriment = 1;
pr.count = active_data_slots_ < null_start_data.size() ? active_data_slots_ : null_start_data.size();
// data segment
auto addrlst = std::make_shared<ACN::DMP::address_pair_list>(*addrtyp);
addrlst->properties.emplace_back(ACN::DMP::address_data_pair(pr, pd));
// property data
std::vector<octet> pd;
std::copy_n(null_start_data.begin(), pr.count, std::back_inserter(pd));
// DMP layer
auto dmp = std::make_shared<ACN::DMP::Pdu>();
dmp->setVector(ACN::DMP::SET_PROPERTY);
dmp->setHeader(addrtyp);
dmp->setData(addrlst);
// data segment
auto addrlst = std::make_shared<ACN::DMP::address_pair_list>(*addrtyp);
addrlst->properties.emplace_back(ACN::DMP::address_data_pair(pr, pd));
// DMP layer
auto dmp = std::make_shared<ACN::DMP::Pdu>();
dmp->setVector(ACN::DMP::SET_PROPERTY);
dmp->setHeader(addrtyp);
dmp->setData(addrlst);
return dmp;
}
/**
* @brief Universe::dataFrameSender
* @param dmp
*/
void Universe::dataFrameSender(ACN::PDU::Message<ACN::DMP::Pdu> dmp) const
{
// sACN Framing Layer
auto frame = std::make_shared<DATA::Pdu>();
frame->setVector(VECTOR_E131_DATA_PACKET);
@ -481,8 +480,13 @@ void Universe::tx_loop_()
}
// send the sACN message
sACNsend();
sendDMP(as_DMP_());
last_updated_ = std::chrono::system_clock::now();
/// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number
/// >
/// > ... The sequence number for a universe shall be incremented by one for
/// > every packet sent on that universe...
metadata_->sequence_number++;
// sleep before the next cycle
tx_request_.wait_for(mtx, sleep);

View File

@ -53,8 +53,6 @@ public:
Universe(Source* = nullptr);
~Universe();
virtual void set(ACN::PDU::Message<ACN::DMP::Pdu>, std::shared_ptr<DATA::data_header>);
virtual std::shared_ptr<DATA::data_header> metadata();
virtual void setMetadata(std::shared_ptr<DATA::data_header>);
@ -97,7 +95,12 @@ public:
};
protected:
void sACNsend() const;
void dataFrameSender(ACN::PDU::Message<ACN::DMP::Pdu>) const;
// ACN::DMP::Component overrides
virtual void rxDmpGetProperty(ACN::PDU::Message<ACN::DMP::Pdu>) override;
virtual void rxDmpSetProperty(ACN::PDU::Message<ACN::DMP::Pdu>) override;
virtual void rxDmpSubscribe(ACN::PDU::Message<ACN::DMP::Pdu>) override;
private:
std::shared_ptr<DATA::data_header> metadata_;
@ -127,12 +130,13 @@ private:
* > the sequence number of an E1.31 Data Packet on that same universe.
*/
uint8_t sync_sequence_;
std::condition_variable_any tx_request_;
std::mutex tx_control_mutex_;
bool tx_enable_;
std::thread tx_worker_;
void tx_loop_();
ACN::PDU::Message<ACN::DMP::Pdu> as_DMP_() const;
};
} // SACN namespace