1
0
Fork 0

threaded worker loop for periodic discovery broadcast

This commit is contained in:
Kevin Matz 2021-08-30 13:59:08 -04:00
parent d421a5c577
commit 09b2210616
2 changed files with 48 additions and 8 deletions

View File

@ -33,11 +33,20 @@ namespace sACN {
*/
Source::Source(UUID::uuid cid)
: Component(cid)
, discovery_future_(discovery_exitSignal_.get_future())
, discovery_worker_(&Source::discovery_loop_, this)
{
fctn_ = "OpenLCP sACN Source";
}
Source::~Source()
{
discovery_exitSignal_.set_value();
discovery_worker_.join();
}
/**
* @brief Source::create
* @param num
@ -46,12 +55,15 @@ void Source::create(const uint16_t num)
{
if (universes_.count(num))
return;
universes_.emplace(num, new Universe());
auto metadata = std::shared_ptr<DATA::data_header>(new DATA::data_header());
metadata->source_name = this->name();
metadata->universe = num;
universes_mutext_.lock();
universes_.emplace(num, new Universe());
universes_[num]->setProvenance(metadata);
universes_mutext_.unlock();
}
@ -64,14 +76,11 @@ void Source::terminate(const uint16_t num)
if (!universes_.count(num))
return;
universes_mutext_.lock();
auto metadata = universes_[num]->provenance();
auto options = DATA::data_options();
options.force_synchronization = metadata->options.force_synchronization;
options.preview_data = metadata->options.preview_data;
options.stream_terminated = true;
metadata->options = options;
metadata->options.stream_terminated = true;
universes_[num]->setProvenance(metadata);
universes_mutext_.unlock();
}
@ -84,7 +93,9 @@ void Source::end(const uint16_t num)
if (!universes_.count(num))
return;
universes_mutext_.lock();
universes_.erase(num);
universes_mutext_.unlock();
}
@ -118,13 +129,17 @@ void Source::discoveryAnnounce()
// known universes
std::vector<EXTENDED::DISCOVERY::discoveredUniverse> list;
universes_mutext_.lock();
for (const auto & [num, univ] : universes_)
{
if (univ->provenance()->options.stream_terminated)
continue;
if (univ->destination.type != ACN::SDT::SDT_ADDR_NULL)
continue;
list.emplace_back(EXTENDED::DISCOVERY::discoveredUniverse());
list.back().universe = num;
}
universes_mutext_.unlock();
/// > \cite sACN 8.3 Page
/// >
@ -204,4 +219,17 @@ Universe * Source::universe(const uint16_t num)
return universes_.at(num);
}
/**
* @brief Source::discovery_loop_
*/
void Source::discovery_loop_()
{
while (discovery_future_.wait_for(
std::chrono::milliseconds(E131_UNIVERSE_DISCOVER_INTERVAL))
== std::future_status::timeout)
discoveryAnnounce();
}
} // SACN

View File

@ -27,7 +27,11 @@
#include "sacn.h"
#include "universe.h"
#include <future>
#include <mutex>
#include <unordered_map>
#include <thread>
namespace sACN {
@ -45,6 +49,8 @@ class Source
{
public:
Source(UUID::uuid = UUID::uuid());
~Source();
virtual void create(const uint16_t);
virtual void terminate(const uint16_t);
Universe * universe(const uint16_t);
@ -60,6 +66,7 @@ protected:
private:
std::unordered_map <uint16_t, Universe *> universes_;
std::mutex universes_mutext_;
/// > \cite sACN 6.2.5 E1.31 Data Packet: Sequence Number
/// >
@ -70,6 +77,11 @@ private:
/// > the sequence number of an E1.31 Data Packet on that same universe.
std::unordered_map <uint16_t, uint8_t> data_sequences_;
std::unordered_map <uint16_t, uint8_t> sync_sequences_;
std::promise<void> discovery_exitSignal_;
std::future<void> discovery_future_;
std::thread discovery_worker_;
void discovery_loop_();
};