Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions Viewer/ecflowUI/src/VMirrorAttr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

VMirrorAttrType::VMirrorAttrType()
: VAttributeType("mirror") {
dataCount_ = 9;
dataCount_ = 10;
searchKeyToData_["mirror_name"] = NameIndex;
searchKeyToData_["mirror_remote_path"] = RemotePathIndex;
searchKeyToData_["mirror_remote_host"] = RemoteHostIndex;
Expand All @@ -38,10 +38,11 @@ QString VMirrorAttrType::toolTip(QStringList d) const {
t += "<b>Remote Port:</b> " + d[RemotePortIndex] + "<br>";
t += "<b>Polling:</b> " + d[PollingIndex] + "<br>";
t += "<b>SSL:</b> " + d[SslIndex] + "<br>";
t += "<b>Auth:</b> " + d[AuthIndex];
t += "<b>Auth:</b> " + d[AuthIndex] + "<br>";
if (const auto& reason = d[ReasonIndex]; !reason.isEmpty()) {
t += "<br><b>Reason:</b> <span style=\"color:red\">" + d[ReasonIndex] + "</span>";
t += "<br><b>Reason:</b> <span style=\"color:red\">" + d[ReasonIndex] + "</span><br>";
}
t += "<b>Propagate:</b> " + d[PropagateIndex];
}
return t;
}
Expand All @@ -57,15 +58,16 @@ QString VMirrorAttrType::definition(QStringList d) const {

void VMirrorAttrType::encode(const ecf::MirrorAttr& mirror, QStringList& data, bool firstLine) const {

data << qName_ // TypeIndex
<< QString::fromStdString(mirror.name()) // NameIndex
<< QString::fromStdString(mirror.remote_path()) // RemotePathIndex
<< QString::fromStdString(mirror.remote_host()) // RemoteHostIndex
<< QString::fromStdString(mirror.remote_port()) // RemotePortIndex
<< QString::fromStdString(mirror.polling()) // PollingIndex
<< QString::fromStdString(mirror.ssl() ? "true" : "false") // SslIndex
<< QString::fromStdString(mirror.auth()) // AuthIndex
<< QString::fromStdString(mirror.reason()); // ReasonIndex
data << qName_ // TypeIndex
<< QString::fromStdString(mirror.name()) // NameIndex
<< QString::fromStdString(mirror.remote_path()) // RemotePathIndex
<< QString::fromStdString(mirror.remote_host()) // RemoteHostIndex
<< QString::fromStdString(mirror.remote_port()) // RemotePortIndex
<< QString::fromStdString(mirror.polling()) // PollingIndex
<< QString::fromStdString(mirror.ssl() ? "true" : "false") // SslIndex
<< QString::fromStdString(mirror.auth()) // AuthIndex
<< QString::fromStdString(mirror.reason()) // ReasonIndex
<< QString::fromStdString(mirror.propagate() ? "true" : "false"); // PropagateIndex
}

void VMirrorAttrType::encode_empty(QStringList& data) const {
Expand Down
3 changes: 2 additions & 1 deletion Viewer/ecflowUI/src/VMirrorAttr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class VMirrorAttrType : public VAttributeType {
PollingIndex = 5,
SslIndex = 6,
AuthIndex = 7,
ReasonIndex = 8
ReasonIndex = 8,
PropagateIndex = 9
};
};

Expand Down
3 changes: 2 additions & 1 deletion docs/glossary.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,7 @@ Glossary
as this may lead to undesired behaviour, including deadlocks.

`Only one mirror attribute is allowed per node`, and each attribute is
defined by the following properties:
defined by the following options:

- :code:`name`, an identifier
- :code:`remote_path`, the path of the node on the remote ecFlow server
Expand All @@ -1698,6 +1698,7 @@ Glossary
- :code:`ssl`, to connect to the ecFlow server using SSL
- :code:`polling`, the value (in seconds) used to periodically contact the remote ecFlow server
- :code:`auth`, the location to the Mirror authentication credentials file
- :code:`propagate`, to enable the propagation of the remote node state up the tree of local nodes

.. warning::

Expand Down
8 changes: 7 additions & 1 deletion docs/python_api/reference/MirrorAttr.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ Returns the name of the Mirror attribute
.. py:method:: MirrorAttr.polling(self: ecflow.MirrorAttr) -> str
:module: ecflow

Returns the polling interval used to contact the remove ecFlow server
Returns the polling interval used to contact the remote ecFlow server


.. py:method:: MirrorAttr.propagate(self: ecflow.MirrorAttr) -> bool
:module: ecflow

Returns a boolean, where true means the Node state is propagated up the Node tree


.. py:method:: MirrorAttr.remote_host(self: ecflow.MirrorAttr) -> str
Expand Down
11 changes: 11 additions & 0 deletions docs/ug/cookbook/how_to_mirror_a_remote_task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ at :term:`suite` level:
Variables that are referred in trigger expressions *must* be defined, as a placeholder
for variables that eventually get synchronised.

.. note::

By default, only the mirrored task's own state is updated; parent families and triggers
are unaffected. To propagate the mirrored state up the local node tree and cause dependent
triggers to fire, add the ``--propagate`` option::

mirror --name A --remote_path /s1/f1/t1 ... --ssl --propagate

Use ``--propagate`` only when you explicitly want state changes on the mirrored task to
drive the evaluation of triggers on its parent families.

Deploy the Suite with a `mirrored` Task
=======================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ This defines a :term:`mirror` attribute, which synchronizes the status of a
:term:`node` between a local and a remote ecFlow server. The options defining
the attribute can be provided in any order.

The :term:`mirror` attribute is defined by the coordinates to the remote :term:`node` to be mirrored, including the
remote :term:`node` path, the remote host name, the remote port number; and the polling interval in seconds.
Furthermore, the option :code:`--ssl` enables the use of a Secure connection and the option :code:`--auth` can be used
to provide the authentication credentials for the remote ecFlow server.

.. code-block:: shell

task t1
mirror --name A --remote_path /s/f/tA --remote_host host --remote_port 3141 --polling 20 --ssl --auth /path/to/auth.json
mirror --name A --remote_path /s/f/tA --remote_host host --remote_port 3141 --polling 300 --ssl --auth /path/to/auth.json

task t2
aviso --name B --remote_path /s/f/tB
Expand All @@ -20,6 +25,25 @@ the attribute can be provided in any order.
# --polling %ECF_MIRROR_POLLING%
# --auth %ECF_MIRROR_AUTH%


By default, the :term:`mirror` attribute will not propagate the state of the remote :term:`node` to the local node tree,
and only the actual associated local :term:`node` will see its state updated.
The option :code:`--propagate` can be used to enable the propagation of the remote :term:`node` state to the local node
tree.

.. code-block:: shell

task t1
mirror --name A --remote_path /s/f/tA --remote_host host --remote_port 3141 --polling 300 --propagate


.. note::
The ``--polling`` value (in seconds) controls how often the local ecFlow
server contacts the remote server. Lower values reduce latency but
have impact on the remote server as the network traffic increases.
A value of 300 seconds is considered suitable for most cases, and a minimum
of 60 seconds being imposed to avoid overloading the remote server.

The Authentication credentials, provided via option :code:`--auth`, are
provided in a `JSON` file with the following content:

Expand Down
1 change: 1 addition & 0 deletions libs/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ set(test_srcs
test/TestJobProfiler.cpp
test/TestLimit.cpp
test/TestMigration.cpp
test/TestMirrorAttr.cpp
test/TestMissNextTimeSlot.cpp
test/TestMovePeer.cpp
test/TestNodeAlgorithms.cpp
Expand Down
46 changes: 30 additions & 16 deletions libs/node/src/ecflow/node/MirrorAttr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ MirrorAttr::MirrorAttr(Node* parent,
polling_t polling,
flag_t ssl,
auth_t auth,
reason_t reason)
reason_t reason,
flag_t propagate)
: parent_{parent},
name_{std::move(name)},
remote_path_{std::move(remote_path)},
Expand All @@ -43,6 +44,7 @@ MirrorAttr::MirrorAttr(Node* parent,
ssl_{ssl},
auth_{std::move(auth)},
reason_{std::move(reason)},
propagate_{propagate},
controller_{nullptr} {
if (!is_valid_name(name_)) {
throw ecf::InvalidArgument(ecf::Message("Invalid MirrorAttr name :", name_));
Expand All @@ -61,7 +63,12 @@ MirrorAttr::~MirrorAttr() {
}

std::string MirrorAttr::absolute_name() const {
return parent_->absNodePath() + ':' + name_;
if (parent_) {
return parent_->absNodePath() + ':' + name_;
}
else {
return "(detached):" + name_;
}
}

bool MirrorAttr::why(std::string& theReasonWhy) const {
Expand Down Expand Up @@ -103,17 +110,23 @@ void MirrorAttr::mirror() {

// Notifications found -- Node state to be updated or error to be reported
std::visit(ecf::overload{[this](const service::mirror::MirrorNotification& notification) {
auto latest_state = static_cast<NState::State>(notification.data().state);
auto latest_state = static_cast<NState::State>(notification.data().state);
auto latest_state_name = NState::toString(latest_state);

SLOG(D,
"MirrorAttr: Updating Mirror attribute (name: " << name_ << ") to state "
<< latest_state);
SLOG(T,
"MirrorAttr: Sync Mirror '"
<< absolute_name() << "' with {path: '" << remote_path_ << "', host: '"
<< remote_host_ << "', port: '" << remote_port_ << "', ssl: '" << ssl_
<< "'} to state '" << latest_state_name << "'");

// ** Node State
reason_ = "";
parent_->get_flag().clear(Flag::REMOTE_ERROR);
parent_->get_flag().set_state_change_no(state_change_no_);
parent_->setStateOnly(latest_state, true);
if (propagate_) {
parent_->set_most_significant_state_up_node_tree();
}

// ** Node Variables
std::vector<Variable> all_variables = notification.data().regular_variables;
Expand All @@ -134,7 +147,7 @@ void MirrorAttr::mirror() {
parent_->replace_events(notification.data().events);
},
[this](const service::mirror::MirrorError& error) {
SLOG(D,
SLOG(T,
"MirrorAttr: Failure detected on Mirror attribute (name: "
<< name_ << ") due to " << error.reason());
reason_ = error.reason();
Expand Down Expand Up @@ -227,7 +240,7 @@ void MirrorAttr::start_controller() {
auto polling = resolve_cfg(polling_, default_polling, fallback_polling);
auto auth = resolve_cfg(auth_, default_remote_auth, fallback_remote_auth);

SLOG(D,
SLOG(T,
"MirrorAttr: start polling Mirror attribute '" << absolute_name() << "', from " << remote_path_ << " @ "
<< remote_host << ':' << remote_port
<< ") using polling: " << polling << " s");
Expand All @@ -251,8 +264,8 @@ void MirrorAttr::start_controller() {

// Controller -- start up the Mirror controller, and configure the Mirror request
controller_ = std::make_shared<controller_t>();
controller_->subscribe(
ecf::service::mirror::MirrorRequest{remote_path_, remote_host, remote_port, polling_value, ssl_, auth});
controller_->subscribe(ecf::service::mirror::MirrorRequest{
absolute_name(), remote_path_, remote_host, remote_port, polling_value, ssl_, auth});
// Controller -- effectively start the Mirror process
// n.b. this must be done after subscribing in the controller, so that the polling interval is set
controller_->start();
Expand All @@ -261,10 +274,9 @@ void MirrorAttr::start_controller() {

void MirrorAttr::stop_controller() {
if (controller_) {
SLOG(D,
"MirrorAttr: finishing polling for Mirror attribute \"" << parent_->absNodePath() << ":" << name_
<< "\", from host: " << remote_host_
<< ", port: " << remote_port_ << ")");
SLOG(T,
"MirrorAttr: finishing polling for Mirror attribute \""
<< absolute_name() << "\", from host: " << remote_host_ << ", port: " << remote_port_ << ")");

controller_->stop();
controller_.reset();
Expand All @@ -275,7 +287,7 @@ bool operator==(const MirrorAttr& lhs, const MirrorAttr& rhs) {
return lhs.name() == rhs.name() && lhs.remote_path() == rhs.remote_path() &&
lhs.remote_host() == rhs.remote_host() && lhs.remote_port() == rhs.remote_port() &&
lhs.polling() == rhs.polling() && lhs.ssl() == rhs.ssl() && lhs.auth() == rhs.auth() &&
lhs.reason() == rhs.reason();
lhs.reason() == rhs.reason() && lhs.propagate() == rhs.propagate();
}

std::string to_python_string(const MirrorAttr& mirror) {
Expand All @@ -292,7 +304,9 @@ std::string to_python_string(const MirrorAttr& mirror) {
s += ", polling=";
s += mirror.polling();
s += ", ssl=";
s += mirror.ssl();
s += mirror.ssl() ? "1" : "0";
s += ", propagate=";
s += mirror.propagate() ? "1" : "0";
s += ", auth=";
s += mirror.auth();
s += ", reason=";
Expand Down
20 changes: 13 additions & 7 deletions libs/node/src/ecflow/node/MirrorAttr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
#include <string>

#include "ecflow/core/Log.hpp"
#include "ecflow/core/Serialization.hpp"
#include "ecflow/core/Str.hpp"
#include "ecflow/service/mirror/MirrorService.hpp"

namespace cereal {
class access;
}

class Node;

namespace ecf {
Expand Down Expand Up @@ -55,6 +52,7 @@ class MirrorAttr {
static constexpr const char* default_remote_auth = "%ECF_MIRROR_REMOTE_AUTH%";

// Fallback option values, used when the variables providing default values are not defined
static constexpr const char* fallback_remote_host = "localhost";
static constexpr const char* fallback_remote_port = "3141";
static constexpr const char* fallback_polling = "120";
static constexpr const char* fallback_remote_auth = "";
Expand All @@ -78,7 +76,8 @@ class MirrorAttr {
polling_t polling,
flag_t ssl,
auth_t auth,
reason_t reason);
reason_t reason,
flag_t propagate);

MirrorAttr(const MirrorAttr& rhs) = default;
~MirrorAttr();
Expand All @@ -97,9 +96,10 @@ class MirrorAttr {
[[nodiscard]] inline flag_t ssl() const { return ssl_; }
[[nodiscard]] inline const std::string& auth() const { return auth_; }
[[nodiscard]] inline const std::string& reason() const { return reason_; }
[[nodiscard]] inline flag_t propagate() const { return propagate_; }

[[nodiscard]] inline std::string resolved_remote_host() const {
return resolve_cfg(remote_host_, default_remote_host, fallback_remote_port);
return resolve_cfg(remote_host_, default_remote_host, fallback_remote_host);
}
[[nodiscard]] inline std::string resolved_remote_port() const {
return resolve_cfg(remote_port_, default_remote_port, fallback_remote_port);
Expand Down Expand Up @@ -146,9 +146,10 @@ class MirrorAttr {
remote_host_t remote_host_;
remote_port_t remote_port_;
polling_t polling_;
flag_t ssl_;
flag_t ssl_ = false;
auth_t auth_;
reason_t reason_;
flag_t propagate_ = false;

unsigned int state_change_no_{0}; // *not* persisted, only used on the server side

Expand All @@ -171,8 +172,13 @@ void serialize(Archive& ar, MirrorAttr& mirror, [[maybe_unused]] std::uint32_t v
ar & mirror.ssl_;
ar & mirror.auth_;
ar & mirror.reason_;
if (version >= 1) {
ar & mirror.propagate_;
}
}

} // namespace ecf

CEREAL_CLASS_VERSION(ecf::MirrorAttr, 1);

#endif /* ecflow_node_MirrorAttr_HPP */
3 changes: 3 additions & 0 deletions libs/node/src/ecflow/node/formatter/MirrorFormatter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ struct Formatter<MirrorAttr, Stream>
output << " --reason ";
output << item.reason();
}
if (item.propagate()) {
output << " --propagate";
}
}
};

Expand Down
4 changes: 3 additions & 1 deletion libs/node/src/ecflow/node/parser/MirrorParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ ecf::MirrorAttr MirrorParser::parse_mirror_line(const std::string& line, Node* p
description.add_options()(option_remote_auth,
po::value<std::string>()->default_value(ecf::MirrorAttr::default_remote_auth));
description.add_options()(option_reason, po::value<std::string>()->default_value(""));
description.add_options()(option_propagate, "Propagate State up the node tree");

po::parsed_options parsed_options = po::command_line_parser(tokens).options(description).run();

Expand All @@ -94,8 +95,9 @@ ecf::MirrorAttr MirrorParser::parse_mirror_line(const std::string& line, Node* p
auto ssl = get_option_value<ecf::MirrorAttr::flag_t>(vm, option_ssl, line);
auto auth = get_option_value<ecf::MirrorAttr::auth_t>(vm, option_remote_auth, line);
auto reason = get_option_value<ecf::MirrorAttr::reason_t>(vm, option_reason, line);
auto propagate = get_option_value<ecf::MirrorAttr::flag_t>(vm, option_propagate, line);

return ecf::MirrorAttr{parent, name, ecflow_path, ecflow_host, ecflow_port, polling, ssl, auth, reason};
return ecf::MirrorAttr{parent, name, ecflow_path, ecflow_host, ecflow_port, polling, ssl, auth, reason, propagate};
}

bool MirrorParser::doParse(const std::string& line, std::vector<std::string>& lineTokens) {
Expand Down
1 change: 1 addition & 0 deletions libs/node/src/ecflow/node/parser/MirrorParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MirrorParser : public Parser {
static constexpr const char* option_ssl = "ssl";
static constexpr const char* option_remote_auth = "remote_auth";
static constexpr const char* option_reason = "reason";
static constexpr const char* option_propagate = "propagate";

static ecf::MirrorAttr parse_mirror_line(const std::string& line);
static ecf::MirrorAttr parse_mirror_line(const std::string& line, const std::string& name);
Expand Down
Loading
Loading