From e7afe82857ac3ccc3e10b40cee60ea94cc59232b Mon Sep 17 00:00:00 2001 From: rtk0c Date: Sat, 24 Apr 2021 17:11:44 -0700 Subject: Second draft of graph rebuild - Change working set approach to basic BFS with dependency (input pins) counting - TODO convert evaluator to use depths --- core/src/Model/Workflow/Evaluation.cpp | 78 ++------------- core/src/Model/Workflow/Workflow.cpp | 174 +++++++++++++++++++++------------ core/src/Model/Workflow/Workflow.hpp | 35 +++++-- core/src/Utils/Macros.hpp | 2 +- core/src/Utils/Vector.hpp | 6 ++ 5 files changed, 158 insertions(+), 137 deletions(-) create mode 100644 core/src/Utils/Vector.hpp (limited to 'core/src') diff --git a/core/src/Model/Workflow/Evaluation.cpp b/core/src/Model/Workflow/Evaluation.cpp index 111d34e..db09973 100644 --- a/core/src/Model/Workflow/Evaluation.cpp +++ b/core/src/Model/Workflow/Evaluation.cpp @@ -55,89 +55,31 @@ void WorkflowEvaluationContext::SetConnectionValue(const WorkflowNode::OutputPin } void WorkflowEvaluationContext::Run() { - std::queue candidates; // Stores index to nodes int evaluatedCount = 0; int erroredCount = 0; - // Evaluate all the input nodes first - for (size_t i = 0; i < mRuntimeNodes.size(); ++i) { - if (mWorkflow->mNodes[i]->GetType() == WorkflowNode::InputType) { - candidates.push(i); - } - } - - auto AddOutputsToCandidates = [&](size_t idx) { - auto& node = *mWorkflow->mNodes[idx]; - auto& rNode = mRuntimeNodes[idx]; - for (auto& pin : node.mOutputs) { - if (!pin.IsConnected()) continue; - // TODO support the other variant - if (pin.GetSupportedDirection() != WorkflowConnection::OneToMany) continue; - - auto& rConn = mRuntimeConnections[pin.Connection]; - auto& conn = mWorkflow->mConnections[pin.Connection]; - if (rConn.IsAvailableValue()) { - for (WorkflowConnection::ConnectionPoint& cp : conn.MultiConnections) { - if (rNode.Status != EvaluationStatus::Unevaluated) { - candidates.push(cp.Node); - } - } - } - } - }; - auto FindCandidates = [&]() { - for (size_t i = 0; i < mWorkflow->mNodes.size(); ++i) { - auto& node = mWorkflow->mNodes[i]; - auto& rNode = mRuntimeNodes[i]; - - if (rNode.Status != EvaluationStatus::Unevaluated) { - continue; - } - - for (auto& pin : node->mInputs) { - if (!pin.IsConnected()) continue; - - auto& rConn = mRuntimeConnections[pin.Connection]; - if (!rConn.IsAvailableValue()) { - goto skip; - } - } - - candidates.push(i); + for (auto& depthGroup : mWorkflow->GetDepthGroups()) { + for (size_t idx : depthGroup) { + auto& rn = mRuntimeNodes[idx]; + auto& n = *mWorkflow->mNodes[idx]; - skip: - continue; - } - }; - - while (true) { - while (!candidates.empty()) { - auto idx = candidates.front(); - auto& node = *mWorkflow->mNodes[idx]; - auto& rNode = mRuntimeNodes[idx]; - candidates.pop(); + // TODO int preEvalErrors = mErrors.size(); - node.Evaluate(*this); + n.Evaluate(*this); if (preEvalErrors != mErrors.size()) { erroredCount++; } else { evaluatedCount++; - AddOutputsToCandidates(idx); } } - - if (evaluatedCount + erroredCount >= mRuntimeNodes.size()) { - break; - } - - // Candidates empty, but there are still possibly-evaluable nodes - FindCandidates(); } for (size_t i = 0; i < mRuntimeNodes.size(); ++i) { - if (mWorkflow->mNodes[i]->GetType() == WorkflowNode::OutputType) { - // TODO + auto& rn = mRuntimeNodes[i]; + auto& n = *mWorkflow->mNodes[i]; + if (n.GetType() == WorkflowNode::OutputType) { + // TODO record outputs } } } diff --git a/core/src/Model/Workflow/Workflow.cpp b/core/src/Model/Workflow/Workflow.cpp index 7afcf0b..a1af44a 100644 --- a/core/src/Model/Workflow/Workflow.cpp +++ b/core/src/Model/Workflow/Workflow.cpp @@ -78,6 +78,14 @@ WorkflowNode::WorkflowNode(Type type, Kind kind) , mDepth{ -1 } { } +Vec2i WorkflowNode::GetPosition() const { + return mPosition; +} + +void WorkflowNode::SetPosition(const Vec2i& position) { + mPosition = position; +} + size_t WorkflowNode::GetId() const { return mId; } @@ -90,6 +98,10 @@ WorkflowNode::Kind WorkflowNode::GetKind() const { return mKind; } +int WorkflowNode::GetDepth() const { + return mDepth; +} + void WorkflowNode::ConnectInput(int nodeId, WorkflowNode& output, int outputNodeId) { mWorkflow->Connect(*this, nodeId, output, outputNodeId); } @@ -236,6 +248,25 @@ void Workflow::RemoveStep(size_t id) { step->mId = WorkflowNode::kInvalidId; } +void Workflow::RemoveConnection(size_t id) { + auto& conn = mConnections[id]; + if (!conn.IsValid()) return; + + for (auto& point : conn.GetSourcePoints()) { + auto& node = *mNodes[point.Node]; + auto& pin = node.mOutputs[point.Pin]; + pin.Connection = WorkflowNode::kInvalidId; + } + for (auto& point : conn.GetDestinationPoints()) { + auto& node = *mNodes[point.Node]; + auto& pin = node.mInputs[point.Pin]; + pin.Connection = WorkflowNode::kInvalidId; + } + + conn = {}; + mDepthsDirty = true; +} + bool Workflow::Connect(WorkflowNode& sourceNode, int sourcePin, WorkflowNode& destinationNode, int destinationPin) { auto& src = sourceNode.mOutputs[sourcePin]; auto& dst = destinationNode.mInputs[destinationPin]; @@ -304,6 +335,8 @@ bool Workflow::Connect(WorkflowNode& sourceNode, int sourcePin, WorkflowNode& de return true; } +// TODO cleanup these two implementation + bool Workflow::DisconnectBySource(WorkflowNode& sourceNode, int sourcePin) { auto& sp = sourceNode.mOutputs[sourcePin]; if (!sp.IsConnected()) return false; @@ -318,7 +351,7 @@ bool Workflow::DisconnectBySource(WorkflowNode& sourceNode, int sourcePin) { auto& vec = conn.MultiConnections; vec.erase(std::remove(vec.begin(), vec.end(), Pt{ sourceNode.GetId(), sourcePin }), vec.end()); - sp.Connection = WorkflowConnection::kInvalidId; + sp.Connection = WorkflowNode::kInvalidId; } break; case WorkflowConnection::OneToMany: { @@ -326,9 +359,9 @@ bool Workflow::DisconnectBySource(WorkflowNode& sourceNode, int sourcePin) { for (auto& pt : conn.MultiConnections) { auto& node = *mNodes[pt.Node]; - node.mInputs[pt.Pin].Connection = WorkflowConnection::kInvalidId; + node.mInputs[pt.Pin].Connection = WorkflowNode::kInvalidId; } - sp.Connection = WorkflowConnection::kInvalidId; + sp.Connection = WorkflowNode::kInvalidId; conn = {}; } break; @@ -343,7 +376,7 @@ bool Workflow::DisconnectByDestination(WorkflowNode& destinationNode, int destin if (!dp.IsConnected()) return false; if (dp.IsConstantConnection()) { dp.ConnectionToConst = false; - dp.Connection = WorkflowConnection::kInvalidId; + dp.Connection = WorkflowNode::kInvalidId; return true; } @@ -356,9 +389,9 @@ bool Workflow::DisconnectByDestination(WorkflowNode& destinationNode, int destin for (auto& pt : conn.MultiConnections) { auto& node = *mNodes[pt.Node]; - node.mOutputs[pt.Pin].Connection = WorkflowConnection::kInvalidId; + node.mOutputs[pt.Pin].Connection = WorkflowNode::kInvalidId; } - dp.Connection = WorkflowConnection::kInvalidId; + dp.Connection = WorkflowNode::kInvalidId; conn = {}; } break; @@ -369,7 +402,7 @@ bool Workflow::DisconnectByDestination(WorkflowNode& destinationNode, int destin auto& vec = conn.MultiConnections; vec.erase(std::remove(vec.begin(), vec.end(), Pt{ destinationNode.GetId(), destinationPin }), vec.end()); - dp.Connection = WorkflowConnection::kInvalidId; + dp.Connection = WorkflowNode::kInvalidId; } break; } @@ -377,63 +410,76 @@ bool Workflow::DisconnectByDestination(WorkflowNode& destinationNode, int destin return true; } -void Workflow::RemoveConnection(size_t id) { - auto& conn = mConnections[id]; - if (!conn.IsValid()) return; - - switch (conn.ConnectionDirection) { - case WorkflowConnection::ManyToOne: { - - } break; - case WorkflowConnection::OneToMany: { - - } break; - } - - // TODO - - mDepthsDirty = true; +const std::vector>& Workflow::GetDepthGroups() const { + return mDepthGroups; } bool Workflow::DoesDepthNeedsUpdate() const { return mDepthsDirty; } -Workflow::GraphUpdateResult Workflow::UpdateGraph() { +Workflow::GraphUpdateResult Workflow::UpdateGraph(bool getInfo) { + if (!mDepthsDirty) { + return GraphUpdate_NoWorkToDo{}; + } + // Terminology: - // - Input pin <=> dependency nodes + // - Dependency = nodes its input pins are connected to + // - Dependents = nodes its output pins are connected to struct WorkingNode { // The max depth out of all dependency nodes, maintained during the traversal and committed as the actual depth - // when all dependencies of this node has been resolved + // when all dependencies of this node has been resolved. Add 1 to get the depth that will be assigned to the node. int MaximumDepth = 0; int FulfilledInputCount = 0; }; + std::vector workingNodes; - tsl::robin_set workingSet; // The set of valid (known to has depth) nodes, used for iterating - tsl::robin_set backWorkingSet; // The set of valid nodes built while iterating `workingSet`, and will be moved into it after done iterating + std::queue q; + + // Check if all dependencies of this node is satisfied + auto CheckNodeDependencies = [&](WorkflowNode& node) -> bool { + for (auto& pin : node.mInputs) { + if (!pin.IsConnected()) { + return false; + } + } + return true; + }; workingNodes.reserve(mNodes.size()); - for (size_t i = 0; i < mNodes.size(); ++i) { - auto& node = mNodes[i]; - workingNodes.push_back(WorkingNode{}); + { + std::vector unsatisfiedNodes; + for (size_t i = 0; i < mNodes.size(); ++i) { + auto& node = mNodes[i]; + workingNodes.push_back(WorkingNode{}); + + if (!node) continue; - if (!node) continue; - node->mDepth = -1; + if (!CheckNodeDependencies(*node)) { + if (getInfo) unsatisfiedNodes.push_back(i); + } - // Start traversing with the input nodes - if (node->GetType() == WorkflowNode::InputType) { - workingSet.insert(i); + node->mDepth = -1; + + // Start traversing with the input nodes + if (node->GetType() == WorkflowNode::InputType) { + q.push(i); + } + } + + if (!unsatisfiedNodes.empty()) { + return GraphUpdate_UnsatisfiedDependencies{ std::move(unsatisfiedNodes) }; } } - auto HasCyclicReference = [&](WorkflowNode* node) -> bool { + auto HasCyclicReference = [&](WorkflowNode& node) -> bool { // TODO return false; }; - auto AddOutputsToWorkingSet = [&](WorkflowNode* node) -> void { - for (auto& pin : node->mOutputs) { + auto ProcessNode = [&](WorkflowNode& node) -> void { + for (auto& pin : node.mOutputs) { if (!pin.IsConnected()) continue; auto& conn = mConnections[pin.Connection]; @@ -441,48 +487,54 @@ Workflow::GraphUpdateResult Workflow::UpdateGraph() { auto& wn = workingNodes[point.Node]; auto& n = *mNodes[point.Node].get(); - wn.FulfilledInputCount++; - if (HasCyclicReference(n)) { // TODO break; } - // Fulfilled node + wn.FulfilledInputCount++; + wn.MaximumDepth = std::max(node.mDepth, wn.MaximumDepth); + + // Node's dependency is fulfilled, we can process its dependents next // We use >= here because for a many-to-one pin, the dependency is an "or" relation ship, i.e. any of the nodes firing before this will fulfill the requirement - // TODO calc depth based on previous dependencies if (n.mInputs.size() >= wn.FulfilledInputCount) { - backWorkingSet.insert(point.Node); + n.mDepth = wn.MaximumDepth + 1; } } } }; int processedNodes = 0; - int currentDepth = 0; - while (true) { - for (size_t idx : workingSet) { - auto& wn = workingNodes[idx]; - auto& n = *mNodes[idx]; - if (n.mInputs.size() == wn.FulfilledInputCount) { - n.mDepth = currentDepth; - - AddOutputsToWorkingSet(n); - processedNodes++; - } + while (!q.empty()) { + auto& wn = workingNodes[q.front()]; + auto& n = *mNodes[q.front()]; + q.pop(); + processedNodes++; + + ProcessNode(n); + } + + if (processedNodes < mNodes.size()) { + // There is unreachable nodes + if (!getInfo) { + return GraphUpdate_UnreachableNodes{}; } - workingSet = std::move(backWorkingSet); - backWorkingSet.clear(); + std::vector unreachableNodes; + for (size_t i = 0; i < mNodes.size(); ++i) { + auto& wn = workingNodes[i]; + auto& n = *mNodes[i]; - currentDepth++; + // This is a reachable node + if (n.mDepth != -1) continue; - if (processedNodes == mNodes.size()) { - break; + unreachableNodes.push_back(i); } + + return GraphUpdate_UnreachableNodes{ std::move(unreachableNodes) }; } - return GraphUpdateResult::Success; + return GraphUpdate_Success{}; } std::pair Workflow::AllocWorkflowConnection() { diff --git a/core/src/Model/Workflow/Workflow.hpp b/core/src/Model/Workflow/Workflow.hpp index bfed722..9c02168 100644 --- a/core/src/Model/Workflow/Workflow.hpp +++ b/core/src/Model/Workflow/Workflow.hpp @@ -1,6 +1,7 @@ #pragma once -#include "Value.hpp" +#include "Model/Workflow/Value.hpp" +#include "Utils/Vector.hpp" #include "cplt_fwd.hpp" #include @@ -9,6 +10,7 @@ #include #include #include +#include #include class WorkflowConnection { @@ -97,6 +99,7 @@ protected: size_t mId; std::vector mInputs; std::vector mOutputs; + Vec2i mPosition; Type mType; Kind mKind; int mDepth; @@ -110,9 +113,13 @@ public: WorkflowNode(WorkflowNode&&) = default; WorkflowNode& operator=(WorkflowNode&&) = default; + Vec2i GetPosition() const; + void SetPosition(const Vec2i& position); + size_t GetId() const; Type GetType() const; Kind GetKind() const; + int GetDepth() const; void ConnectInput(int nodeId, WorkflowNode& output, int outputNodeId); void DisconnectInput(int nodeId); @@ -145,6 +152,7 @@ private: std::vector mConnections; std::vector> mNodes; std::vector> mConstants; + std::vector> mDepthGroups; bool mDepthsDirty = true; public: @@ -155,19 +163,32 @@ public: void AddStep(std::unique_ptr step); void RemoveStep(size_t id); + void RemoveConnection(size_t id); + bool Connect(WorkflowNode& sourceNode, int sourcePin, WorkflowNode& destinationNode, int destinationPin); bool DisconnectBySource(WorkflowNode& sourceNode, int sourcePin); bool DisconnectByDestination(WorkflowNode& destinationNode, int destinationPin); - void RemoveConnection(size_t id); - + const std::vector>& GetDepthGroups() const; bool DoesDepthNeedsUpdate() const; - enum GraphUpdateResult { - Success, - CyclicReference, + struct GraphUpdate_Success {}; + struct GraphUpdate_NoWorkToDo {}; + struct GraphUpdate_UnsatisfiedDependencies { + std::vector UnsatisfiedNodes; + }; + struct GraphUpdate_UnreachableNodes { + std::vector UnreachableNodes; }; - GraphUpdateResult UpdateGraph(); + + using GraphUpdateResult = std::variant< + GraphUpdate_Success, + GraphUpdate_NoWorkToDo, + GraphUpdate_UnsatisfiedDependencies, + GraphUpdate_UnreachableNodes>; + + /// When `getInfo == false, the corresponding error code is returned but without/with empty payloads. + GraphUpdateResult UpdateGraph(bool getInfo = true); private: std::pair AllocWorkflowConnection(); diff --git a/core/src/Utils/Macros.hpp b/core/src/Utils/Macros.hpp index ab846f4..3ba8261 100644 --- a/core/src/Utils/Macros.hpp +++ b/core/src/Utils/Macros.hpp @@ -3,7 +3,7 @@ #define CONCAT_IMPL(a, b) a##b #define CONCAT(a, b) CONCAT_IMPL(a, b) -#define UNIQUE_NAME(prefix) CONCAT(prefix, __LINE__) +#define UNIQUE_NAME(prefix) CONCAT(prefix, __COUNTER__) #if defined(_MSC_VER) # define UNREACHABLE __assume(false) diff --git a/core/src/Utils/Vector.hpp b/core/src/Utils/Vector.hpp new file mode 100644 index 0000000..372d484 --- /dev/null +++ b/core/src/Utils/Vector.hpp @@ -0,0 +1,6 @@ +#pragma once + +struct Vec2i { + int x = 0; + int y = 0; +}; -- cgit v1.2.3-70-g09d2