1
0
Fork 0

refactor the discovery worker thread

This commit is contained in:
Kevin Matz 2022-11-26 12:48:02 -05:00
parent 11d202d51a
commit 3835cee9bd
2 changed files with 87 additions and 91 deletions

View File

@ -34,9 +34,7 @@ namespace sACN {
*/
Source::Source(UUID::uuid cid, std::string fctn)
: Component(cid, fctn)
, discovery_future_(discovery_exitSignal_.get_future())
, discovery_worker_(&Source::discovery_loop_, this)
, sendNullDiscovery(false)
{
}
@ -87,30 +85,89 @@ void Source::terminate(const uint16_t num)
/**
* @brief @cite sACN 12 Universe Discovery
*
* > E1.31 Universe Discovery enables other components on a network to know
* > which universes are being used to transmit data or synchronization
* > information. ... this specification requires any source intending to
* > comply with E1.31 to implement Universe Discovery...
* @brief Source::sendExtendedFrame
* @param vector
* @param header
* @param data
* @param ip
*/
void Source::discoveryAnnounce()
void Source::sendExtendedFrame(const uint16_t vector,
std::shared_ptr<ACN::PDU::pdu_header> header,
std::shared_ptr<ACN::PDU::pdu_data> data,
const ACN::SDT::UDP::ipAddress& ip)
{
/// > \cite sACN 12.1 Universe Discovery and Termination
/// >
/// > Any source that is no longer sending any universe data may stop
/// > sending E1.31 Universe Discovery Packets until such time that it
/// > resumes transmission of E1.31 Data and/or Synchronization information.
if (universes_.empty())
{
if (sendNullDiscovery)
sendNullDiscovery = false;
else
return;
}
else
sendNullDiscovery = true;
if (!(vector == VECTOR_E131_EXTENDED_DISCOVERY ||
vector == VECTOR_E131_EXTENDED_SYNCHRONIZATION))
return;
auto framepdu = std::make_shared<EXTENDED::Pdu>();
framepdu->setVector(vector);
framepdu->setHeader(header);
framepdu->setData(data);
rlpSendUdp(VECTOR_ROOT_E131_EXTENDED, framepdu, ip);
}
/**
* @brief Source::universe
* @param num
* @return
*/
std::shared_ptr<Universe> Source::universe(const uint16_t num)
{
if (!universes_.count(num))
return nullptr;
return universes_.at(num);
}
void Source::assignUserName(const std::string s)
{
ACN::Component::assignUserName(s);
for( auto & [_, universe] : universes_ )
universe->provenance()->source_name = name();
}
/**
* @brief Source::discovery_loop_
*/
void Source::discovery_loop_()
{
/// After terminating all universes, at least one empty universe
/// discovery message should be sent.
bool send_null_discovery = false;
auto future = discovery_exitSignal_.get_future();
while (future.wait_for(std::chrono::milliseconds(E131_UNIVERSE_DISCOVER_INTERVAL))
!= std::future_status::ready)
{
/// > \cite sACN 12.1 Universe Discovery and Termination
/// >
/// > Any source that is no longer sending any universe data may stop
/// > sending E1.31 Universe Discovery Packets until such time that it
/// > resumes transmission of E1.31 Data and/or Synchronization information.
if (universes_.empty())
{
if (send_null_discovery)
send_null_discovery = false;
else
continue;
}
else
send_null_discovery = true;
discovery_send_();
}
}
/**
* @brief Source::discovery_send_
*/
void Source::discovery_send_()
{
// framing layer header
auto frmheader = std::make_shared<EXTENDED::discovery_header>();
frmheader->source_name = name();
@ -180,62 +237,4 @@ void Source::discoveryAnnounce()
}
/**
* @brief Source::sendExtendedFrame
* @param vector
* @param header
* @param data
* @param ip
*/
void Source::sendExtendedFrame(const uint16_t vector,
std::shared_ptr<ACN::PDU::pdu_header> header,
std::shared_ptr<ACN::PDU::pdu_data> data,
const ACN::SDT::UDP::ipAddress& ip)
{
if (!(vector == VECTOR_E131_EXTENDED_DISCOVERY ||
vector == VECTOR_E131_EXTENDED_SYNCHRONIZATION))
return;
auto framepdu = std::make_shared<EXTENDED::Pdu>();
framepdu->setVector(vector);
framepdu->setHeader(header);
framepdu->setData(data);
rlpSendUdp(VECTOR_ROOT_E131_EXTENDED, framepdu, ip);
}
/**
* @brief Source::universe
* @param num
* @return
*/
std::shared_ptr<Universe> Source::universe(const uint16_t num)
{
if (!universes_.count(num))
return nullptr;
return universes_.at(num);
}
void Source::assignUserName(const std::string s)
{
ACN::Component::assignUserName(s);
for( auto & [_, universe] : universes_ )
universe->provenance()->source_name = name();
}
/**
* @brief Source::discovery_loop_
*/
void Source::discovery_loop_()
{
while (discovery_future_.wait_for(
std::chrono::milliseconds(E131_UNIVERSE_DISCOVER_INTERVAL))
== std::future_status::timeout)
discoveryAnnounce();
}
} // SACN

View File

@ -58,10 +58,7 @@ public:
// from ACN::Component
virtual void assignUserName(const std::string) override;
protected:
virtual void discoveryAnnounce();
void sendExtendedFrame(const uint16_t vector,
std::shared_ptr<ACN::PDU::pdu_header> header,
std::shared_ptr<ACN::PDU::pdu_data> data,
@ -80,14 +77,14 @@ private:
/// > the sequence number of an E1.31 Data Packet on that same universe.
std::unordered_map <uint16_t, uint8_t> sync_sequences_;
std::promise<void> discovery_exitSignal_;
std::future<void> discovery_future_;
std::thread discovery_worker_;
/// > \cite sACN 12 Universe Discovery
/// >
/// > ... this specification requires any source intending to comply with
/// > E1.31 to implement Universe Discovery...
std::promise<void> discovery_exitSignal_; //!< signal the discovery worker thread to end
std::thread discovery_worker_; //!< thread responsible for sending discovery
void discovery_loop_();
/// After terminating all universes, at least one empty universe
/// discovery message should be sent.
bool sendNullDiscovery;
void discovery_send_();
};