Zero-Code Platform Design

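The zero-code platform supports sequence, selection, and loop constructs.
The FlowV2 Runtime rework mainly addresses the unreliability of the old flow execution model on complex graph structures.
The old logic worked more like "a module finishes and directly notifies the next module", which suits simple single-chain flows. It breaks down easily once a flow contains any of the following:
- several branches fanning out from one Start;
- several branches merging into the same module;
- a decision module that selects only one output, so the other branches must not actually execute;
- a loop in which the same module is triggered repeatedly across rounds;
- modules such as Tips, Delay, position control, homing, and sub-flows that do not finish immediately and must wait for an external callback;
- nodes such as chart modules that only consume data and do not necessarily connect onward to EndTool;
- EndTool receiving one signal does not mean the whole main flow has finished; it must wait until every active branch has ended.
Swapping out a few module calls therefore cannot meet these requirements. The problem is instead recast from "modules notifying each other" to "state transitions on a directed graph". With the business problem clarified, the design adopts graph modelling, reachability analysis, strongly connected components (SCC), a token ledger, and event-queue scheduling, and turns flow execution from the old recursive module calls into a state-machine model scheduled centrally by the Runtime.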
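The core idea is therefore:
Stop thinking of flow execution as "modules calling each other"; think of it as "one Runtime scheduling a batch of events that carry context".
In other words, whenever a signal travels along an edge into a node, the Runtime wraps it as an event and puts it on a single event queue. The Runtime then decides:
- whether the node can execute now;
- whether all of its inputs have arrived;
- which run the signal belongs to;
- which branch token it belongs to;
- which loop iteration it belongs to;
- which sub-flow frame it belongs to;
- for an unselected branch, whether only ledger propagation is needed;
- for an asynchronous node, whether the current token must be suspended;
- on reaching a terminal node, whether the whole flow can really finish.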
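The first of these decisions is whether an event belongs to the current run at all. The following is a minimal sketch of that check, not the FlowV2 implementation: `EventContext`, `Admission`, and `admitEvent` are hypothetical names introduced for illustration, and only the standard library is used so the snippet compiles without Qt.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, trimmed-down context: only the fields needed for admission.
struct EventContext {
    std::uint64_t runId = 0;
    std::uint64_t tokenId = 0;
};

enum class Admission { Accept, DropInvalid, DropStaleRun };

// Decide whether the Runtime should process an incoming event at all.
// An event without a run or token is invalid; an event from an earlier run
// (for example a late timer callback) is stale and must not touch the ledger.
inline Admission admitEvent(const EventContext& ctx, std::uint64_t currentRunId)
{
    if (ctx.runId == 0 || ctx.tokenId == 0) {
        return Admission::DropInvalid;
    }
    if (ctx.runId != currentRunId) {
        return Admission::DropStaleRun;
    }
    return Admission::Accept;
}
```

Running this check before any other bookkeeping keeps a callback from a previous click of "run" from changing the state of the new run.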
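2. Overall Design
FlowV2 first "imprints" the old flow object into a directed graph. During graph validation, plain reachability questions are answered by stack-based graph traversal.
For example, DFS/BFS determines whether a node is reachable from Start and whether it can eventually reach EndTool or a terminal sink such as chart; loop regions are identified mainly through SCC (strongly connected components).
In other words, DFS/BFS-style traversal answers "is there a path", while SCC answers "which nodes sit in the same loop region".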
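The two reachability questions can be sketched with one stack-based traversal: run it forward from Start, and run it on the reversed graph from the terminal nodes. This is an illustrative sketch using only the standard library; `Adjacency`, `reachable`, and `reversed` are hypothetical names, and the real `FlowV2Graph` works on its own `QVector<FlowV2Edge>` edge list.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical adjacency list: node id -> ids of directly connected nodes.
using Adjacency = std::map<std::string, std::vector<std::string>>;

// Iterative depth-first traversal with an explicit stack (no recursion).
// Returns every node reachable from any of the seed nodes, seeds included.
inline std::set<std::string> reachable(const Adjacency& adj,
                                       const std::vector<std::string>& seeds)
{
    std::set<std::string> visited;
    std::vector<std::string> stack(seeds.begin(), seeds.end());
    while (!stack.empty()) {
        const std::string node = stack.back();
        stack.pop_back();
        if (!visited.insert(node).second) {
            continue; // already expanded
        }
        const auto it = adj.find(node);
        if (it == adj.end()) {
            continue; // node has no outgoing entries
        }
        for (const std::string& next : it->second) {
            if (visited.count(next) == 0) {
                stack.push_back(next);
            }
        }
    }
    return visited;
}

// Build the reversed adjacency so that "can reach a terminal" becomes
// "is reachable from a terminal on the reversed graph".
inline Adjacency reversed(const Adjacency& adj)
{
    Adjacency rev;
    for (const auto& entry : adj) {
        for (const std::string& target : entry.second) {
            rev[target].push_back(entry.first);
        }
    }
    return rev;
}
```

A node that is missing from the forward result is unreachable from Start; a node that is missing from the backward result can never reach a terminal.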
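At run time, LoopBack no longer relies entirely on static pre-marking. Instead, the Runtime checks the current token's nodePath dynamically to decide whether the token has returned to a node it already visited.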
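A minimal sketch of such a dynamic check, assuming each token keeps the ordered list of nodes it has visited. `TokenPath` and `isDynamicLoopBack` are hypothetical names; how the actual Runtime stores nodePath per token is not shown in these notes.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical per-token state: the nodes this token has visited, in order.
struct TokenPath {
    std::vector<std::string> nodePath;
};

// An edge into `targetNodeId` is treated as a loop-back for this token
// when the target already appears on the token's own path.
inline bool isDynamicLoopBack(const TokenPath& token, const std::string& targetNodeId)
{
    return std::find(token.nodePath.begin(), token.nodePath.end(), targetNodeId)
        != token.nodePath.end();
}
```

The lookup is linear in the path length. For long paths a set of visited node ids could be kept alongside the ordered path.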
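The overall FlowV2 design can be split into four layers:
Old flow-graph object
↓
FlowV2Graph: converts the old modules and connections into a unified graph structure
↓
FlowRuntimeState: records the token / frame / join / pending ledgers of one run
↓
FlowV2Runtime: drives flow execution with an event queue
↓
FlowV2Executor / legacyObject->executeFlowV2: actually executes the plugin logic
Three things matter most: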
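2.1 Graph describes "what the flow looks like"
`FlowV2Graph` does not execute modules; it only stores nodes and edges.
It is concerned with:
- which nodes exist;
- which edges exist;
- the source / target / port of each edge;
- where Start is;
- where End is;
- which nodes can act as terminals;
- whether the graph structure is valid;
- whether each node is reachable from Start;
- whether each node can eventually reach some terminal;
- whether loop regions exist.
In short, the Graph is the static structure of the flow.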
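The "do loop regions exist" question is the one that SCC answers. Below is a compact sketch of Tarjan's algorithm on a plain adjacency list. It illustrates the technique and is not the body of `FlowV2Graph::stronglyConnectedComponents()`; `Adjacency`, `SccFinder`, and `loopRegions` are hypothetical names. A component with more than one node is reported as a loop region, and self-loops are left out because the graph validation rejects them separately.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

using Adjacency = std::map<std::string, std::vector<std::string>>;

class SccFinder {
public:
    explicit SccFinder(const Adjacency& adj) : adj_(adj) {}

    std::vector<std::set<std::string>> run()
    {
        for (const auto& entry : adj_) {
            if (index_.count(entry.first) == 0) {
                strongConnect(entry.first);
            }
        }
        return components_;
    }

private:
    void strongConnect(const std::string& node)
    {
        index_[node] = low_[node] = counter_++;
        stack_.push_back(node);
        onStack_.insert(node);

        const auto it = adj_.find(node);
        if (it != adj_.end()) {
            for (const std::string& next : it->second) {
                if (index_.count(next) == 0) {
                    strongConnect(next); // tree edge
                    low_[node] = std::min(low_[node], low_[next]);
                } else if (onStack_.count(next) != 0) {
                    low_[node] = std::min(low_[node], index_[next]); // back edge
                }
            }
        }

        // `node` is the root of a component: pop the component off the stack.
        if (low_[node] == index_[node]) {
            std::set<std::string> component;
            std::string member;
            do {
                member = stack_.back();
                stack_.pop_back();
                onStack_.erase(member);
                component.insert(member);
            } while (member != node);
            components_.push_back(component);
        }
    }

    const Adjacency& adj_;
    int counter_ = 0;
    std::map<std::string, int> index_;
    std::map<std::string, int> low_;
    std::vector<std::string> stack_;
    std::set<std::string> onStack_;
    std::vector<std::set<std::string>> components_;
};

// Components with more than one node are the loop regions.
inline std::vector<std::set<std::string>> loopRegions(const Adjacency& adj)
{
    SccFinder finder(adj);
    const std::vector<std::set<std::string>> all = finder.run();
    std::vector<std::set<std::string>> regions;
    for (const std::set<std::string>& component : all) {
        if (component.size() > 1) {
            regions.push_back(component);
        }
    }
    return regions;
}
```

The recursion depth equals the longest DFS path, which is acceptable for hand-drawn flow graphs; a very large graph would call for an iterative variant.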
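2.2 RuntimeState records "where execution currently is"
`FlowRuntimeState` is the ledger.
It neither finds the next node nor executes plugins; it records all the context of the current run.
The most important concepts are:
| Concept       | Purpose |
| ------------- | ---------------------- |
| runId | Identifies which click of "run" this is, so stale callbacks cannot pollute a new run |
| tokenId | Identifies one execution path moving across the flow graph |
| parentTokenId | Identifies the parent path the current token was forked from |
| tokenGroupId | Identifies the group of sibling tokens produced by one fan-out |
| frameId | Identifies the sub-flow / recursion stack frame the token belongs to |
| joinBatchId | Identifies the current join batch |
| iteration | The current loop iteration |
| pendingJoins | Records which inputs each multi-input node is still waiting for |
| tokens | Records the state of every execution path |
| frames | Records the main-flow / sub-flow call stack |
| leases | Reserved for lifecycle control of async work, sub-flows, and loops |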
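These fields let the Runtime look beyond "which node sent the signal" and determine:
- Which run does this signal belong to?
- Which branch?
- Which loop iteration?
- Which sub-flow?
- Does it belong to the same join as the other inputs?
`joinBatchId + iteration` in particular exists to prevent mismatched joins in loops.
For example:
Branch A loops 3 times
Branch B executes only once
A and B finally merge into C
Without `joinBatchId` and `iteration`, C could easily pair A's second-round input with B's first-round input by mistake.
Each loop round now creates a new `joinBatchId`, so inputs from different rounds are never mixed.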
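The effect of keying join state by batch and iteration can be sketched as follows. This is an illustration only: `JoinKey` is a reduced, hypothetical version of the join key (the real one also carries the run, token group, and frame), and `JoinLedger` is a hypothetical stand-in for the pending-join map.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical, reduced join key: only the fields needed for this illustration.
struct JoinKey {
    std::uint64_t joinBatchId = 0;
    int iteration = 0;
    std::string nodeId;

    bool operator<(const JoinKey& other) const
    {
        return std::tie(joinBatchId, iteration, nodeId)
             < std::tie(other.joinBatchId, other.iteration, other.nodeId);
    }
};

// For each join key, the set of input edges that have arrived so far.
class JoinLedger {
public:
    explicit JoinLedger(std::set<std::string> expectedEdges)
        : expected_(std::move(expectedEdges)) {}

    // Records one arrival and reports whether that batch is now complete.
    bool arrive(const JoinKey& key, const std::string& edgeId)
    {
        std::set<std::string>& arrived = pending_[key];
        arrived.insert(edgeId);
        return arrived == expected_;
    }

private:
    std::set<std::string> expected_;
    std::map<JoinKey, std::set<std::string>> pending_;
};
```

With node C expecting the edges `A->C` and `B->C`, an arrival of `A->C` under batch 1 followed by `A->C` under batch 2 leaves both batches incomplete. A later `B->C` under batch 1 completes batch 1 only, so A's second-round input is never paired with B's first-round input.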
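2.3 Runtime schedules "how events flow"
`FlowV2Runtime` is the actual scheduler.
Modules are not allowed to call the next module on their own. Everything advances through one pipeline:
enqueueEvent()
runEventLoop()
dispatchEvent()
prepareNodeExecution()
executeNode()
completeNode()
The benefits are:
1. Branches can coexist, each with its own token;
2. Multi-input nodes never execute early;
3. Branches not selected by a decision module never actually run their plugins;
4. An asynchronous node can suspend its own token without blocking the others;
5. A loop can create a new token and a new joinBatch;
6. Terminals such as EndTool / chart are closed out in one place;
7. When the main flow ends, the Runtime checks that every branch has finished, instead of ending as soon as the first branch reaches EndTool.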
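The shape of this pipeline can be sketched in a few dozen lines. The sketch below is deliberately reduced and is not the `FlowV2Runtime` code: `Event` and `MiniRuntime` are hypothetical, there are no tokens, async nodes, or loops, and the only rule it demonstrates is that a node never calls its successor directly and that a multi-input node waits for all of its inputs.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <vector>

// Hypothetical event: a signal travelling along one edge into `target`.
struct Event {
    std::string source;
    std::string target;
};

class MiniRuntime {
public:
    void addEdge(const std::string& from, const std::string& to)
    {
        outgoing_[from].push_back(to);
        ++expectedInputs_[to];
    }

    // Runs from `start` until the queue drains; returns nodes in execution order.
    std::vector<std::string> run(const std::string& start)
    {
        executed_.clear();
        arrived_.clear();
        queue_.clear();
        executeNode(start);
        while (!queue_.empty()) { // event loop
            const Event event = queue_.front();
            queue_.pop_front();
            dispatchEvent(event);
        }
        return executed_;
    }

private:
    void dispatchEvent(const Event& event)
    {
        // Join rule: a node runs only after every incoming edge has delivered.
        if (++arrived_[event.target] < expectedInputs_[event.target]) {
            return;
        }
        executeNode(event.target);
    }

    void executeNode(const std::string& node)
    {
        executed_.push_back(node);
        // Completion step: downstream nodes are never called directly;
        // each outgoing edge becomes a queued event instead.
        const auto it = outgoing_.find(node);
        if (it == outgoing_.end()) {
            return;
        }
        for (const std::string& next : it->second) {
            queue_.push_back(Event{node, next});
        }
    }

    std::map<std::string, std::vector<std::string>> outgoing_;
    std::map<std::string, int> expectedInputs_;
    std::map<std::string, int> arrived_;
    std::deque<Event> queue_;
    std::vector<std::string> executed_;
};
```

For the diamond Start -> A, Start -> B, A -> C, B -> C, C -> End, node C runs once, after both A and B, because the first event that reaches C is only recorded.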
------------------------------------------------------------------------------------------------------------------
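`FlowRuntimeState.h` is the file that defines FlowV2's run-time ledger.
It neither executes the flow nor finds the next node; it defines every state structure the Runtime needs to keep while running.
Think of this file as the "state table schema" of FlowV2.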
#pragma once
#include <QString>
#include <QVariant>
#include <QMap>
#include <QDateTime>
#include <QtGlobal>
#include <tuple>
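/**
 * @brief Flow signal status
 *
 * Note:
 * These values describe the meaning of the current signal only.
 * They must not be used on their own to decide whether the main flow has finished.
 */
enum class FlowSignalStatus
{
    Unknown = 0,
    // Finished normally; downstream modules may be triggered.
    Complete = 1,
    // The current path failed.
    Failed = 2,
    // The current path does not execute this round; only the ledger is updated.
    Ignore = 3,
    // The node has started but has not finished.
    // The current token is suspended until an external event resumes it.
    Pending = 4
};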
/**
 * @brief Token state
 *
 * A token can be understood as "one execution path moving across the flow graph".
 */
enum class FlowTokenState
{
    Active = 0,
    // The token is suspended until an external event resumes it.
    Waiting = 1,
    Finished = 2,
    Cancelled = 3
};
/**
 * @brief Frame state
 *
 * A frame can be understood as a "sub-flow / recursive call stack frame".
 */
enum class FlowFrameState
{
    Active = 0,
    Returned = 1,
    Cancelled = 2
};
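/**
 * @brief Lease type
 *
 * A lease can be understood as a "flow lifecycle lease".
 * While any lease is unreleased, the main flow must not return to its initial state.
 */
enum class FlowLeaseType
{
    Unknown = 0,
    // QTimer / hardware callback / motion-complete wait
    Async = 1,
    // Sub-flow / recursion wait
    Frame = 2,
    // The loop body has not finished its current round
    Loop = 3
};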
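/**
 * @brief Flow context
 *
 * Every flow signal should carry this context.
 *
 * runId:
 * ID of the current click of "run".
 *
 * tokenId:
 * ID of the current execution path.
 *
 * parentTokenId:
 * ID of the token this one was forked from; 0 for the root token.
 *
 * tokenGroupId:
 * The group of sibling tokens produced by one fan-out.
 * Multi-input joins use it to tell whether inputs come from the same batch of branches.
 *
 * frameId:
 * ID of the current sub-flow / recursion stack frame.
 *
 * parentFrameId:
 * The parent frame.
 *
 * joinBatchId:
 * ID of the current join batch.
 * Prevents mismatches when branch A loops several rounds and branch B does not loop.
 *
 * iteration:
 * The current loop iteration.
 *
 * depth:
 * The current recursion depth.
 *
 * eventId:
 * ID of the event currently being dispatched.
 */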
struct FlowContext
{
quint64 runId = 0;
quint64 tokenId = 0;
quint64 parentTokenId = 0; // add by Momo. FlowV2 branch token parent.
quint64 tokenGroupId = 0;
quint64 frameId = 0;
quint64 parentFrameId = 0;
quint64 joinBatchId = 0;
int iteration = 0;
int depth = 0;
quint64 eventId = 0;
bool isValid() const
{
return runId != 0 && tokenId != 0;
}
};
/**
 * @brief Flow edge key
 *
 * Identifies one concrete connection:
 * fromNodeId:fromPort -> toNodeId:toPort
 */
struct FlowEdgeKey
{
QString fromNodeId;
int fromPort = 0;
QString toNodeId;
int toPort = 0;
QString edgeId;
bool operator<(const FlowEdgeKey& other) const
{
return std::tie(fromNodeId, fromPort, toNodeId, toPort, edgeId)
< std::tie(other.fromNodeId, other.fromPort, other.toNodeId, other.toPort, other.edgeId);
}
};
/**
 * @brief One flow signal as actually delivered
 */
struct FlowSignal
{
FlowContext context;
FlowEdgeKey edge;
FlowSignalStatus status = FlowSignalStatus::Unknown;
QVariant data;
};
/**
 * @brief Multi-input join key
 *
 * Distinguishes the input state of the same module under different
 * run / tokenGroup / frame / joinBatch / iteration combinations.
 */
struct FlowJoinKey
{
quint64 runId = 0;
quint64 tokenGroupId = 0;
quint64 frameId = 0;
quint64 joinBatchId = 0;
QString nodeId;
int iteration = 0;
bool operator<(const FlowJoinKey& other) const
{
return std::tie(runId, tokenGroupId, frameId, joinBatchId, nodeId, iteration)
< std::tie(other.runId, other.tokenGroupId, other.frameId, other.joinBatchId, other.nodeId, other.iteration);
}
};
/**
 * @brief One input record of a multi-input node
 */
struct FlowInputRecord
{
FlowSignalStatus status = FlowSignalStatus::Unknown;
FlowContext context;
QVariant data;
QDateTime arrivedTime;
};
/**
 * @brief Join state of a multi-input node under the current context
 *
 * expectedInputs:
 * The inputs this join is expected to wait for.
 *
 * arrivedInputs:
 * The inputs that have arrived so far.
 */
struct FlowJoinState
{
QMap<FlowEdgeKey, bool> expectedInputs;
QMap<FlowEdgeKey, FlowInputRecord> arrivedInputs;
bool isReady() const
{
if (expectedInputs.isEmpty()) {
return true;
}
for (auto it = expectedInputs.begin(); it != expectedInputs.end(); ++it) {
if (!arrivedInputs.contains(it.key())) {
return false;
}
}
return true;
}
FlowSignalStatus reduceStatus() const
{
bool hasComplete = false;
bool hasFailed = false;
for (auto it = arrivedInputs.begin(); it != arrivedInputs.end(); ++it) {
if (it.value().status == FlowSignalStatus::Failed) {
hasFailed = true;
}
if (it.value().status == FlowSignalStatus::Complete) {
hasComplete = true;
}
}
if (hasFailed) {
return FlowSignalStatus::Failed;
}
if (hasComplete) {
return FlowSignalStatus::Complete;
}
return FlowSignalStatus::Ignore;
}
};
/**
 * @brief Token record
 */
struct FlowTokenRecord
{
quint64 tokenId = 0;
quint64 parentTokenId = 0; // add by Momo. 0 means root token.
quint64 tokenGroupId = 0;
quint64 frameId = 0;
FlowTokenState state = FlowTokenState::Active;
QString currentNodeId;
QDateTime createTime;
QDateTime finishTime;
};
/**
 * @brief Sub-flow / recursion frame record
 */
struct FlowFrameRecord
{
quint64 frameId = 0;
quint64 parentFrameId = 0;
quint64 callerTokenId = 0;
int depth = 0;
FlowFrameState state = FlowFrameState::Active;
QMap<QString, QVariant> inputs;
QMap<QString, QVariant> locals;
QVariant returnValue;
QDateTime createTime;
QDateTime finishTime;
};
/**
 * @brief Lease record
 *
 * Indicates that an asynchronous task / sub-flow / loop instance
 * has not finished yet.
 */
struct FlowLeaseRecord
{
quint64 leaseId = 0;
FlowLeaseType type = FlowLeaseType::Unknown;
FlowContext context;
QString ownerNodeId;
QString reason;
bool released = false;
QDateTime createTime;
QDateTime releaseTime;
};
/**
 * @brief The complete ledger of a single main-flow run
 */
struct FlowRunState
{
quint64 runId = 0;
bool running = false;
bool failed = false;
QString failureReason;
FlowContext rootContext;
/**
 * @brief Whether an EndTool has already requested the run to finish.
 *
 * Note:
 * finishRequested == true does not mean the run returns to its initial state immediately.
 * pendingEvents / runningReceives / leases / tokens / frames / pendingJoins must all be drained first.
 */
bool finishRequested = false;
/**
 * @brief Number of events that have been posted but whose target node has not started handling them.
 */
int pendingEvents = 0;
/**
 * @brief Number of nodes currently executing receive().
 */
int runningReceives = 0;
/**
 * @brief All tokens of the current run.
 */
QMap<quint64, FlowTokenRecord> tokens;
/**
 * @brief All frames of the current run.
 */
QMap<quint64, FlowFrameRecord> frames;
/**
 * @brief All leases of the current run.
 */
QMap<quint64, FlowLeaseRecord> leases;
/**
 * @brief All multi-input join states of the current run that are still waiting.
 */
QMap<FlowJoinKey, FlowJoinState> pendingJoins;
QDateTime startTime;
QDateTime finishTime;
int activeTokenCount() const
{
int count = 0;
for (auto it = tokens.begin(); it != tokens.end(); ++it) {
if (it.value().state == FlowTokenState::Active) {
++count;
}
}
return count;
}
int activeBranchTokenCount() const
{
int count = 0;
for (auto it = tokens.begin(); it != tokens.end(); ++it) {
const FlowTokenRecord& token = it.value();
// parentTokenId != 0 means this is a branch token created by the Start fan-out.
// The root token only represents the entry of this run and does not take part in the End barrier.
if (token.parentTokenId != 0 &&
token.state == FlowTokenState::Active) {
++count;
}
}
return count;
}
bool canFinishStage1() const
{
return finishRequested
&& !failed
&& activeBranchTokenCount() == 0
&& activeFrameCount() <= 1;
}
int activeFrameCount() const
{
int count = 0;
for (auto it = frames.begin(); it != frames.end(); ++it) {
if (it.value().state == FlowFrameState::Active) {
++count;
}
}
return count;
}
int activeLeaseCount() const
{
int count = 0;
for (auto it = leases.begin(); it != leases.end(); ++it) {
if (!it.value().released) {
++count;
}
}
return count;
}
bool isIdle() const
{
return pendingEvents == 0
&& runningReceives == 0
&& activeTokenCount() == 0
&& activeFrameCount() == 0
&& activeLeaseCount() == 0
&& pendingJoins.isEmpty();
}
bool canInitial() const
{
return finishRequested && isIdle();
}
void clear()
{
runId = 0;
running = false;
finishRequested = false;
pendingEvents = 0;
runningReceives = 0;
tokens.clear();
frames.clear();
leases.clear();
pendingJoins.clear();
startTime = QDateTime();
finishTime = QDateTime();
failed = false;
failureReason.clear();
rootContext = FlowContext();
}
};
/**
 * @brief Global FlowRuntime state
 *
 * Suggested as a member variable of modelObjectmainprocess:
 *
 * class modelObjectmainprocess {
 * private:
 *     FlowRuntimeState _flowRuntime;
 * };
 */
struct FlowRuntimeState
{
FlowRunState currentRun;
quint64 nextRunId = 0;
quint64 nextTokenId = 0;
quint64 nextTokenGroupId = 0;
quint64 nextFrameId = 0;
quint64 nextJoinBatchId = 0;
quint64 nextEventId = 0;
quint64 nextLeaseId = 0;
int maxDepth = 32;
int maxLoopIteration = 1000;
int maxTotalEvents = 100000;
void resetAll()
{
currentRun.clear();
nextRunId = 0;
nextTokenId = 0;
nextTokenGroupId = 0;
nextFrameId = 0;
nextJoinBatchId = 0;
nextEventId = 0;
nextLeaseId = 0;
}
FlowContext createRootContext()
{
currentRun.clear();
currentRun.runId = ++nextRunId;
currentRun.running = true;
currentRun.finishRequested = false;
currentRun.startTime = QDateTime::currentDateTime();
const quint64 rootTokenId = ++nextTokenId;
const quint64 rootGroupId = ++nextTokenGroupId;
const quint64 rootFrameId = ++nextFrameId;
const quint64 rootJoinBatchId = ++nextJoinBatchId;
FlowTokenRecord token;
token.tokenId = rootTokenId;
token.tokenGroupId = rootGroupId;
token.frameId = rootFrameId;
token.state = FlowTokenState::Active;
token.createTime = QDateTime::currentDateTime();
token.parentTokenId = 0;
currentRun.tokens.insert(rootTokenId, token);
FlowFrameRecord frame;
frame.frameId = rootFrameId;
frame.parentFrameId = 0;
frame.depth = 0;
frame.state = FlowFrameState::Active;
frame.createTime = QDateTime::currentDateTime();
currentRun.frames.insert(rootFrameId, frame);
FlowContext ctx;
ctx.runId = currentRun.runId;
ctx.tokenId = rootTokenId;
ctx.tokenGroupId = rootGroupId;
ctx.frameId = rootFrameId;
ctx.parentFrameId = 0;
ctx.joinBatchId = rootJoinBatchId;
ctx.iteration = 0;
ctx.depth = 0;
ctx.eventId = 0;
ctx.parentTokenId = 0;
currentRun.rootContext = ctx;
return ctx;
}
// Handles the parent/child hierarchy of sub-flows.
FlowContext createChildFrameContext(const FlowContext& parentContext, const QString& callerNodeId)
{
if (!parentContext.isValid() || parentContext.runId != currentRun.runId) {
return FlowContext();
}
if (parentContext.depth + 1 > maxDepth) {
return FlowContext();
}
const quint64 childFrameId = ++nextFrameId;
const quint64 childRootTokenId = ++nextTokenId;
const quint64 childTokenGroupId = ++nextTokenGroupId;
const quint64 childJoinBatchId = ++nextJoinBatchId;
FlowFrameRecord frame;
frame.frameId = childFrameId;
frame.parentFrameId = parentContext.frameId;
frame.callerTokenId = parentContext.tokenId;
frame.depth = parentContext.depth + 1;
frame.state = FlowFrameState::Active;
frame.createTime = QDateTime::currentDateTime();
currentRun.frames.insert(childFrameId, frame);
FlowTokenRecord token;
token.tokenId = childRootTokenId;
token.parentTokenId = parentContext.tokenId;
token.tokenGroupId = childTokenGroupId;
token.frameId = childFrameId;
token.state = FlowTokenState::Active;
token.currentNodeId = callerNodeId;
token.createTime = QDateTime::currentDateTime();
currentRun.tokens.insert(childRootTokenId, token);
FlowContext ctx;
ctx.runId = parentContext.runId;
ctx.tokenId = childRootTokenId;
ctx.parentTokenId = parentContext.tokenId;
ctx.tokenGroupId = childTokenGroupId;
ctx.frameId = childFrameId;
ctx.parentFrameId = parentContext.frameId;
ctx.joinBatchId = childJoinBatchId;
ctx.iteration = 0;
ctx.depth = parentContext.depth + 1;
ctx.eventId = ++nextEventId;
return ctx;
}
void finishFrame(const FlowContext& context)
{
auto it = currentRun.frames.find(context.frameId);
if (it == currentRun.frames.end()) {
return;
}
if (it.value().state == FlowFrameState::Active) {
it.value().state = FlowFrameState::Returned;
it.value().finishTime = QDateTime::currentDateTime();
}
}
int activeChildTokenCount(quint64 parentTokenId) const
{
int count = 0;
for (auto it = currentRun.tokens.begin(); it != currentRun.tokens.end(); ++it) {
const FlowTokenRecord& token = it.value();
if (token.state != FlowTokenState::Active &&
token.state != FlowTokenState::Waiting) {
continue;
}
quint64 currentParentId = token.parentTokenId;
while (currentParentId != 0) {
if (currentParentId == parentTokenId) {
++count;
break;
}
auto parentIt = currentRun.tokens.find(currentParentId);
if (parentIt == currentRun.tokens.end()) {
break;
}
currentParentId = parentIt.value().parentTokenId;
}
}
return count;
}
FlowContext createBranchContext(const FlowContext& parentContext, int branchIndex)
{
Q_UNUSED(branchIndex);
if (!parentContext.isValid() || parentContext.runId != currentRun.runId) {
return FlowContext();
}
FlowContext ctx = parentContext;
ctx.parentTokenId = parentContext.tokenId;
ctx.tokenId = ++nextTokenId;
ctx.eventId = ++nextEventId;
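// Note:
// Do not reset iteration.
// A plain fan-out happens inside the current round and must inherit parentContext.iteration.
// Do not create a new joinBatchId.
// A fan-out that later joins within the same round must stay in the same joinBatchId.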
FlowTokenRecord token;
token.tokenId = ctx.tokenId;
token.parentTokenId = ctx.parentTokenId;
token.tokenGroupId = ctx.tokenGroupId;
token.frameId = ctx.frameId;
token.state = FlowTokenState::Active;
token.createTime = QDateTime::currentDateTime();
currentRun.tokens.insert(ctx.tokenId, token);
return ctx;
}
FlowContext createLoopContext(
const FlowContext& parentContext,
int loopIndex,
bool ignoreMaxLoopIteration = false)
{
Q_UNUSED(loopIndex);
if (!parentContext.isValid() || parentContext.runId != currentRun.runId) {
return FlowContext();
}
if (!ignoreMaxLoopIteration &&
parentContext.iteration + 1 > maxLoopIteration) {
return FlowContext();
}
FlowContext ctx = parentContext;
ctx.parentTokenId = parentContext.tokenId;
ctx.tokenId = ++nextTokenId;
ctx.eventId = ++nextEventId;
ctx.iteration = parentContext.iteration + 1;
ctx.joinBatchId = ++nextJoinBatchId;
FlowTokenRecord token;
token.tokenId = ctx.tokenId;
token.parentTokenId = ctx.parentTokenId;
token.tokenGroupId = ctx.tokenGroupId;
token.frameId = ctx.frameId;
token.state = FlowTokenState::Active;
token.createTime = QDateTime::currentDateTime();
currentRun.tokens.insert(ctx.tokenId, token);
return ctx;
}
void finishToken(const FlowContext& context)
{
auto it = currentRun.tokens.find(context.tokenId);
if (it == currentRun.tokens.end()) {
return;
}
if (it.value().state == FlowTokenState::Active) {
it.value().state = FlowTokenState::Finished;
it.value().finishTime = QDateTime::currentDateTime();
}
}
void cancelToken(const FlowContext& context)
{
auto it = currentRun.tokens.find(context.tokenId);
if (it == currentRun.tokens.end()) {
return;
}
if (it.value().state == FlowTokenState::Active) {
it.value().state = FlowTokenState::Cancelled;
it.value().finishTime = QDateTime::currentDateTime();
}
}
void requestFinish()
{
currentRun.finishRequested = true;
currentRun.running = false;
currentRun.finishTime = QDateTime::currentDateTime();
}
void markFailed(const QString& reason)
{
currentRun.failed = true;
currentRun.failureReason = reason;
currentRun.finishRequested = true;
currentRun.running = false;
currentRun.finishTime = QDateTime::currentDateTime();
for (auto it = currentRun.tokens.begin(); it != currentRun.tokens.end(); ++it) {
if (it.value().state == FlowTokenState::Active ||
it.value().state == FlowTokenState::Waiting) {
it.value().state = FlowTokenState::Cancelled;
it.value().finishTime = currentRun.finishTime;
}
}
}
bool hasFailed() const
{
return currentRun.failed;
}
int activeBranchTokenCount() const
{
return currentRun.activeBranchTokenCount();
}
bool canFinishStage1() const
{
return currentRun.canFinishStage1();
}
};
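The distinction between "an end node requested finish" and "the run may actually be reset" can be sketched in isolation. The sketch below is not the `FlowRunState` code: `RunLedger`, `liveTokenCount`, and `canReset` are hypothetical names, frames, leases, and pending joins are omitted, and it treats Waiting tokens as still in flight, which is an assumption of this sketch rather than the behaviour of `activeTokenCount()` in the listing above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

enum class TokenState { Active, Waiting, Finished, Cancelled };

// Hypothetical, reduced run ledger for illustrating the end barrier.
struct RunLedger {
    bool finishRequested = false;
    int pendingEvents = 0;
    std::map<std::uint64_t, TokenState> tokens;

    // Assumption of this sketch: both Active and Waiting tokens are in flight.
    int liveTokenCount() const
    {
        int count = 0;
        for (const auto& entry : tokens) {
            if (entry.second == TokenState::Active ||
                entry.second == TokenState::Waiting) {
                ++count;
            }
        }
        return count;
    }

    // The run may return to its initial state only when an end node has
    // requested it and nothing is still in flight.
    bool canReset() const
    {
        return finishRequested && pendingEvents == 0 && liveTokenCount() == 0;
    }
};
```

With two branch tokens, the first branch reaching the end node sets `finishRequested`, but `canReset()` stays false until the second token is also finished or cancelled.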
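FlowV2Graph stores and validates the flow-graph structure.
It does not run the flow; it only describes:
- Which nodes exist?
- Which edges exist?
- Where is Start?
- Where is End?
- Which nodes can reach a terminal?
- Which regions are loops?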
#pragma once
#include <QString>
#include <QVector>
#include <QMap>
#include <QVariantMap>
#include <QSet>
#include "FlowRuntimeState.h"
#include "qtpubliclib_global.h"
class modelObject;
class modelObjectprocess;
enum class FlowV2EdgeKind
{
    Normal = 0,
    // Used by the later loop stage.
    // Stage1 does not allow LoopBack.
    LoopBack = 1
};
struct FlowV2Node
{
QString nodeId;
QString type;
QString displayName;
QVariantMap config;
bool isStart = false;
bool isEnd = false;
// Kept only as the source of the old graph structure and as the entry point for later plugin adaptation.
// FlowV2Runtime Stage1 does not call legacyObject->receive/send.
modelObject* legacyObject = nullptr;
};
struct FlowV2Edge
{
QString edgeId;
QString sourceNodeId;
int sourcePort = 0;
QString targetNodeId;
int targetPort = 0;
FlowV2EdgeKind kind = FlowV2EdgeKind::Normal;
};
enum class FlowV2BranchState
{
Pending = 0,
Running = 1,
Finished = 2,
Failed = 3
};
struct FlowV2Branch
{
quint64 branchId = 0;
FlowContext context;
QVector<QString> nodePath;
int currentIndex = 0;
FlowV2BranchState state = FlowV2BranchState::Pending;
};
struct FlowV2ValidateResult
{
bool ok = false;
QString error;
};
class QTPUBLICLIB_EXPORT FlowV2Graph
{
public:
QMap<QString, FlowV2Node> nodes;
QVector<FlowV2Edge> edges;
QString findStartNodeId() const;
QString findEndNodeId() const;
QVector<FlowV2Edge> incomingEdges(const QString& nodeId) const;
QVector<FlowV2Edge> outgoingEdges(const QString& nodeId) const;
FlowV2ValidateResult validateStage1Dag() const;
QVector<FlowV2Branch> buildBranchesFromStart(QString* errorMessage = nullptr) const;
void markLoopBackEdges(); // added for loop support
bool isTerminalSinkNode(const QString& nodeId) const; // added for multiple terminals
bool hasContinuousManualLoopRegion() const; // added for dynamic back edges
private:
bool hasCycle(QString* cycleNodeId = nullptr) const;
bool hasCycleDfs(
const QString& nodeId,
QSet<QString>& visiting,
QSet<QString>& visited,
QString* cycleNodeId) const;
QSet<QString> reachableFromStart(const QString& startId) const;
QSet<QString> nodesCanReachEnd(const QString& endId) const;
QSet<QString> nodesCanReachTerminal() const; // added for multiple terminals
// Added for loop support
void markLoopBackEdgesDfs(
const QString& nodeId,
QSet<QString>& visiting,
QSet<QString>& visited);
QVector<QSet<QString>> stronglyConnectedComponents() const;
void strongConnect(
const QString& nodeId,
int& index,
QMap<QString, int>& indexMap,
QMap<QString, int>& lowMap,
QVector<QString>& stack,
QSet<QString>& onStack,
QVector<QSet<QString>>& components) const;
bool canReachNode(
const QString& fromNodeId,
const QString& targetNodeId) const;
// Added for dynamic back edges
bool isThrottleNode(const QString& nodeId) const;
bool canReachEndTool(const QString& fromNodeId) const;
FlowV2ValidateResult validateLoopRegions() const;
bool componentHasCycle(const QSet<QString>& component) const;
bool componentHasThrottleNode(const QSet<QString>& component) const;
};
class QTPUBLICLIB_EXPORT FlowV2GraphBuilder
{
public:
static FlowV2Graph fromLegacyProcess(modelObjectprocess* process);
private:
static int targetPortFor(modelObject* source, modelObject* target);
};
#include "FlowV2Graph.h"
#include "dataDefine.h"
#include <QSet>
#include <QStringList>
#include <QDebug>
namespace {
bool isStartType(const QString& type)
{
return type.contains(QStringLiteral("starttool"), Qt::CaseInsensitive);
}
bool isEndType(const QString& type)
{
return type.contains(QStringLiteral("endtool"), Qt::CaseInsensitive);
}
QString nodeDisplayName(modelObject* obj)
{
if (!obj) {
return QString();
}
const QString winText = obj->getWinTxt();
if (!winText.isEmpty()) {
return winText;
}
return obj->getIdname();
}
QString makeEdgeId(const QString& sourceId, int sourcePort, const QString& targetId, int targetPort)
{
return QStringLiteral("%1:%2->%3:%4")
.arg(sourceId)
.arg(sourcePort)
.arg(targetId)
.arg(targetPort);
}
} // namespace
QString FlowV2Graph::findStartNodeId() const
{
QString result;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
if (it.value().isStart) {
if (!result.isEmpty()) {
return QString();
}
result = it.key();
}
}
return result;
}
QString FlowV2Graph::findEndNodeId() const
{
QString result;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
if (it.value().isEnd) {
if (!result.isEmpty()) {
return QString();
}
result = it.key();
}
}
return result;
}
QVector<FlowV2Edge> FlowV2Graph::incomingEdges(const QString& nodeId) const
{
QVector<FlowV2Edge> result;
for (const FlowV2Edge& edge : edges) {
if (edge.targetNodeId == nodeId) {
result.push_back(edge);
}
}
return result;
}
QVector<FlowV2Edge> FlowV2Graph::outgoingEdges(const QString& nodeId) const
{
QVector<FlowV2Edge> result;
for (const FlowV2Edge& edge : edges) {
if (edge.sourceNodeId == nodeId) {
result.push_back(edge);
}
}
return result;
}
FlowV2ValidateResult FlowV2Graph::validateStage1Dag() const
{
FlowV2ValidateResult result;
const QString startId = findStartNodeId();
if (startId.isEmpty()) {
result.error = QStringLiteral("FlowV2 Stage1 requires exactly one starttool.");
return result;
}
/* const QString endId = findEndNodeId();
if (endId.isEmpty()) {
result.error = QStringLiteral("FlowV2 Stage1 requires exactly one endtool.");
return result;
}*/
// Added for multiple terminals
const QString endId = findEndNodeId();
bool hasTerminalSink = false;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
if (isTerminalSinkNode(nodeId)) {
hasTerminalSink = true;
break;
}
}
if (!hasTerminalSink) {
result.error = QStringLiteral("FlowV2 requires at least one terminal node, such as endtool or chart.");
return result;
}
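// 1. Both endpoints of every edge must exist.
// Normal edges take part in DAG validation; LoopBack edges are allowed but excluded from the ordinary acyclicity check.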
for (const FlowV2Edge& edge : edges) {
if (!nodes.contains(edge.sourceNodeId)) {
result.error = QStringLiteral("edge source node not found: %1").arg(edge.sourceNodeId);
return result;
}
if (!nodes.contains(edge.targetNodeId)) {
result.error = QStringLiteral("edge target node not found: %1").arg(edge.targetNodeId);
return result;
}
if (edge.sourceNodeId == edge.targetNodeId) {
result.error = QStringLiteral("self-loop is not allowed in FlowV2: %1")
.arg(edge.sourceNodeId);
return result;
}
}
// 2. Basic Start / End constraints.
if (!incomingEdges(startId).isEmpty()) {
result.error = QStringLiteral("starttool must not have input edges.");
return result;
}
if (outgoingEdges(startId).isEmpty()) {
result.error = QStringLiteral("starttool must have at least one output edge.");
return result;
}
/*if (!outgoingEdges(endId).isEmpty()) {
result.error = QStringLiteral("endtool must not have output edges.");
return result;
}
if (incomingEdges(endId).isEmpty()) {
result.error = QStringLiteral("endtool must have at least one input edge.");
return result;
}*/
// EndTool is no longer the only terminal.
// It is validated only when it exists;
// a flow that ends at a terminal sink such as chart may have no endtool.
if (!endId.isEmpty()) {
if (!outgoingEdges(endId).isEmpty()) {
result.error = QStringLiteral("endtool must not have output edges.");
return result;
}
// Inputs are required only when endtool takes part in the flow.
// If the flow uses chart as its terminal, an isolated or missing endtool must not block the run.
if (incomingEdges(endId).isEmpty()) {
bool hasOtherTerminalSink = false;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
if (nodeId == endId) {
continue;
}
if (isTerminalSinkNode(nodeId)) {
hasOtherTerminalSink = true;
break;
}
}
if (!hasOtherTerminalSink) {
result.error = QStringLiteral("endtool must have at least one input edge.");
return result;
}
}
}
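// 3. Ordinary nodes may have N inputs and M outputs, but need at least one input, and at least one output unless they are terminal sinks.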
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
const FlowV2Node& node = it.value();
if (node.isStart || node.isEnd) {
continue;
}
const int inCount = incomingEdges(nodeId).size();
const int outCount = outgoingEdges(nodeId).size();
if (inCount < 1) {
result.error = QStringLiteral("middle node must have at least one input: %1")
.arg(nodeId);
return result;
}
if (outCount < 1 && !isTerminalSinkNode(nodeId)) {
result.error = QStringLiteral("middle node must have at least one output: %1")
.arg(nodeId);
return result;
}
}
// 4. Loop regions must pass validation (this call stands where the earlier "must be acyclic" check was).
const FlowV2ValidateResult loopRegionResult = validateLoopRegions();
if (!loopRegionResult.ok) {
return loopRegionResult;
}
// 5. Every node must be reachable from Start.
/*const QSet<QString> reachable = reachableFromStart(startId);
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
if (!reachable.contains(it.key())) {
result.error = QStringLiteral("node is not reachable from starttool: %1")
.arg(it.key());
return result;
}
}*/
const QSet<QString> reachable = reachableFromStart(startId);
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
const FlowV2Node& node = it.value();
// When a terminal sink such as chart exists, an isolated endtool can be ignored.
if (node.isEnd &&
incomingEdges(nodeId).isEmpty() &&
outgoingEdges(nodeId).isEmpty()) {
continue;
}
if (!reachable.contains(nodeId)) {
result.error = QStringLiteral("node is not reachable from starttool: %1")
.arg(nodeId);
return result;
}
}
// 6. Every node must be able to reach a terminal node (endtool or a terminal sink such as chart).
/* const QSet<QString> canReachEnd = nodesCanReachEnd(endId);
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
if (!canReachEnd.contains(it.key())) {
result.error = QStringLiteral("node cannot reach endtool: %1")
.arg(it.key());
return result;
}
}*/
const QSet<QString> canReachTerminal = nodesCanReachTerminal();
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
const FlowV2Node& node = it.value();
if (node.isEnd &&
incomingEdges(nodeId).isEmpty() &&
outgoingEdges(nodeId).isEmpty()) {
continue;
}
if (!canReachTerminal.contains(nodeId)) {
result.error = QStringLiteral("node cannot reach terminal node: %1")
.arg(nodeId);
return result;
}
}
result.ok = true;
return result;
}
QVector<FlowV2Branch> FlowV2Graph::buildBranchesFromStart(QString* errorMessage) const
{
QVector<FlowV2Branch> result;
if (errorMessage) {
errorMessage->clear();
}
const QString startId = findStartNodeId();
const QString endId = findEndNodeId();
if (startId.isEmpty() || endId.isEmpty()) {
if (errorMessage) {
*errorMessage = QStringLiteral("missing starttool or endtool.");
}
return result;
}
const QVector<FlowV2Edge> startOutputs = outgoingEdges(startId);
for (const FlowV2Edge& firstEdge : startOutputs) {
FlowV2Branch branch;
branch.nodePath.push_back(startId);
QSet<QString> visited;
visited.insert(startId);
FlowV2Edge currentEdge = firstEdge;
while (true) {
const QString currentNodeId = currentEdge.targetNodeId;
if (!nodes.contains(currentNodeId)) {
if (errorMessage) {
*errorMessage = QStringLiteral("edge target node not found: %1").arg(currentNodeId);
}
return QVector<FlowV2Branch>();
}
if (visited.contains(currentNodeId)) {
if (errorMessage) {
*errorMessage = QStringLiteral("cycle detected in FlowV2 Stage1 branch at node: %1").arg(currentNodeId);
}
return QVector<FlowV2Branch>();
}
visited.insert(currentNodeId);
branch.nodePath.push_back(currentNodeId);
if (currentNodeId == endId) {
break;
}
const QVector<FlowV2Edge> outputs = outgoingEdges(currentNodeId);
if (outputs.size() != 1) {
if (errorMessage) {
*errorMessage = QStringLiteral("middle node must have exactly one output in Stage1: %1").arg(currentNodeId);
}
return QVector<FlowV2Branch>();
}
currentEdge = outputs.first();
}
result.push_back(branch);
}
return result;
}
FlowV2Graph FlowV2GraphBuilder::fromLegacyProcess(modelObjectprocess* process)
{
FlowV2Graph graph;
if (!process) {
return graph;
}
const QMap<QString, modelObject*> children = process->getChild();
for (auto it = children.begin(); it != children.end(); ++it) {
modelObject* obj = it.value();
if (!obj) {
continue;
}
FlowV2Node node;
node.nodeId = obj->getIdname();
node.type = obj->getType();
node.displayName = nodeDisplayName(obj);
node.isStart = isStartType(node.type);
node.isEnd = isEndType(node.type);
node.legacyObject = obj;
graph.nodes.insert(node.nodeId, node);
}
QSet<QString> edgeIds;
for (auto it = children.begin(); it != children.end(); ++it) {
modelObject* source = it.value();
if (!source) {
continue;
}
const QString sourceId = source->getIdname();
for (auto outIt = source->nextModules.begin(); outIt != source->nextModules.end(); ++outIt) {
const int sourcePort = outIt.key();
for (modelObject* target : outIt.value()) {
if (!target) {
continue;
}
const QString targetId = target->getIdname();
const int targetPort = targetPortFor(source, target);
const QString edgeId = makeEdgeId(sourceId, sourcePort, targetId, targetPort);
if (edgeIds.contains(edgeId)) {
continue;
}
FlowV2Edge edge;
edge.edgeId = edgeId;
edge.sourceNodeId = sourceId;
edge.sourcePort = sourcePort;
edge.targetNodeId = targetId;
edge.targetPort = targetPort;
graph.edges.push_back(edge);
edgeIds.insert(edgeId);
}
}
}
/*return graph;*/
graph.markLoopBackEdges();
return graph;
}
int FlowV2GraphBuilder::targetPortFor(modelObject* source, modelObject* target)
{
if (!source || !target) {
return 0;
}
for (auto it = target->inModule.begin(); it != target->inModule.end(); ++it) {
if (it.value().contains(source)) {
return it.key();
}
}
return 0;
}
bool FlowV2Graph::hasCycle(QString* cycleNodeId) const
{
QSet<QString> visiting;
QSet<QString> visited;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
if (visited.contains(nodeId)) {
continue;
}
if (hasCycleDfs(nodeId, visiting, visited, cycleNodeId)) {
return true;
}
}
return false;
}
bool FlowV2Graph::hasCycleDfs(
const QString& nodeId,
QSet<QString>& visiting,
QSet<QString>& visited,
QString* cycleNodeId) const
{
visiting.insert(nodeId);
const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
for (const FlowV2Edge& edge : outputs) {
// In Stage1 every edge should be Normal.
// The later loop stage may skip LoopBack edges here and validate loop semantics separately.
if (edge.kind == FlowV2EdgeKind::LoopBack) {
continue;
}
const QString nextNodeId = edge.targetNodeId;
if (visiting.contains(nextNodeId)) {
if (cycleNodeId) {
*cycleNodeId = nextNodeId;
}
return true;
}
if (!visited.contains(nextNodeId)) {
if (hasCycleDfs(nextNodeId, visiting, visited, cycleNodeId)) {
return true;
}
}
}
visiting.remove(nodeId);
visited.insert(nodeId);
return false;
}
QSet<QString> FlowV2Graph::reachableFromStart(const QString& startId) const
{
QSet<QString> visited;
QVector<QString> stack;
stack.push_back(startId);
while (!stack.isEmpty()) {
const QString nodeId = stack.takeLast();
if (visited.contains(nodeId)) {
continue;
}
visited.insert(nodeId);
const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
for (const FlowV2Edge& edge : outputs) {
if (!visited.contains(edge.targetNodeId)) {
stack.push_back(edge.targetNodeId);
}
}
}
return visited;
}
QSet<QString> FlowV2Graph::nodesCanReachEnd(const QString& endId) const
{
QSet<QString> visited;
QVector<QString> stack;
stack.push_back(endId);
while (!stack.isEmpty()) {
const QString nodeId = stack.takeLast();
if (visited.contains(nodeId)) {
continue;
}
visited.insert(nodeId);
const QVector<FlowV2Edge> inputs = incomingEdges(nodeId);
for (const FlowV2Edge& edge : inputs) {
if (edge.kind == FlowV2EdgeKind::LoopBack) {
continue;
}
if (!visited.contains(edge.sourceNodeId)) {
stack.push_back(edge.sourceNodeId);
}
}
}
return visited;
}
QSet<QString> FlowV2Graph::nodesCanReachTerminal() const
{
QSet<QString> visited;
QVector<QString> stack;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
if (isTerminalSinkNode(nodeId)) {
stack.push_back(nodeId);
}
}
while (!stack.isEmpty()) {
const QString nodeId = stack.takeLast();
if (visited.contains(nodeId)) {
continue;
}
visited.insert(nodeId);
const QVector<FlowV2Edge> inputs = incomingEdges(nodeId);
for (const FlowV2Edge& edge : inputs) {
if (edge.kind == FlowV2EdgeKind::LoopBack) {
continue;
}
if (!visited.contains(edge.sourceNodeId)) {
stack.push_back(edge.sourceNodeId);
}
}
}
return visited;
}
//void FlowV2Graph::markLoopBackEdges()
//{
// for (FlowV2Edge& edge : edges) {
// edge.kind = FlowV2EdgeKind::Normal;
// }
//
// const QString endId = findEndNodeId();
// if (endId.isEmpty()) {
// return;
// }
//
// const QVector<QSet<QString>> components = stronglyConnectedComponents();
//
// for (const QSet<QString>& component : components) {
// if (component.size() <= 1) {
// continue;
// }
//
// QSet<QString> exitNodes;
//
// // Find the "exit nodes" of the loop region:
// // a node that has an edge inside the SCC and also an edge leaving the SCC that can reach End.
// for (const FlowV2Edge& edge : edges) {
// if (!component.contains(edge.sourceNodeId)) {
// continue;
// }
//
// if (component.contains(edge.targetNodeId)) {
// continue;
// }
//
// if (canReachNode(edge.targetNodeId, endId)) {
// exitNodes.insert(edge.sourceNodeId);
// }
// }
//
// if (exitNodes.isEmpty()) {
// qWarning() << "[FlowV2][Graph] loop region has no exit node. nodes =" << component;
// continue;
// }
//
// // Only the edges of an exit node that stay inside the SCC are LoopBack.
// // All other edges inside the SCC remain Normal.
// for (FlowV2Edge& edge : edges) {
// if (!exitNodes.contains(edge.sourceNodeId)) {
// continue;
// }
//
// if (!component.contains(edge.targetNodeId)) {
// continue;
// }
//
// edge.kind = FlowV2EdgeKind::LoopBack;
//
// qDebug() << "[FlowV2][Graph] mark LoopBack edge by SCC ="
// << edge.sourceNodeId << "->" << edge.targetNodeId
// << "edgeId =" << edge.edgeId;
// }
// }
//}
void FlowV2Graph::markLoopBackEdges()
{
// LoopBack edges are no longer guessed statically by DFS.
// The Runtime decides LoopBack dynamically from the current token's nodePath.
for (FlowV2Edge& edge : edges) {
edge.kind = FlowV2EdgeKind::Normal;
}
}
void FlowV2Graph::markLoopBackEdgesDfs(
const QString& nodeId,
QSet<QString>& visiting,
QSet<QString>& visited)
{
visiting.insert(nodeId);
for (FlowV2Edge& edge : edges) {
if (edge.sourceNodeId != nodeId) {
continue;
}
if (edge.kind == FlowV2EdgeKind::LoopBack) {
continue;
}
const QString nextNodeId = edge.targetNodeId;
if (visiting.contains(nextNodeId)) {
edge.kind = FlowV2EdgeKind::LoopBack;
qDebug() << "[FlowV2][Graph] mark LoopBack edge ="
<< edge.sourceNodeId << "->" << edge.targetNodeId
<< "edgeId =" << edge.edgeId;
continue;
}
if (!visited.contains(nextNodeId)) {
markLoopBackEdgesDfs(nextNodeId, visiting, visited);
}
}
visiting.remove(nodeId);
visited.insert(nodeId);
}
QVector<QSet<QString>> FlowV2Graph::stronglyConnectedComponents() const
{
QVector<QSet<QString>> components;
QMap<QString, int> indexMap;
QMap<QString, int> lowMap;
QVector<QString> stack;
QSet<QString> onStack;
int index = 0;
for (auto it = nodes.begin(); it != nodes.end(); ++it) {
const QString nodeId = it.key();
if (!indexMap.contains(nodeId)) {
strongConnect(
nodeId,
index,
indexMap,
lowMap,
stack,
onStack,
components);
}
}
return components;
}
void FlowV2Graph::strongConnect(
const QString& nodeId,
int& index,
QMap<QString, int>& indexMap,
QMap<QString, int>& lowMap,
QVector<QString>& stack,
QSet<QString>& onStack,
QVector<QSet<QString>>& components) const
{
indexMap.insert(nodeId, index);
lowMap.insert(nodeId, index);
++index;
stack.push_back(nodeId);
onStack.insert(nodeId);
const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
for (const FlowV2Edge& edge : outputs) {
const QString nextNodeId = edge.targetNodeId;
if (!indexMap.contains(nextNodeId)) {
strongConnect(
nextNodeId,
index,
indexMap,
lowMap,
stack,
onStack,
components);
lowMap[nodeId] = qMin(lowMap.value(nodeId), lowMap.value(nextNodeId));
}
else if (onStack.contains(nextNodeId)) {
lowMap[nodeId] = qMin(lowMap.value(nodeId), indexMap.value(nextNodeId));
}
}
if (lowMap.value(nodeId) != indexMap.value(nodeId)) {
return;
}
QSet<QString> component;
while (!stack.isEmpty()) {
const QString current = stack.takeLast();
onStack.remove(current);
component.insert(current);
if (current == nodeId) {
break;
}
}
components.push_back(component);
}
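To see what the SCC pass reports on a small graph, here is a minimal, self-contained sketch of the same Tarjan procedure using only the C++ standard library. `Graph`, `TarjanScc`, and `loopRegions` are hypothetical names introduced for illustration; FlowV2 itself works on Qt containers and `FlowV2Edge`. The filter in `loopRegions` mirrors the idea of `componentHasCycle`: a component is a loop region if it has more than one node or a self-loop.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical adjacency list: node id -> successor node ids.
using Graph = std::map<std::string, std::vector<std::string>>;

class TarjanScc {
public:
    explicit TarjanScc(const Graph& g) : graph_(g) {}

    std::vector<std::set<std::string>> run() {
        for (const auto& entry : graph_) {
            if (!index_.count(entry.first)) {
                visit(entry.first);
            }
        }
        return components_;
    }

private:
    void visit(const std::string& v) {
        index_[v] = low_[v] = counter_++;
        stack_.push_back(v);
        onStack_.insert(v);
        const auto it = graph_.find(v);
        if (it != graph_.end()) {
            for (const std::string& w : it->second) {
                if (!index_.count(w)) {
                    visit(w);                                  // tree edge
                    low_[v] = std::min(low_[v], low_[w]);
                } else if (onStack_.count(w)) {
                    low_[v] = std::min(low_[v], index_[w]);    // edge back into the stack
                }
            }
        }
        if (low_[v] != index_[v]) {
            return;                                            // v is not the root of an SCC
        }
        std::set<std::string> component;
        while (true) {
            const std::string w = stack_.back();
            stack_.pop_back();
            onStack_.erase(w);
            component.insert(w);
            if (w == v) {
                break;
            }
        }
        components_.push_back(component);
    }

    const Graph& graph_;
    int counter_ = 0;
    std::map<std::string, int> index_;
    std::map<std::string, int> low_;
    std::vector<std::string> stack_;
    std::set<std::string> onStack_;
    std::vector<std::set<std::string>> components_;
};

// Keeps only components that really contain a cycle:
// more than one node, or a single node with a self-loop.
std::vector<std::set<std::string>> loopRegions(const Graph& g) {
    std::vector<std::set<std::string>> result;
    TarjanScc tarjan(g);
    for (const std::set<std::string>& c : tarjan.run()) {
        bool cyclic = c.size() > 1;
        if (!cyclic) {
            const std::string& node = *c.begin();
            const auto it = g.find(node);
            cyclic = it != g.end() &&
                     std::find(it->second.begin(), it->second.end(), node) != it->second.end();
        }
        if (cyclic) {
            result.push_back(c);
        }
    }
    return result;
}
```

For a graph `Start -> A -> B -> A` with an extra edge `B -> End`, `loopRegions` returns the single region `{A, B}`; `Start` and `End` form trivial components and are filtered out.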
bool FlowV2Graph::canReachNode(
const QString& fromNodeId,
const QString& targetNodeId) const
{
if (fromNodeId.isEmpty() || targetNodeId.isEmpty()) {
return false;
}
QSet<QString> visited;
QVector<QString> stack;
stack.push_back(fromNodeId);
while (!stack.isEmpty()) {
const QString nodeId = stack.takeLast();
if (nodeId == targetNodeId) {
return true;
}
if (visited.contains(nodeId)) {
continue;
}
visited.insert(nodeId);
const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
for (const FlowV2Edge& edge : outputs) {
if (!visited.contains(edge.targetNodeId)) {
stack.push_back(edge.targetNodeId);
}
}
}
return false;
}
bool FlowV2Graph::isTerminalSinkNode(const QString& nodeId) const
{
if (!nodes.contains(nodeId)) {
return false;
}
const FlowV2Node node = nodes.value(nodeId);
const QString type = node.type.toLower();
const QString displayName = node.displayName.toLower();
if (node.isEnd) {
return true;
}
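// A chart module is a terminal consumer node:
// it receives data and plots it, so it does not need to connect on to endtool.
// contains("chart") already covers the exact match type == "chart".
if (type.contains(QStringLiteral("chart"))
//|| displayName.contains(QStringLiteral("图表"))
) {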
return true;
}
return false;
}
//bool FlowV2Graph::isThrottleNode(const QString& nodeId) const
//{
// if (!nodes.contains(nodeId)) {
// return false;
// }
//
// const QString type = nodes.value(nodeId).type.toLower();
//
// return type.contains(QStringLiteral("delay"))
// || type.contains(QStringLiteral("signal"));
//}
bool FlowV2Graph::isThrottleNode(const QString& nodeId) const
{
if (!nodes.contains(nodeId)) {
return false;
}
const FlowV2Node node = nodes.value(nodeId);
const QString type = node.type.toLower();
const QString name = node.displayName.toLower();
const auto containsThrottleKeyword = [](const QString& text) -> bool {
return text.contains(QStringLiteral("delay"))
|| text.contains(QStringLiteral("signal"))
|| text.contains(QStringLiteral("mathgenerate"))
|| text.contains(QStringLiteral("math generate"))
|| text.contains(QStringLiteral("point to point"))
|| text.contains(QStringLiteral("point_to_point"))
|| text.contains(QStringLiteral("position"))
|| text.contains(QStringLiteral("resettoorigin"))
|| text.contains(QStringLiteral("reset to origin"))
|| text.contains(QStringLiteral("calibrate"))
/* || text.contains(QStringLiteral("延迟"))
|| text.contains(QStringLiteral("信号"))
|| text.contains(QStringLiteral("位置"))
|| text.contains(QStringLiteral("归零"))*/
;
};
return containsThrottleKeyword(type) || containsThrottleKeyword(name);
}
bool FlowV2Graph::canReachEndTool(const QString& fromNodeId) const
{
const QString endId = findEndNodeId();
if (endId.isEmpty()) {
return false;
}
return canReachNode(fromNodeId, endId);
}
bool FlowV2Graph::hasContinuousManualLoopRegion() const
{
const QVector<QSet<QString>> components = stronglyConnectedComponents();
const QSet<QString> canReachTerminal = nodesCanReachTerminal();
for (const QSet<QString>& component : components) {
// Use the same cycle test as validateLoopRegions so a single node with a self-loop also counts as a loop region.
if (!componentHasCycle(component)) {
continue;
}
bool hasTerminalExit = false;
bool hasEndExit = false;
const bool hasThrottle = componentHasThrottleNode(component);
for (const FlowV2Edge& edge : edges) {
if (!component.contains(edge.sourceNodeId)) {
continue;
}
if (component.contains(edge.targetNodeId)) {
continue;
}
if (canReachTerminal.contains(edge.targetNodeId)) {
hasTerminalExit = true;
}
if (canReachEndTool(edge.targetNodeId)) {
hasEndExit = true;
}
}
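// The region exits to a terminal sink but not to endtool, and it contains a throttle node:
// treat it as a continuous flow that runs until the user stops it manually.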
if (hasTerminalExit && !hasEndExit && hasThrottle) {
qDebug() << "[FlowV2][Graph] continuous manual loop region =" << component;
return true;
}
}
return false;
}
bool FlowV2Graph::componentHasCycle(const QSet<QString>& component) const
{
if (component.size() > 1) {
return true;
}
if (component.size() == 1) {
const QString nodeId = *component.begin();
for (const FlowV2Edge& edge : edges) {
if (edge.sourceNodeId == nodeId &&
edge.targetNodeId == nodeId) {
return true;
}
}
}
return false;
}
//bool FlowV2Graph::componentHasThrottleNode(const QSet<QString>& component) const
//{
// for (const QString& nodeId : component) {
// if (!nodes.contains(nodeId)) {
// continue;
// }
//
// const QString type = nodes.value(nodeId).type.toLower();
//
// if (type.contains(QStringLiteral("delay")) ||
// type.contains(QStringLiteral("signal"))) {
// return true;
// }
// }
//
// return false;
//}
bool FlowV2Graph::componentHasThrottleNode(const QSet<QString>& component) const
{
for (const QString& nodeId : component) {
if (isThrottleNode(nodeId)) {
return true;
}
}
return false;
}
FlowV2ValidateResult FlowV2Graph::validateLoopRegions() const
{
FlowV2ValidateResult result;
result.ok = true;
const QVector<QSet<QString>> components = stronglyConnectedComponents();
const QSet<QString> canReachTerminalSet = nodesCanReachTerminal();
for (const QSet<QString>& component : components) {
if (!componentHasCycle(component)) {
continue;
}
bool hasExitToTerminal = false;
bool hasExitToEnd = false;
bool hasThrottle = componentHasThrottleNode(component);
for (const FlowV2Edge& edge : edges) {
if (!component.contains(edge.sourceNodeId)) {
continue;
}
// The edge stays inside the loop region, so it is not an exit.
if (component.contains(edge.targetNodeId)) {
continue;
}
// An exit is legal as soon as its target can reach End or a terminal sink such as chart.
if (canReachTerminalSet.contains(edge.targetNodeId)) {
hasExitToTerminal = true;
}
if (canReachEndTool(edge.targetNodeId)) {
hasExitToEnd = true;
}
}
if (!hasExitToTerminal) {
result.ok = false;
result.error = QStringLiteral("loop region has no exit to endtool or terminal sink.");
return result;
}
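// With no End exit and only a terminal sink such as chart, the region is a continuous flow stopped manually.
// It was originally required to contain a throttle node such as delay/signal.
// That hard check is disabled below; a missing throttle node now only logs a warning.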
/*if (!hasExitToEnd && !hasThrottle) {
result.ok = false;
result.error = QStringLiteral("continuous loop region requires delay/signal throttle node.");
return result;
}*/
if (!hasThrottle) {
qWarning() << "[FlowV2][Graph] loop region has no explicit throttle node,"
<< "but it has an exit path, so allow it.";
}
qDebug() << "[FlowV2][Graph] valid loop region =" << component
<< "hasExitToEnd =" << hasExitToEnd
<< "hasThrottle =" << hasThrottle;
}
return result;
}
FlowV2Runtime is the core scheduling class of FlowV2.
It ties the graph, the runtimeState, and the executor together.
It can be thought of as FlowV2's event scheduler plus state machine.
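The "event scheduler plus state machine" idea can be illustrated with a much smaller model. The sketch below is not the FlowV2 implementation: `MiniEvent` and `MiniRuntime` are hypothetical names, the graph is a plain standard-library adjacency list, and joins, Ignore propagation, loops, and async nodes are all left out. It only shows the core loop: every signal becomes an event in one queue, a token continues along a single output, a fan-out creates new tokens, and the run counts as finished only when no token is active.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for FlowV2Event.
struct MiniEvent {
    std::string target;   // node the signal is flowing into
    int tokenId;          // execution path the signal belongs to
};

class MiniRuntime {
public:
    using Graph = std::map<std::string, std::vector<std::string>>;

    explicit MiniRuntime(Graph edges) : edges_(std::move(edges)) {}

    // Seeds one token per outgoing edge of `start`, drains the queue,
    // and returns the order in which nodes were "executed".
    std::vector<std::string> run(const std::string& start, const std::string& end) {
        std::vector<std::string> executed;
        for (const std::string& next : successors(start)) {
            enqueue(next, newToken());
        }
        while (!queue_.empty()) {
            const MiniEvent event = queue_.front();
            queue_.pop_front();
            executed.push_back(event.target);            // stands in for running the plugin
            const std::vector<std::string> outs = successors(event.target);
            if (event.target == end || outs.empty()) {
                --activeTokens_;                         // this path is done
            } else if (outs.size() == 1) {
                enqueue(outs.front(), event.tokenId);    // the same token moves on
            } else {
                for (const std::string& next : outs) {   // fan-out: one new token per edge
                    enqueue(next, newToken());
                }
                --activeTokens_;                         // the parent token ends
            }
        }
        return executed;
    }

    // The run is over only when nothing is queued and no token is alive.
    bool finished() const { return queue_.empty() && activeTokens_ == 0; }

private:
    std::vector<std::string> successors(const std::string& node) const {
        const auto it = edges_.find(node);
        return it == edges_.end() ? std::vector<std::string>{} : it->second;
    }
    int newToken() {
        ++activeTokens_;
        return ++nextTokenId_;
    }
    void enqueue(const std::string& target, int tokenId) {
        queue_.push_back(MiniEvent{target, tokenId});
    }

    Graph edges_;
    std::deque<MiniEvent> queue_;
    int activeTokens_ = 0;
    int nextTokenId_ = 0;
};
```

On the diamond `Start -> {A, B}`, `A -> End`, `B -> End`, this model executes `A`, `B`, `End`, `End`: because joins are omitted, `End` is reached once per branch, and `finished()` turns true only after the second arrival. That matches the principle that one signal arriving at EndTool does not end the whole flow.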
#pragma once
#include "FlowV2Graph.h"
#include "FlowRuntimeState.h"
#include "FlowV2Executor.h"
#include <QQueue>
#include <QMap>
#include <QVariant>
#include <QDateTime>
#include <functional>
#include "qtpubliclib_global.h"
struct FlowV2Event
{
FlowContext context;
QString sourceNodeId;
QString targetNodeId;
int sourcePort = 0;
int targetPort = 0;
QVariant value;
// add by Momo 2026.05.25
// In FlowV2, Ignore means: do not execute the plugin, only propagate through the ledger.
FlowSignalStatus status = FlowSignalStatus::Complete;
};
// Async support: a node that is waiting for an external callback.
struct FlowV2PendingNode
{
FlowV2Event event;
FlowV2ExecuteInput input;
QString nodeId;
QDateTime startTime;
};
enum class FlowV2InputPolicy
{
AnyCompleteAfterReady = 0
};
enum class FlowV2RunDecision
{
Wait = 0,
Run = 1,
Skip = 2,
Fail = 3
};
class QTPUBLICLIB_EXPORT FlowV2Runtime
{
public:
FlowV2Runtime() = default;
bool start(const FlowV2Graph& graph, FlowRuntimeState& runtimeState);
QString lastError() const { return m_lastError; }
bool startSubFlow(
const FlowV2Graph& graph,
FlowRuntimeState& runtimeState,
const FlowContext& parentContext,
const QString& callerNodeId
);
// Async support: entry points called when a pending node completes or fails.
void completePendingNode(
const FlowContext& context,
const QVariant& outputValue = QVariant());
void failPendingNode(
const FlowContext& context,
const QString& reason);
void requestStop();
bool hasPendingNodes() const { return !m_pendingNodes.isEmpty(); }
// Single-stepping through a subflow proceeds depth-first.
bool hasPendingEvents() const
{
return !m_pendingEvents.isEmpty();
}
bool isFinished() const
{
return m_finishedEmitted
|| m_stopRequested
|| !m_lastError.isEmpty();
}
bool hasError() const { return !m_lastError.isEmpty(); }
void setFinishCallback(const std::function<void()>& callback)
{
m_finishCallback = callback;
}
void setFailCallback(const std::function<void(const QString&)>& callback)
{
m_failCallback = callback;
}
// Single-step execution.
void setStepMode(bool enabled)
{
m_stepMode = enabled;
}
bool isStepMode() const
{
return m_stepMode;
}
bool stepOnce();
bool continueRun();
private:
void enqueueEvent(const FlowV2Event& event);
void runEventLoop();
bool dispatchEvent(const FlowV2Event& event);
void executeNode(const FlowV2Event& event, const FlowV2ExecuteInput& input);
/*void completeNode(const FlowV2Event& event, const QVariant& outputValue);*/
void reachEnd(const FlowV2Event& event);
void tryFinishRun();
void fail(const QString& reason);
FlowV2Edge edgeBetween(const QString& sourceNodeId, const QString& targetNodeId) const;
bool startInternal(
const FlowV2Graph& graph,
FlowRuntimeState& runtimeState,
const FlowContext& rootContext,
bool isSubFlow
);
void completeNode(
const FlowV2Event& event,
const FlowV2ExecuteResult& result);
FlowV2RunDecision prepareNodeExecution(
const FlowV2Event& event,
FlowV2ExecuteInput& input);
QVector<FlowV2Edge> selectedOutputEdges(
const QString& nodeId,
const FlowV2ExecuteResult& result) const;
void propagateIgnoreEvent(const FlowV2Event& event);
void propagateIgnoreFromEdge(
const FlowV2Edge& edge,
const FlowContext& parentContext);
FlowJoinKey makeJoinKey(const FlowV2Event& event) const;
FlowEdgeKey makeEdgeKey(const FlowV2Event& event) const;
void finishJoinedInputTokens(const FlowJoinState& joinState, quint64 keepTokenId);
// Loop support.
bool enqueueSelectedEdge(
const FlowV2Edge& edge,
const FlowContext& parentContext,
const QVariant& value,
bool forceNewBranch);
void registerExpectedInput(const FlowV2Event& event);
// Wait for all input edges to arrive.
QVector<FlowV2Edge> candidateInputEdgesForEvent(const FlowV2Event& event) const;
bool sameJoinScope(const FlowContext& a, const FlowContext& b) const;
bool canReachByNormalEdges(
const QString& fromNodeId,
const QString& targetNodeId) const;
bool canReachByAnyEdges(
const QString& fromNodeId,
const QString& targetNodeId,
const QString& ignoredEdgeId = QString()) const;
bool isCycleFormingEdge(const FlowV2Edge& edge) const;
bool missingInputCanStillArrive(
const FlowV2Edge& missingEdge,
const FlowContext& context) const;
bool hasMissingInputThatCanStillArrive(
const FlowV2Event& event,
const FlowJoinState& joinState) const;
FlowEdgeKey makeEdgeKeyFromEdge(const FlowV2Edge& edge) const;
// Multiple terminal nodes.
bool isTerminalSinkNode(const QString& nodeId) const;
void reachTerminal(const FlowV2Event& event);
// Dynamic back edges.
bool wouldRevisitNode(
const FlowContext& context,
const QString& targetNodeId) const;
bool isRuntimeLoopBackEdge(
const FlowV2Edge& edge,
const FlowContext& context) const;
bool isFeedbackInputEdge(const FlowV2Edge& edge) const;
// Async support.
void suspendToken(
const FlowV2Event& event,
const FlowV2ExecuteInput& input);
private:
FlowV2Graph m_graph;
FlowRuntimeState* m_runtimeState = nullptr;
FlowContext m_rootContext;
QQueue<FlowV2Event> m_pendingEvents;
QMap<quint64, FlowV2Branch> m_branches;
QString m_startNodeId;
QString m_endNodeId;
QString m_lastError;
bool m_finishedEmitted = false;
bool m_isSubFlow = false;
// add by Momo.
// true means the current graph contains a continuous "terminal sink + loop" flow that runs until the user stops it manually.
bool m_isContinuousManualLoop = false;
// Async support.
QMap<quint64, FlowV2PendingNode> m_pendingNodes;
bool m_stopRequested = false;
std::function<void()> m_finishCallback;
std::function<void(const QString&)> m_failCallback;
bool m_runningEventLoop = false;
bool m_eventLoopRerunRequested = false;
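// add by Momo 2026.06.02
// Native FlowV2 single-step control.
// When m_stepMode is true, runEventLoop does not drain the whole event queue in one go.
bool m_stepMode = false;
// -1 means continuous execution; >0 is how many real nodes the current step may still execute.
int m_stepBudget = -1;
QString m_lastHighlightedNodeId; // node highlighted during single-step execution
// add by Momo 2026.06.02
// Data-flow display.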
void clearDataFlowDisplayRecords();
void recordDataFlowForDisplay(const FlowV2Event& event);
};
#include "FlowV2Runtime.h"
#include <QDebug>
#include <QStringList>
#include "dataDefine.h"
namespace {
void markFlowV2NodeState(
const FlowV2Graph& graph,
const QString& nodeId,
int state)
{
if (!graph.nodes.contains(nodeId)) {
return;
}
modelObject* obj = graph.nodes.value(nodeId).legacyObject;
if (!obj || obj->isDestroyPrepared()) {
return;
}
// setmodelState internally calls setNodeShowColor().
obj->setmodelState(state);
}
void clearFlowV2NodeHighlight(
const FlowV2Graph& graph,
const QString& nodeId)
{
if (nodeId.isEmpty()) {
return;
}
if (!graph.nodes.contains(nodeId)) {
return;
}
modelObject* obj = graph.nodes.value(nodeId).legacyObject;
if (!obj || obj->isDestroyPrepared()) {
return;
}
obj->setmodelState(STATE_FINISHED);
}
}
//bool FlowV2Runtime::start(const FlowV2Graph& graph, FlowRuntimeState& runtimeState)
//{
// m_graph = graph;
// m_runtimeState = &runtimeState;
// m_lastError.clear();
// m_pendingEvents.clear();
// m_branches.clear();
// m_finishedEmitted = false;
//
// const FlowV2ValidateResult validateResult = m_graph.validateStage1Dag();
// if (!validateResult.ok) {
// fail(validateResult.error);
// return false;
// }
//
// m_startNodeId = m_graph.findStartNodeId();
// m_endNodeId = m_graph.findEndNodeId();
//
// m_rootContext = m_runtimeState->createRootContext();
//
// const QVector<FlowV2Edge> startOutputs = m_graph.outgoingEdges(m_startNodeId);
//
// qDebug() << "start runId =" << m_rootContext.runId
// << "initialBranchCount =" << startOutputs.size();
//
// int branchIndex = 0;
//
// for (const FlowV2Edge& edge : startOutputs) {
// ++branchIndex;
//
// FlowContext branchContext = m_runtimeState->createBranchContext(m_rootContext, branchIndex);
// if (!branchContext.isValid()) {
// fail(QStringLiteral("failed to create branch context."));
// return false;
// }
//
// FlowV2Branch branch;
// branch.branchId = branchContext.tokenId;
// branch.context = branchContext;
// branch.currentIndex = 0;
// branch.state = FlowV2BranchState::Pending;
// branch.nodePath.push_back(m_startNodeId);
// branch.nodePath.push_back(edge.targetNodeId);
//
// m_branches.insert(branch.branchId, branch);
//
// qDebug() << "initial branch" << branch.branchId
// << "edge =" << edge.sourceNodeId << "->" << edge.targetNodeId;
//
// FlowV2Event event;
// event.context = branchContext;
// event.sourceNodeId = edge.sourceNodeId;
// event.targetNodeId = edge.targetNodeId;
// event.sourcePort = edge.sourcePort;
// event.targetPort = edge.targetPort;
// event.value = QVariant();
//
// enqueueEvent(event);
// }
//
// // The root token only marks the start of this run; it does not take part in the End barrier.
// m_runtimeState->finishToken(m_rootContext);
//
// runEventLoop();
// tryFinishRun();
//
// return m_lastError.isEmpty();
//}
bool FlowV2Runtime::startInternal(
const FlowV2Graph& graph,
FlowRuntimeState& runtimeState,
const FlowContext& rootContext,
bool isSubFlow)
{
m_graph = graph;
// add by Momo 2026.06.02
// Clear the data-flow display cache of the previous run before a new FlowV2 run starts.
clearDataFlowDisplayRecords();
m_isContinuousManualLoop = m_graph.hasContinuousManualLoopRegion(); // dynamic back-edge support
qDebug() << "continuousManualLoop ="
<< m_isContinuousManualLoop;
m_runtimeState = &runtimeState;
m_rootContext = rootContext;
m_isSubFlow = isSubFlow;
m_lastError.clear();
m_pendingEvents.clear();
m_branches.clear();
m_finishedEmitted = false;
m_pendingNodes.clear(); // reset async state
m_stopRequested = false;
const FlowV2ValidateResult validateResult = m_graph.validateStage1Dag();
if (!validateResult.ok) {
fail(validateResult.error);
return false;
}
m_startNodeId = m_graph.findStartNodeId();
m_endNodeId = m_graph.findEndNodeId();
const QVector<FlowV2Edge> startOutputs = m_graph.outgoingEdges(m_startNodeId);
qDebug() << "[FlowV2]"
<< (m_isSubFlow ? "[subflow]" : "[main]")
<< "start runId =" << m_rootContext.runId
<< "frameId =" << m_rootContext.frameId
<< "parentFrameId =" << m_rootContext.parentFrameId
<< "initialBranchCount =" << startOutputs.size();
int branchIndex = 0;
for (const FlowV2Edge& edge : startOutputs) {
++branchIndex;
FlowContext branchContext = m_runtimeState->createBranchContext(m_rootContext, branchIndex);
if (!branchContext.isValid()) {
fail(QStringLiteral("failed to create branch context."));
return false;
}
FlowV2Branch branch;
branch.branchId = branchContext.tokenId;
branch.context = branchContext;
branch.currentIndex = 0;
branch.state = FlowV2BranchState::Pending;
branch.nodePath.push_back(m_startNodeId);
branch.nodePath.push_back(edge.targetNodeId);
m_branches.insert(branch.branchId, branch);
qDebug() << "[FlowV2]"
<< (m_isSubFlow ? "[subflow]" : "[main]")
<< "initial branch" << branch.branchId
<< "frameId =" << branchContext.frameId
<< "edge =" << edge.sourceNodeId << "->" << edge.targetNodeId;
FlowV2Event event;
event.context = branchContext;
event.sourceNodeId = edge.sourceNodeId;
event.targetNodeId = edge.targetNodeId;
event.sourcePort = edge.sourcePort;
event.targetPort = edge.targetPort;
event.value = QVariant();
enqueueEvent(event);
}
// The root token only marks the start of the current frame; it does not take part in this frame's End barrier.
m_runtimeState->finishToken(m_rootContext);
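// In step mode the first click only completes Start and marks Start's downstream nodes as pending.
// Note: do not call runEventLoop here, otherwise the nodes after Start would execute on the first click.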
if (m_stepMode) {
m_stepBudget = 0;
qDebug() << "[FlowV2]"
<< (m_isSubFlow ? "[subflow]" : "[main]")
<< "step start paused at first frontier. pendingEvents ="
<< m_pendingEvents.size();
return m_lastError.isEmpty();
}
runEventLoop();
tryFinishRun();
return m_lastError.isEmpty();
}
bool FlowV2Runtime::start(const FlowV2Graph& graph, FlowRuntimeState& runtimeState)
{
FlowContext rootContext = runtimeState.createRootContext();
return startInternal(graph, runtimeState, rootContext, false);
}
bool FlowV2Runtime::startSubFlow(
const FlowV2Graph& graph,
FlowRuntimeState& runtimeState,
const FlowContext& parentContext,
const QString& callerNodeId)
{
FlowContext childContext =
runtimeState.createChildFrameContext(parentContext, callerNodeId);
if (!childContext.isValid()) {
m_lastError = QStringLiteral("failed to create child frame context. caller=%1")
.arg(callerNodeId);
runtimeState.markFailed(m_lastError);
return false;
}
return startInternal(graph, runtimeState, childContext, true);
}
// Single-step execution.
bool FlowV2Runtime::stepOnce()
{
if (!m_runtimeState) {
return false;
}
if (m_stopRequested || m_finishedEmitted || !m_lastError.isEmpty()) {
return false;
}
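// With an empty queue (for example while only async nodes are outstanding) there is nothing to step, so do not force progress.
// Once the async completePendingNode fires, the follow-up events are left in the queue for the next step.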
if (m_pendingEvents.isEmpty()) {
tryFinishRun();
return m_lastError.isEmpty();
}
m_stepMode = true;
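// Step semantics: one click executes exactly one real node.
// The step no longer runs a batch sized by the current pendingEvents, otherwise synchronous nodes would flash by.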
m_stepBudget = 1;
runEventLoop();
return m_lastError.isEmpty();
}
bool FlowV2Runtime::continueRun()
{
if (!m_runtimeState) {
return false;
}
if (m_stopRequested || m_finishedEmitted || !m_lastError.isEmpty()) {
return false;
}
m_stepMode = false;
m_stepBudget = -1;
runEventLoop();
return m_lastError.isEmpty();
}
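The step budget used by `stepOnce()` and `continueRun()` can be sketched in isolation. The following is a simplified model under stated assumptions, not the FlowV2 code: `StepEvent` and `StepQueue` are hypothetical names, and the `real` flag stands in for the distinction between an event that executes a plugin and a Skip/Ignore event that only does bookkeeping. A budget of 1 stops the drain after one real node, and a budget of -1 drains the queue completely.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical event: `real == false` models a Skip/Ignore event
// that is consumed without executing any plugin.
struct StepEvent {
    std::string node;
    bool real;
};

class StepQueue {
public:
    void post(const std::string& node, bool real) {
        queue_.push_back(StepEvent{node, real});
    }

    // One click: execute exactly one real node, then pause.
    void stepOnce() {
        budget_ = 1;
        drain();
    }

    // Leave step mode: run until the queue is empty.
    void continueRun() {
        budget_ = -1;
        drain();
    }

    const std::vector<std::string>& executed() const { return executed_; }
    std::size_t pending() const { return queue_.size(); }

private:
    void drain() {
        while (!queue_.empty()) {
            const StepEvent event = queue_.front();
            queue_.pop_front();
            if (!event.real) {
                continue;                    // bookkeeping only, the budget is untouched
            }
            executed_.push_back(event.node);
            if (budget_ > 0 && --budget_ == 0) {
                return;                      // budget used up: pause with the rest queued
            }
        }
    }

    std::deque<StepEvent> queue_;
    std::vector<std::string> executed_;
    int budget_ = -1;                        // -1 means "no limit"
};
```

Only real nodes consume the budget, which is why a click never appears to do nothing just because the next queued event happened to be an ignored branch.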
void FlowV2Runtime::enqueueEvent(const FlowV2Event& event)
{
if (!m_runtimeState) {
return;
}
if (m_stopRequested || m_finishedEmitted) {
return;
}
registerExpectedInput(event);
++m_runtimeState->currentRun.pendingEvents;
m_pendingEvents.enqueue(event);
// In step mode a queued Complete event represents a node that is waiting to run next.
// Ignore/Skip branches should not be highlighted.
if (m_stepMode && event.status == FlowSignalStatus::Complete) {
markFlowV2NodeState(m_graph, event.targetNodeId, STATE_PAUSE);
}
}
void FlowV2Runtime::registerExpectedInput(const FlowV2Event& event)
{
if (!m_runtimeState) {
return;
}
if (!event.context.isValid()) {
return;
}
if (event.targetNodeId.isEmpty()) {
return;
}
// End does not go through the ordinary join.
// Convergence at End is handled by the active token count.
if (event.targetNodeId == m_endNodeId) {
return;
}
if (isTerminalSinkNode(event.targetNodeId)) {
return;
}
const FlowJoinKey joinKey = makeJoinKey(event);
FlowJoinState& joinState = m_runtimeState->currentRun.pendingJoins[joinKey];
const FlowEdgeKey edgeKey = makeEdgeKey(event);
joinState.expectedInputs.insert(edgeKey, true);
qDebug() << "[FlowV2] register expected input node =" << event.targetNodeId
<< "edge =" << edgeKey.fromNodeId << ":" << edgeKey.fromPort
<< "->" << edgeKey.toNodeId << ":" << edgeKey.toPort
<< "runId =" << event.context.runId
<< "frameId =" << event.context.frameId
<< "joinBatchId =" << event.context.joinBatchId
<< "iteration =" << event.context.iteration
<< "expected =" << joinState.expectedInputs.size();
}
void FlowV2Runtime::runEventLoop()
{
if (m_runningEventLoop) {
m_eventLoopRerunRequested = true;
return;
}
m_runningEventLoop = true;
do {
m_eventLoopRerunRequested = false;
while (!m_pendingEvents.isEmpty()) {
if (m_stopRequested || m_finishedEmitted || !m_runtimeState) {
m_pendingEvents.clear();
m_runningEventLoop = false;
return;
}
FlowV2Event event = m_pendingEvents.dequeue();
if (m_runtimeState && m_runtimeState->currentRun.pendingEvents > 0) {
--m_runtimeState->currentRun.pendingEvents;
}
// Single-step support: dispatchEvent reports whether a real node was executed.
const bool executedNode = dispatchEvent(event);
if (m_stepMode && executedNode && m_stepBudget > 0) {
--m_stepBudget;
if (m_stepBudget <= 0) {
m_runningEventLoop = false;
if (!m_stopRequested
&& !m_finishedEmitted
&& m_runtimeState
&& m_lastError.isEmpty()) {
tryFinishRun();
}
return;
}
}
if (!m_lastError.isEmpty()
|| m_stopRequested
|| m_finishedEmitted
|| !m_runtimeState) {
m_runningEventLoop = false;
return;
}
}
} while (m_eventLoopRerunRequested
&& !m_stopRequested
&& !m_finishedEmitted
&& m_runtimeState
&& m_lastError.isEmpty());
m_runningEventLoop = false;
if (!m_stopRequested
&& !m_finishedEmitted
&& m_runtimeState
&& m_lastError.isEmpty()) {
tryFinishRun();
}
}
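The `m_runningEventLoop` / `m_eventLoopRerunRequested` pair is a re-entrancy guard: a node that posts new events and calls back into the loop must not start a nested drain. A minimal sketch of that pattern, assuming a hypothetical `ReentrantSafeLoop` class with integer events (none of these names exist in FlowV2), looks like this:

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

class ReentrantSafeLoop {
public:
    using Handler = std::function<void(int)>;

    void setHandler(Handler handler) { handler_ = std::move(handler); }
    void post(int event) { queue_.push_back(event); }

    void run() {
        if (running_) {
            rerunRequested_ = true;      // remember the request, do not nest
            return;
        }
        running_ = true;
        do {
            rerunRequested_ = false;
            while (!queue_.empty()) {
                const int event = queue_.front();
                queue_.pop_front();
                ++depth_;
                maxDepth_ = std::max(maxDepth_, depth_);
                if (handler_) {
                    handler_(event);     // may call post() and run() again
                }
                --depth_;
            }
        } while (rerunRequested_);       // pick up work requested during the drain
        running_ = false;
    }

    // Highest number of handlers that were ever active at the same time.
    int maxDepth() const { return maxDepth_; }

private:
    Handler handler_;
    std::deque<int> queue_;
    bool running_ = false;
    bool rerunRequested_ = false;
    int depth_ = 0;
    int maxDepth_ = 0;
};
```

If a handler posts a follow-up event and immediately calls `run()`, the nested call returns at once and the outer loop processes the new event, so handlers never stack on top of each other.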
bool FlowV2Runtime::dispatchEvent(const FlowV2Event& event)
{
if (!m_runtimeState) {
fail(QStringLiteral("runtime state is null."));
return false;
}
if (!event.context.isValid()) {
fail(QStringLiteral("invalid event context."));
return false;
}
if (!m_graph.nodes.contains(event.targetNodeId)) {
fail(QStringLiteral("target node not found: %1").arg(event.targetNodeId));
return false;
}
if (!m_branches.contains(event.context.tokenId)) {
fail(QStringLiteral("branch token not found: %1").arg(event.context.tokenId));
return false;
}
FlowV2Branch& branch = m_branches[event.context.tokenId];
branch.state = FlowV2BranchState::Running;
// Recorded only as a debugging trail; it is no longer used to judge legality.
if (branch.nodePath.isEmpty() || branch.nodePath.last() != event.targetNodeId) {
branch.nodePath.push_back(event.targetNodeId);
}
/*FlowV2ExecuteInput input;
const bool ready = prepareNodeExecution(event, input);
if (!ready) {
return;
}
executeNode(event, input);*/
// add by Momo 2026.06.02
// FlowV2 no longer goes through the old notifyObject_receive/send_to_next_modelobject path,
// so the event is recorded here into the modelObject data-flow display cache instead.
recordDataFlowForDisplay(event);
FlowV2ExecuteInput input;
const FlowV2RunDecision decision = prepareNodeExecution(event, input);
if (decision == FlowV2RunDecision::Wait) {
return false;
}
if (decision == FlowV2RunDecision::Fail) {
fail(QStringLiteral("FlowV2 input failed at node: %1").arg(event.targetNodeId));
return false;
}
if (decision == FlowV2RunDecision::Skip) {
propagateIgnoreEvent(event);
return false;
}
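// In a multi-input join, the last event to arrive may be an Ignore,
// while the token that should continue downstream belongs to one of the Complete inputs.
// So the execution context is corrected with input.context here.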
FlowV2Event executeEvent = event;
executeEvent.context = input.context;
executeNode(executeEvent, input);
return true; // tells single-step mode that a real node was executed
}
void FlowV2Runtime::executeNode(const FlowV2Event& event, const FlowV2ExecuteInput& input)
{
const FlowV2Node node = m_graph.nodes.value(event.targetNodeId);
// Bind the highlight while single-stepping.
if (m_stepMode &&
!m_lastHighlightedNodeId.isEmpty() &&
m_lastHighlightedNodeId != node.nodeId) {
clearFlowV2NodeHighlight(m_graph, m_lastHighlightedNodeId);
}
markFlowV2NodeState(m_graph, node.nodeId, STATE_EXECUTING);
if (m_stepMode) {
m_lastHighlightedNodeId = node.nodeId;
}
qDebug() << "[FlowV2] execute node =" << node.nodeId
<< "type =" << node.type
<< "branch =" << event.context.tokenId;
FlowV2ExecuteResult result =
FlowV2ExecutorRegistry::instance().execute(node, input);
// In step mode the plugin may mark itself FINISHED internally.
// After the plugin returns, the Runtime re-highlights the node that was just executed.
if (m_stepMode && result.status != FlowSignalStatus::Failed) {
markFlowV2NodeState(m_graph, node.nodeId, STATE_EXECUTING);
m_lastHighlightedNodeId = node.nodeId;
}
if (result.status == FlowSignalStatus::Failed) {
markFlowV2NodeState(m_graph, node.nodeId, STATE_FINISHED);
const QString reason = result.errorMessage.isEmpty()
? QStringLiteral("FlowV2 node execute failed: %1").arg(node.nodeId)
: result.errorMessage;
fail(reason);
return;
}
if (result.status == FlowSignalStatus::Pending) {
suspendToken(event, input);
return;
}
if (result.status == FlowSignalStatus::Ignore) {
if (!m_stepMode) {
markFlowV2NodeState(m_graph, node.nodeId, STATE_FINISHED);
}
propagateIgnoreEvent(event);
return;
}
if (!m_stepMode) {
markFlowV2NodeState(m_graph, node.nodeId, STATE_FINISHED);
}
completeNode(event, result);
}
void FlowV2Runtime::completeNode(
const FlowV2Event& event,
const FlowV2ExecuteResult& result)
{
if (event.targetNodeId == m_endNodeId) {
reachEnd(event);
return;
}
const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(event.targetNodeId);
if (outputs.isEmpty()) {
if (m_graph.isTerminalSinkNode(event.targetNodeId)) {
reachTerminal(event);
return;
}
fail(QStringLiteral("node has no output and is not terminal: %1").arg(event.targetNodeId));
return;
}
const QVector<FlowV2Edge> selectedOutputs =
selectedOutputEdges(event.targetNodeId, result);
/*if (outputs.isEmpty()) {
fail(QStringLiteral("node has no output and is not endtool: %1").arg(event.targetNodeId));
return;
}*/
// Multiple terminal nodes.
if (isTerminalSinkNode(event.targetNodeId)) {
reachTerminal(event);
return;
}
if (selectedOutputs.isEmpty()) {
fail(QStringLiteral("node selected no output: %1").arg(event.targetNodeId));
return;
}
// Unselected output edges go through Ignore propagation.
for (const FlowV2Edge& edge : outputs) {
bool selected = false;
for (const FlowV2Edge& selectedEdge : selectedOutputs) {
if (selectedEdge.edgeId == edge.edgeId) {
selected = true;
break;
}
}
if (!selected) {
// An unselected cycle-forming edge means "do not enter the next iteration / stop looping",
// so Ignore must not be propagated along it.
if (isCycleFormingEdge(edge)) {
qDebug() << "[FlowV2] ignore unselected cycle-forming edge ="
<< edge.sourceNodeId << "->" << edge.targetNodeId
<< "edgeId =" << edge.edgeId;
continue;
}
propagateIgnoreFromEdge(edge, event.context);
}
}
// Exactly one selected output: the current token continues.
if (selectedOutputs.size() == 1) {
const FlowV2Edge edge = selectedOutputs.first();
qDebug() << "[FlowV2] complete node =" << event.targetNodeId
<< "next =" << edge.targetNodeId
<< "branch =" << event.context.tokenId
<< "edgeKind =" << static_cast<int>(edge.kind);
const bool createdNewToken =
enqueueSelectedEdge(edge, event.context, result.outputValue, false);
if (!m_lastError.isEmpty()) {
return;
}
// A LoopBack creates a new token, so the current token ends here.
if (createdNewToken) {
m_runtimeState->finishToken(event.context);
auto it = m_branches.find(event.context.tokenId);
if (it != m_branches.end()) {
it.value().state = FlowV2BranchState::Finished;
}
}
return;
}
// Several selected outputs: fan-out.
qDebug() << "[FlowV2] fan-out node =" << event.targetNodeId
<< "selectedOutCount =" << selectedOutputs.size()
<< "parentBranch =" << event.context.tokenId;
for (const FlowV2Edge& edge : selectedOutputs) {
enqueueSelectedEdge(edge, event.context, result.outputValue, true);
if (!m_lastError.isEmpty()) {
return;
}
}
m_runtimeState->finishToken(event.context);
auto it = m_branches.find(event.context.tokenId);
if (it != m_branches.end()) {
it.value().state = FlowV2BranchState::Finished;
}
}
void FlowV2Runtime::reachEnd(const FlowV2Event& event)
{
if (!m_runtimeState) {
return;
}
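// Look the branch up instead of using operator[], which would silently insert an empty branch for an unknown token.
auto branchIt = m_branches.find(event.context.tokenId);
if (branchIt != m_branches.end()) {
FlowV2Branch& branch = branchIt.value();
branch.state = FlowV2BranchState::Finished;
if (branch.nodePath.isEmpty() || branch.nodePath.last() != event.targetNodeId) {
branch.nodePath.push_back(event.targetNodeId);
}
branch.currentIndex = branch.nodePath.size() - 1;
}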
m_runtimeState->finishToken(event.context);
qDebug() << "[FlowV2]"
<< (m_isSubFlow ? "[subflow]" : "[main]")
<< "branch reach End. branch =" << event.context.tokenId
<< "frameId =" << event.context.frameId
<< "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
tryFinishRun();
}
void FlowV2Runtime::tryFinishRun()
{
if (!m_runtimeState) {
return;
}
if (m_finishedEmitted) {
return;
}
if (m_runtimeState->hasFailed()) {
return;
}
// Async check: nodes still pending (waiting for a callback) keep the run alive.
if (!m_pendingNodes.isEmpty()) {
return;
}
// Only consider child tokens derived from the current frame/root token.
if (m_runtimeState->activeChildTokenCount(m_rootContext.tokenId) > 0) {
return;
}
if (m_isSubFlow) {
m_runtimeState->finishFrame(m_rootContext);
m_finishedEmitted = true;
qDebug() << "[FlowV2][subflow] all branches finished. runId ="
<< m_rootContext.runId
<< "frameId =" << m_rootContext.frameId
<< "parentFrameId =" << m_rootContext.parentFrameId;
if (m_finishCallback) {
m_finishCallback();
}
return;
}
m_runtimeState->requestFinish();
if (m_runtimeState->canFinishStage1()) {
m_finishedEmitted = true;
qDebug() << "[FlowV2][main] all branches finished. runId ="
<< m_runtimeState->currentRun.runId;
// FlowV2: EndTool may only really finish the main flow after all branches have reached EndTool.
if (m_graph.nodes.contains(m_endNodeId)) {
modelObject* endObj = m_graph.nodes.value(m_endNodeId).legacyObject;
if (endObj && !endObj->isDestroyPrepared()) {
QMetaObject::invokeMethod(
endObj,
"finishFlowV2MainProcess",
Qt::DirectConnection
);
}
}
if (m_finishCallback) {
m_finishCallback();
}
}
}
FlowV2Edge FlowV2Runtime::edgeBetween(const QString& sourceNodeId, const QString& targetNodeId) const
{
for (const FlowV2Edge& edge : m_graph.edges) {
if (edge.sourceNodeId == sourceNodeId &&
edge.targetNodeId == targetNodeId) {
return edge;
}
}
return FlowV2Edge();
}
FlowJoinKey FlowV2Runtime::makeJoinKey(const FlowV2Event& event) const
{
FlowJoinKey key;
key.runId = event.context.runId;
key.tokenGroupId = event.context.tokenGroupId;
key.frameId = event.context.frameId;
key.joinBatchId = event.context.joinBatchId;
key.nodeId = event.targetNodeId;
key.iteration = event.context.iteration;
return key;
}
FlowEdgeKey FlowV2Runtime::makeEdgeKey(const FlowV2Event& event) const
{
FlowEdgeKey key;
key.fromNodeId = event.sourceNodeId;
key.fromPort = event.sourcePort;
key.toNodeId = event.targetNodeId;
key.toPort = event.targetPort;
key.edgeId = QStringLiteral("%1:%2->%3:%4")
.arg(event.sourceNodeId)
.arg(event.sourcePort)
.arg(event.targetNodeId)
.arg(event.targetPort);
return key;
}
void FlowV2Runtime::finishJoinedInputTokens(const FlowJoinState& joinState, quint64 keepTokenId)
{
if (!m_runtimeState) {
return;
}
for (auto it = joinState.arrivedInputs.begin(); it != joinState.arrivedInputs.end(); ++it) {
const FlowContext ctx = it.value().context;
if (ctx.tokenId == keepTokenId) {
continue;
}
m_runtimeState->finishToken(ctx);
}
}
FlowV2RunDecision FlowV2Runtime::prepareNodeExecution(
const FlowV2Event& event,
FlowV2ExecuteInput& input)
{
input.context = event.context;
input.runtimeState = m_runtimeState;
input.runtime = this; // added for async support: lets a node reach the runtime later
input.sourceNodeId = event.sourceNodeId;
input.targetNodeId = event.targetNodeId;
input.sourcePort = event.sourcePort;
input.targetPort = event.targetPort;
input.inputValue = event.value;
input.inputValuesByPort.clear();
input.inputValues.clear();
if (isTerminalSinkNode(event.targetNodeId)) {
input.inputValuesByPort.insert(event.targetPort, event.value);
FlowV2InputValue item;
item.sourceNodeId = event.sourceNodeId;
item.sourcePort = event.sourcePort;
item.targetNodeId = event.targetNodeId;
item.targetPort = event.targetPort;
item.value = event.value;
input.inputValues.push_back(item);
if (event.status == FlowSignalStatus::Failed) {
return FlowV2RunDecision::Fail;
}
if (event.status == FlowSignalStatus::Ignore) {
return FlowV2RunDecision::Skip;
}
return FlowV2RunDecision::Run;
}
// End is a special barrier.
if (event.targetNodeId == m_endNodeId) {
input.inputValuesByPort.insert(event.targetPort, event.value);
if (event.status == FlowSignalStatus::Failed) {
return FlowV2RunDecision::Fail;
}
if (event.status == FlowSignalStatus::Ignore) {
return FlowV2RunDecision::Skip;
}
return FlowV2RunDecision::Run;
}
const FlowJoinKey joinKey = makeJoinKey(event);
FlowJoinState& joinState = m_runtimeState->currentRun.pendingJoins[joinKey];
const FlowEdgeKey currentEdgeKey = makeEdgeKey(event);
// Fallback:
// Normally expectedInputs is already registered in enqueueEvent().
// If an event bypasses enqueueEvent() and is dispatched directly, it is registered here instead.
if (!joinState.expectedInputs.contains(currentEdgeKey)) {
joinState.expectedInputs.insert(currentEdgeKey, true);
}
FlowInputRecord record;
record.status = event.status;
record.context = event.context;
record.data = event.value;
record.arrivedTime = QDateTime::currentDateTime();
// 1. Record the current event as arrived.
joinState.arrivedInputs.insert(currentEdgeKey, record);
// 2. First wait for every input that was explicitly registered via registerExpectedInput().
if (!joinState.isReady()) {
qDebug() << "[FlowV2] wait activated inputs node =" << event.targetNodeId
<< "arrived =" << joinState.arrivedInputs.size()
<< "expected =" << joinState.expectedInputs.size()
<< "joinBatchId =" << event.context.joinBatchId
<< "iteration =" << event.context.iteration;
return FlowV2RunDecision::Wait;
}
// 3. Then check whether a static candidate input could still arrive in the same round.
// Feedback inputs do not block.
// (The join state is not modified after step 2, so isReady() needs no second check.)
if (hasMissingInputThatCanStillArrive(event, joinState)) {
qDebug() << "[FlowV2] wait active upstream node =" << event.targetNodeId
<< "arrived =" << joinState.arrivedInputs.size()
<< "joinBatchId =" << event.context.joinBatchId
<< "iteration =" << event.context.iteration;
return FlowV2RunDecision::Wait;
}
bool hasComplete = false;
bool hasFailed = false;
bool hasIgnore = false;
FlowContext firstCompleteContext;
for (auto it = joinState.arrivedInputs.begin(); it != joinState.arrivedInputs.end(); ++it) {
const FlowInputRecord inputRecord = it.value();
if (inputRecord.status == FlowSignalStatus::Complete) {
hasComplete = true;
if (!firstCompleteContext.isValid()) {
firstCompleteContext = inputRecord.context;
}
}
else if (inputRecord.status == FlowSignalStatus::Failed) {
hasFailed = true;
}
else if (inputRecord.status == FlowSignalStatus::Ignore) {
hasIgnore = true;
}
}
if (hasFailed) {
m_runtimeState->currentRun.pendingJoins.remove(joinKey);
qWarning() << "[FlowV2] input failed at join node =" << event.targetNodeId;
return FlowV2RunDecision::Fail;
}
if (!hasComplete && hasIgnore) {
finishJoinedInputTokens(joinState, event.context.tokenId);
m_runtimeState->currentRun.pendingJoins.remove(joinKey);
return FlowV2RunDecision::Skip;
}
for (auto it = joinState.arrivedInputs.begin(); it != joinState.arrivedInputs.end(); ++it) {
const FlowEdgeKey edgeKey = it.key();
const FlowInputRecord inputRecord = it.value();
if (inputRecord.status == FlowSignalStatus::Complete) {
input.inputValuesByPort.insert(edgeKey.toPort, inputRecord.data);
FlowV2InputValue item;
item.sourceNodeId = edgeKey.fromNodeId;
item.sourcePort = edgeKey.fromPort;
item.targetNodeId = edgeKey.toNodeId;
item.targetPort = edgeKey.toPort;
item.value = inputRecord.data;
input.inputValues.push_back(item);
}
}
if (firstCompleteContext.isValid()) {
input.context = firstCompleteContext;
}
finishJoinedInputTokens(joinState, input.context.tokenId);
m_runtimeState->currentRun.pendingJoins.remove(joinKey);
qDebug() << "[FlowV2] activated join ready node =" << event.targetNodeId
<< "completeInputCount =" << input.inputValuesByPort.size()
<< "hasIgnore =" << hasIgnore
<< "keepBranch =" << input.context.tokenId
<< "joinBatchId =" << input.context.joinBatchId
<< "iteration =" << input.context.iteration;
return FlowV2RunDecision::Run;
}
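The decision taken at the end of prepareNodeExecution(), once every expected input has arrived, can be condensed into a small Qt-free sketch. `SignalStatus`, `RunDecision` and `decideJoin` are hypothetical stand-ins introduced for illustration, not the project's types, and the sketch leaves out token bookkeeping and input collection.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for FlowSignalStatus / FlowV2RunDecision.
enum class SignalStatus { Complete, Failed, Ignore };
enum class RunDecision { Run, Skip, Fail };

// Aggregates the statuses of all arrived inputs of one join:
//  - any Failed input fails the node;
//  - otherwise at least one Complete input runs the node;
//  - otherwise (only Ignore inputs) the node is skipped.
inline RunDecision decideJoin(const std::vector<SignalStatus>& arrived)
{
    // Assumption of this sketch: a join is only evaluated after an input arrived.
    assert(!arrived.empty());
    bool hasComplete = false;
    for (SignalStatus status : arrived) {
        if (status == SignalStatus::Failed) {
            return RunDecision::Fail;
        }
        if (status == SignalStatus::Complete) {
            hasComplete = true;
        }
    }
    return hasComplete ? RunDecision::Run : RunDecision::Skip;
}
```

Under these assumptions, a join fed by one selected and one unselected branch still runs, while a join whose inputs are all Ignore only forwards the Ignore signal.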
QVector<FlowV2Edge> FlowV2Runtime::selectedOutputEdges(
const QString& nodeId,
const FlowV2ExecuteResult& result) const
{
const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(nodeId);
// Empty selectedOutputPorts: ordinary node, unconditional fan-out, every output edge is taken.
if (result.selectedOutputPorts.isEmpty()) {
return outputs;
}
QVector<FlowV2Edge> selected;
for (const FlowV2Edge& edge : outputs) {
if (result.selectedOutputPorts.contains(edge.sourcePort)) {
selected.push_back(edge);
}
}
return selected;
}
bool FlowV2Runtime::enqueueSelectedEdge(
const FlowV2Edge& edge,
const FlowContext& parentContext,
const QVariant& value,
bool forceNewBranch)
{
FlowContext nextContext = parentContext;
bool createdNewToken = false;
const bool runtimeLoopBack = isRuntimeLoopBackEdge(edge, parentContext);
if (runtimeLoopBack) {
nextContext = m_runtimeState->createLoopContext(
parentContext,
edge.sourcePort,
m_isContinuousManualLoop);
createdNewToken = true;
if (!nextContext.isValid()) {
fail(QStringLiteral("failed to create loop context. edge=%1, iteration=%2")
.arg(edge.edgeId)
.arg(parentContext.iteration + 1));
return false;
}
qDebug() << "[FlowV2] runtime loop-back edge =" << edge.sourceNodeId
<< "->" << edge.targetNodeId
<< "parentToken =" << parentContext.tokenId
<< "nextToken =" << nextContext.tokenId
<< "iteration =" << nextContext.iteration
<< "frameId =" << nextContext.frameId;
}
else if (forceNewBranch) {
nextContext = m_runtimeState->createBranchContext(parentContext, edge.sourcePort);
createdNewToken = true;
if (!nextContext.isValid()) {
fail(QStringLiteral("failed to create branch context. edge=%1").arg(edge.edgeId));
return false;
}
}
if (createdNewToken) {
FlowV2Branch branch;
branch.branchId = nextContext.tokenId;
branch.context = nextContext;
branch.currentIndex = 0;
branch.state = FlowV2BranchState::Pending;
branch.nodePath.push_back(edge.sourceNodeId);
branch.nodePath.push_back(edge.targetNodeId);
m_branches.insert(branch.branchId, branch);
}
FlowV2Event nextEvent;
nextEvent.context = nextContext;
nextEvent.sourceNodeId = edge.sourceNodeId;
nextEvent.targetNodeId = edge.targetNodeId;
nextEvent.sourcePort = edge.sourcePort;
nextEvent.targetPort = edge.targetPort;
nextEvent.value = value;
nextEvent.status = FlowSignalStatus::Complete;
enqueueEvent(nextEvent);
return createdNewToken;
}
void FlowV2Runtime::propagateIgnoreFromEdge(
const FlowV2Edge& edge,
const FlowContext& parentContext)
{
// Ignore must not enter a loop edge.
// Otherwise an unselected branch would carry Ignore into the loop body and the active tokens could never be closed out.
if (isCycleFormingEdge(edge)) {
qDebug() << "[FlowV2] ignore propagation skip cycle-forming edge ="
<< edge.sourceNodeId << "->" << edge.targetNodeId
<< "edgeId =" << edge.edgeId;
return;
}
if (isRuntimeLoopBackEdge(edge, parentContext)) {
qDebug() << "[FlowV2] ignore propagation skip runtime LoopBack edge ="
<< edge.sourceNodeId << "->" << edge.targetNodeId
<< "edgeId =" << edge.edgeId;
return;
}
FlowContext childContext =
m_runtimeState->createBranchContext(parentContext, edge.sourcePort);
if (!childContext.isValid()) {
fail(QStringLiteral("failed to create ignore branch context. edge=%1").arg(edge.edgeId));
return;
}
FlowV2Branch childBranch;
childBranch.branchId = childContext.tokenId;
childBranch.context = childContext;
childBranch.currentIndex = 0;
childBranch.state = FlowV2BranchState::Pending;
childBranch.nodePath.push_back(edge.sourceNodeId);
childBranch.nodePath.push_back(edge.targetNodeId);
m_branches.insert(childBranch.branchId, childBranch);
FlowV2Event ignoreEvent;
ignoreEvent.context = childContext;
ignoreEvent.sourceNodeId = edge.sourceNodeId;
ignoreEvent.targetNodeId = edge.targetNodeId;
ignoreEvent.sourcePort = edge.sourcePort;
ignoreEvent.targetPort = edge.targetPort;
ignoreEvent.status = FlowSignalStatus::Ignore;
ignoreEvent.value = QVariant();
qDebug() << "[FlowV2] ignore edge =" << edge.sourceNodeId
<< "->" << edge.targetNodeId
<< "port =" << edge.sourcePort
<< "ignoreBranch =" << childContext.tokenId
<< "parent =" << parentContext.tokenId;
enqueueEvent(ignoreEvent);
}
void FlowV2Runtime::propagateIgnoreEvent(const FlowV2Event& event)
{
if (!m_runtimeState) {
return;
}
if (event.targetNodeId == m_endNodeId) {
m_runtimeState->finishToken(event.context);
auto it = m_branches.find(event.context.tokenId);
if (it != m_branches.end()) {
it.value().state = FlowV2BranchState::Finished;
}
qDebug() << "[FlowV2]"
<< (m_isSubFlow ? "[subflow]" : "[main]")
<< "ignore branch reach End. branch =" << event.context.tokenId
<< "frameId =" << event.context.frameId
<< "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
tryFinishRun();
return;
}
const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(event.targetNodeId);
QVector<FlowV2Edge> normalOutputs;
for (const FlowV2Edge& edge : outputs) {
// Ignore never enters a structurally cycle-forming edge.
if (isCycleFormingEdge(edge)) {
qDebug() << "[FlowV2] ignore propagation skip cycle-forming edge ="
<< edge.sourceNodeId << "->" << edge.targetNodeId
<< "edgeId =" << edge.edgeId;
continue;
}
normalOutputs.push_back(edge);
}
if (normalOutputs.isEmpty()) {
m_runtimeState->finishToken(event.context);
auto it = m_branches.find(event.context.tokenId);
if (it != m_branches.end()) {
it.value().state = FlowV2BranchState::Finished;
}
qDebug() << "[FlowV2] ignore branch stopped before cycle. branch ="
<< event.context.tokenId
<< "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
tryFinishRun();
return;
}
qDebug() << "[FlowV2] propagate ignore node =" << event.targetNodeId
<< "outCount =" << normalOutputs.size()
<< "branch =" << event.context.tokenId;
if (normalOutputs.size() == 1) {
const FlowV2Edge edge = normalOutputs.first();
FlowV2Event nextEvent;
nextEvent.context = event.context;
nextEvent.sourceNodeId = edge.sourceNodeId;
nextEvent.targetNodeId = edge.targetNodeId;
nextEvent.sourcePort = edge.sourcePort;
nextEvent.targetPort = edge.targetPort;
nextEvent.value = QVariant();
nextEvent.status = FlowSignalStatus::Ignore;
enqueueEvent(nextEvent);
return;
}
int childIndex = 0;
for (const FlowV2Edge& edge : normalOutputs) {
++childIndex;
FlowContext childContext = m_runtimeState->createBranchContext(event.context, childIndex);
if (!childContext.isValid()) {
fail(QStringLiteral("failed to create ignore child branch at node: %1").arg(event.targetNodeId));
return;
}
FlowV2Branch childBranch;
childBranch.branchId = childContext.tokenId;
childBranch.context = childContext;
childBranch.currentIndex = 0;
childBranch.state = FlowV2BranchState::Pending;
childBranch.nodePath.push_back(event.targetNodeId);
childBranch.nodePath.push_back(edge.targetNodeId);
m_branches.insert(childBranch.branchId, childBranch);
FlowV2Event nextEvent;
nextEvent.context = childContext;
nextEvent.sourceNodeId = edge.sourceNodeId;
nextEvent.targetNodeId = edge.targetNodeId;
nextEvent.sourcePort = edge.sourcePort;
nextEvent.targetPort = edge.targetPort;
nextEvent.value = QVariant();
nextEvent.status = FlowSignalStatus::Ignore;
enqueueEvent(nextEvent);
}
m_runtimeState->finishToken(event.context);
auto it = m_branches.find(event.context.tokenId);
if (it != m_branches.end()) {
it.value().state = FlowV2BranchState::Finished;
}
}
FlowEdgeKey FlowV2Runtime::makeEdgeKeyFromEdge(const FlowV2Edge& edge) const
{
FlowEdgeKey key;
key.fromNodeId = edge.sourceNodeId;
key.fromPort = edge.sourcePort;
key.toNodeId = edge.targetNodeId;
key.toPort = edge.targetPort;
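// Note: makeEdgeKey() builds edgeId as "src:port->dst:port". If FlowEdgeKey comparison includes edgeId,
// edge.edgeId has to use that same format for arrivedInputs lookups to match.
key.edgeId = edge.edgeId;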
return key;
}
bool FlowV2Runtime::sameJoinScope(const FlowContext& a, const FlowContext& b) const
{
return a.runId == b.runId
&& a.frameId == b.frameId
&& a.joinBatchId == b.joinBatchId
&& a.iteration == b.iteration;
}
bool FlowV2Runtime::canReachByNormalEdges(
const QString& fromNodeId,
const QString& targetNodeId) const
{
if (fromNodeId == targetNodeId) {
return true;
}
QSet<QString> visited;
QVector<QString> stack;
stack.push_back(fromNodeId);
while (!stack.isEmpty()) {
const QString nodeId = stack.takeLast();
if (nodeId == targetNodeId) {
return true;
}
if (visited.contains(nodeId)) {
continue;
}
visited.insert(nodeId);
const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(nodeId);
for (const FlowV2Edge& edge : outputs) {
if (edge.kind == FlowV2EdgeKind::LoopBack) {
continue;
}
if (!visited.contains(edge.targetNodeId)) {
stack.push_back(edge.targetNodeId);
}
}
}
return false;
}
bool FlowV2Runtime::canReachByAnyEdges(
const QString& fromNodeId,
const QString& targetNodeId,
const QString& ignoredEdgeId) const
{
if (fromNodeId.isEmpty() || targetNodeId.isEmpty()) {
return false;
}
if (fromNodeId == targetNodeId) {
return true;
}
QSet<QString> visited;
QVector<QString> stack;
stack.push_back(fromNodeId);
while (!stack.isEmpty()) {
const QString nodeId = stack.takeLast();
if (nodeId == targetNodeId) {
return true;
}
if (visited.contains(nodeId)) {
continue;
}
visited.insert(nodeId);
const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(nodeId);
for (const FlowV2Edge& edge : outputs) {
if (!ignoredEdgeId.isEmpty() && edge.edgeId == ignoredEdgeId) {
continue;
}
if (!visited.contains(edge.targetNodeId)) {
stack.push_back(edge.targetNodeId);
}
}
}
return false;
}
bool FlowV2Runtime::isCycleFormingEdge(const FlowV2Edge& edge) const
{
// If target can still get back to source without using this source -> target edge,
// the edge is structurally cycle-forming.
return canReachByAnyEdges(
edge.targetNodeId,
edge.sourceNodeId,
edge.edgeId);
}
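The check performed by canReachByAnyEdges() and isCycleFormingEdge() can be tried out in isolation with a Qt-free sketch. The `Edge` struct, the flat edge list and the function names below are hypothetical simplifications; the real runtime queries m_graph.outgoingEdges() instead of scanning all edges.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, simplified edge: only the fields the check needs.
struct Edge {
    std::string id;
    std::string source;
    std::string target;
};

// Iterative DFS from `from` to `to` that never traverses the edge `ignoredEdgeId`.
inline bool canReach(const std::vector<Edge>& edges,
                     const std::string& from,
                     const std::string& to,
                     const std::string& ignoredEdgeId)
{
    if (from == to) {
        return true;
    }
    std::unordered_set<std::string> visited;
    std::vector<std::string> stack{from};
    while (!stack.empty()) {
        const std::string node = stack.back();
        stack.pop_back();
        if (node == to) {
            return true;
        }
        if (!visited.insert(node).second) {
            continue;  // node was already expanded
        }
        for (const Edge& e : edges) {
            if (e.source != node || e.id == ignoredEdgeId) {
                continue;
            }
            if (visited.count(e.target) == 0) {
                stack.push_back(e.target);
            }
        }
    }
    return false;
}

// An edge closes a cycle if its target can get back to its source
// without using the edge itself.
inline bool isCycleForming(const std::vector<Edge>& edges, const Edge& edge)
{
    assert(!edge.id.empty());  // assumption of this sketch: every edge has an id
    return canReach(edges, edge.target, edge.source, edge.id);
}
```

For a graph A -> B -> C -> A with an extra exit C -> D, the three edges on the ring are reported as cycle-forming and the exit edge is not, which is why Ignore is allowed to leave through C -> D but never to re-enter the ring.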
QVector<FlowV2Edge> FlowV2Runtime::candidateInputEdgesForEvent(
const FlowV2Event& event) const
{
const QVector<FlowV2Edge> allInputs =
m_graph.incomingEdges(event.targetNodeId);
QVector<FlowV2Edge> candidates;
// End does not go through the ordinary join.
if (event.targetNodeId == m_endNodeId) {
return candidates;
}
for (const FlowV2Edge& edge : allInputs) {
// A LoopBack incoming edge is the entry of the next round and does not take part in this round's ordinary join.
if (edge.kind == FlowV2EdgeKind::LoopBack) {
continue;
}
candidates.push_back(edge);
}
return candidates;
}
bool FlowV2Runtime::missingInputCanStillArrive(
const FlowV2Edge& missingEdge,
const FlowContext& context) const
{
// 1. Paths that are still in the event queue.
for (const FlowV2Event& pendingEvent : m_pendingEvents) {
if (!sameJoinScope(pendingEvent.context, context)) {
continue;
}
if (canReachByNormalEdges(
pendingEvent.targetNodeId,
missingEdge.sourceNodeId)) {
return true;
}
}
// 2. Paths that have started executing but are now Pending,
// e.g. Tips / Delay / SubProcess / motion control.
for (auto it = m_pendingNodes.begin(); it != m_pendingNodes.end(); ++it) {
const FlowV2PendingNode& pendingNode = it.value();
if (!sameJoinScope(pendingNode.event.context, context)) {
continue;
}
if (canReachByNormalEdges(
pendingNode.nodeId,
missingEdge.sourceNodeId)) {
qDebug() << "[FlowV2] wait possible pending upstream node ="
<< missingEdge.sourceNodeId
<< "pendingNode =" << pendingNode.nodeId
<< "token =" << pendingNode.event.context.tokenId;
return true;
}
}
return false;
}
bool FlowV2Runtime::hasMissingInputThatCanStillArrive(
const FlowV2Event& event,
const FlowJoinState& joinState) const
{
const QVector<FlowV2Edge> candidates =
candidateInputEdgesForEvent(event);
for (const FlowV2Edge& edge : candidates) {
const FlowEdgeKey edgeKey = makeEdgeKeyFromEdge(edge);
if (joinState.arrivedInputs.contains(edgeKey)) {
continue;
}
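// add by Momo
// A recursive feedback input must not block the current round.
//
// Example:
// tipstool_2 -> globalvalue_1
// If globalvalue_1 can reach tipstool_2,
// then tipstool_2 only produces this input in a later loop iteration,
// so the current globalvalue_1 should not wait for it.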
if (isFeedbackInputEdge(edge)) {
qDebug() << "[FlowV2] missing feedback input ignored node =" << event.targetNodeId
<< "missingEdge =" << edge.sourceNodeId << ":" << edge.sourcePort
<< "->" << edge.targetNodeId << ":" << edge.targetPort;
continue;
}
if (missingInputCanStillArrive(edge, event.context)) {
qDebug() << "[FlowV2] wait possible future input node =" << event.targetNodeId
<< "missingEdge =" << edge.sourceNodeId << ":" << edge.sourcePort
<< "->" << edge.targetNodeId << ":" << edge.targetPort;
return true;
}
}
return false;
}
bool FlowV2Runtime::isTerminalSinkNode(const QString& nodeId) const
{
return m_graph.isTerminalSinkNode(nodeId);
}
void FlowV2Runtime::reachTerminal(const FlowV2Event& event)
{
if (!m_runtimeState) {
return;
}
FlowV2Branch& branch = m_branches[event.context.tokenId];
branch.state = FlowV2BranchState::Finished;
if (branch.nodePath.isEmpty() || branch.nodePath.last() != event.targetNodeId) {
branch.nodePath.push_back(event.targetNodeId);
}
branch.currentIndex = branch.nodePath.size() - 1;
m_runtimeState->finishToken(event.context);
qDebug() << "[FlowV2]"
<< (m_isSubFlow ? "[subflow]" : "[main]")
<< "branch reach terminal. node =" << event.targetNodeId
<< "branch =" << event.context.tokenId
<< "frameId =" << event.context.frameId
<< "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
tryFinishRun();
}
bool FlowV2Runtime::wouldRevisitNode(
const FlowContext& context,
const QString& targetNodeId) const
{
auto it = m_branches.find(context.tokenId);
if (it == m_branches.end()) {
return false;
}
const FlowV2Branch& branch = it.value();
return branch.nodePath.contains(targetNodeId);
}
bool FlowV2Runtime::isRuntimeLoopBackEdge(
const FlowV2Edge& edge,
const FlowContext& context) const
{
if (edge.kind == FlowV2EdgeKind::LoopBack) {
return true;
}
return wouldRevisitNode(context, edge.targetNodeId);
}
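The runtime loop-back rule implemented by wouldRevisitNode() and isRuntimeLoopBackEdge() fits in a few lines. The sketch below is Qt-free and uses hypothetical names (`EdgeKind`, `isRuntimeLoopBack`); it takes the token's node path as a parameter instead of looking it up in m_branches.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for FlowV2EdgeKind.
enum class EdgeKind { Normal, LoopBack };

// An edge counts as a loop-back either because the graph marked it statically
// or because its target already appears on the path of the current token.
inline bool isRuntimeLoopBack(EdgeKind kind,
                              const std::vector<std::string>& nodePath,
                              const std::string& targetNodeId)
{
    if (kind == EdgeKind::LoopBack) {
        return true;
    }
    return std::find(nodePath.begin(), nodePath.end(), targetNodeId) != nodePath.end();
}
```

A token without a recorded path behaves like an empty path here, which matches the `return false` branch of wouldRevisitNode() when the token is not found.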
bool FlowV2Runtime::isFeedbackInputEdge(const FlowV2Edge& edge) const
{
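// edge.sourceNodeId -> edge.targetNodeId
//
// If target can get back to source, this input comes from a part of the flow
// that runs after target, i.e. it is a recursive/loop feedback input.
// It must not block the current round of target.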
return canReachByAnyEdges(
edge.targetNodeId,
edge.sourceNodeId,
edge.edgeId);
}
void FlowV2Runtime::suspendToken(
const FlowV2Event& event,
const FlowV2ExecuteInput& input)
{
if (!m_runtimeState) {
return;
}
FlowV2PendingNode pending;
pending.event = event;
pending.input = input;
pending.nodeId = event.targetNodeId;
pending.startTime = QDateTime::currentDateTime();
// add by momo 2026.06.01: guard against inserting the same token twice
if (m_pendingNodes.contains(event.context.tokenId)) {
qWarning() << "[FlowV2] token already pending =" << event.context.tokenId
<< "node =" << event.targetNodeId;
return;
}
m_pendingNodes.insert(event.context.tokenId, pending);
auto branchIt = m_branches.find(event.context.tokenId);
if (branchIt != m_branches.end()) {
branchIt.value().state = FlowV2BranchState::Pending;
}
auto tokenIt = m_runtimeState->currentRun.tokens.find(event.context.tokenId);
if (tokenIt != m_runtimeState->currentRun.tokens.end()) {
tokenIt.value().state = FlowTokenState::Waiting;
}
qDebug() << "[FlowV2] token pending node =" << event.targetNodeId
<< "token =" << event.context.tokenId
<< "frameId =" << event.context.frameId
<< "joinBatchId =" << event.context.joinBatchId
<< "iteration =" << event.context.iteration;
}
void FlowV2Runtime::completePendingNode(
const FlowContext& context,
const QVariant& outputValue)
{
if (!m_runtimeState) {
return;
}
if (m_stopRequested || m_finishedEmitted) {
return;
}
if (context.runId != m_runtimeState->currentRun.runId) {
qWarning() << "[FlowV2] completePendingNode ignored. stale runId ="
<< context.runId
<< "current =" << m_runtimeState->currentRun.runId;
return;
}
auto pendingIt = m_pendingNodes.find(context.tokenId);
if (pendingIt == m_pendingNodes.end()) {
qWarning() << "[FlowV2] completePendingNode ignored. token not pending ="
<< context.tokenId;
return;
}
FlowV2PendingNode pending = pendingIt.value();
m_pendingNodes.erase(pendingIt);
// Highlighting: outside step mode mark the node finished; in step mode keep it highlighted as executing.
if (!m_stepMode) {
markFlowV2NodeState(m_graph, pending.nodeId, STATE_FINISHED);
}
else {
markFlowV2NodeState(m_graph, pending.nodeId, STATE_EXECUTING);
m_lastHighlightedNodeId = pending.nodeId;
}
auto tokenIt = m_runtimeState->currentRun.tokens.find(context.tokenId);
if (tokenIt != m_runtimeState->currentRun.tokens.end()) {
tokenIt.value().state = FlowTokenState::Active;
}
auto branchIt = m_branches.find(context.tokenId);
if (branchIt != m_branches.end()) {
branchIt.value().state = FlowV2BranchState::Running;
}
// Key point: do not fall back to pending.input.inputValue.
// Whatever the pending node outputs is what gets passed on.
// For Tips / Delay, completePendingNode(context, QVariant()) means no business value is output.
FlowV2ExecuteResult result = FlowV2ExecuteResult::complete(outputValue);
qDebug() << "[FlowV2] complete pending node =" << pending.nodeId
<< "token =" << context.tokenId
<< "outputValid =" << outputValue.isValid()
<< "outputType =" << outputValue.typeName();
completeNode(pending.event, result);
if (m_stopRequested || m_finishedEmitted || !m_runtimeState || !m_lastError.isEmpty()) {
return;
}
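// In step mode, an async node that completes only enqueues its follow-up events
// and does not keep running automatically. The next "step" click advances the flow.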
if (m_stepMode) {
tryFinishRun();
return;
}
runEventLoop();
if (!m_stopRequested && !m_finishedEmitted && m_runtimeState && m_lastError.isEmpty()) {
tryFinishRun();
}
}
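The guards at the top of completePendingNode() (stale runId, token not pending) together with the duplicate-insert check in suspendToken() amount to a small pending table. The following Qt-free sketch shows that idea only; `Context` and `PendingTable` are hypothetical names, and highlighting, token states and event dispatch are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>

// Hypothetical minimal context: which run and which token a callback belongs to.
struct Context {
    std::uint64_t runId = 0;
    std::uint64_t tokenId = 0;
};

// Hypothetical pending-node table, loosely modelled on suspendToken() /
// completePendingNode().
class PendingTable {
public:
    explicit PendingTable(std::uint64_t currentRunId) : m_runId(currentRunId) {}

    // Parks a token on a node; returns false if the token is already parked.
    bool suspend(const Context& ctx, const std::string& nodeId)
    {
        return m_pending.emplace(ctx.tokenId, nodeId).second;
    }

    // Accepts a completion only if it belongs to the current run and the token
    // is actually parked. On success the node id is written to nodeIdOut.
    bool complete(const Context& ctx, std::string& nodeIdOut)
    {
        if (ctx.runId != m_runId) {
            return false;  // callback from an earlier run
        }
        auto it = m_pending.find(ctx.tokenId);
        if (it == m_pending.end()) {
            return false;  // unknown token or duplicate callback
        }
        nodeIdOut = it->second;
        m_pending.erase(it);
        return true;
    }

private:
    std::uint64_t m_runId;
    std::unordered_map<std::uint64_t, std::string> m_pending;
};
```

Because the entry is erased on the first accepted completion, a second callback for the same token is rejected, and a callback that still carries the previous runId never touches the current run.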
void FlowV2Runtime::failPendingNode(
const FlowContext& context,
const QString& reason)
{
auto pendingIt = m_pendingNodes.find(context.tokenId);
if (pendingIt != m_pendingNodes.end()) {
m_pendingNodes.erase(pendingIt);
}
fail(reason);
}
void FlowV2Runtime::fail(const QString& reason)
{
if (!m_lastError.isEmpty()) {
return;
}
m_lastError = reason;
m_pendingEvents.clear();
m_pendingNodes.clear();
if (m_runtimeState) {
m_runtimeState->markFailed(reason);
}
for (auto it = m_branches.begin(); it != m_branches.end(); ++it) {
if (it.value().state == FlowV2BranchState::Running ||
it.value().state == FlowV2BranchState::Pending) {
it.value().state = FlowV2BranchState::Failed;
}
}
qWarning() << "[FlowV2][FAILED]" << reason;
if (m_failCallback) {
m_failCallback(reason);
}
clearFlowV2NodeHighlight(m_graph, m_lastHighlightedNodeId);
m_lastHighlightedNodeId.clear();
}
void FlowV2Runtime::requestStop()
{
m_stopRequested = true;
m_pendingEvents.clear();
m_pendingNodes.clear();
if (m_runtimeState) {
for (auto it = m_runtimeState->currentRun.tokens.begin();
it != m_runtimeState->currentRun.tokens.end();
++it) {
if (it.value().state == FlowTokenState::Active ||
it.value().state == FlowTokenState::Waiting) {
it.value().state = FlowTokenState::Cancelled;
}
}
}
for (auto it = m_branches.begin(); it != m_branches.end(); ++it) {
if (it.value().state == FlowV2BranchState::Running ||
it.value().state == FlowV2BranchState::Pending) {
it.value().state = FlowV2BranchState::Finished;
}
}
qDebug() << "runtime stop requested.";
for (auto it = m_branches.begin(); it != m_branches.end(); ++it) {
const FlowV2Branch& branch = it.value();
if (!branch.nodePath.isEmpty()) {
markFlowV2NodeState(m_graph, branch.nodePath.last(), STATE_FINISHED);
}
}
clearFlowV2NodeHighlight(m_graph, m_lastHighlightedNodeId);
m_lastHighlightedNodeId.clear();
}
// add by Momo 2026.06.02: data-flow display
void FlowV2Runtime::clearDataFlowDisplayRecords()
{
QSet<modelObject*> visited;
for (auto it = m_graph.nodes.begin(); it != m_graph.nodes.end(); ++it) {
modelObject* obj = it.value().legacyObject;
if (!obj || obj->isDestroyPrepared()) {
continue;
}
if (visited.contains(obj)) {
continue;
}
visited.insert(obj);
obj->clearDataFlowDisplayCache();
}
}
void FlowV2Runtime::recordDataFlowForDisplay(const FlowV2Event& event)
{
// Ignore is FlowV2's inactive-edge semantics, not business data.
// The data-flow display records only real Complete data by default, so unselected branches do not pollute it.
if (event.status != FlowSignalStatus::Complete) {
return;
}
if (!m_graph.nodes.contains(event.sourceNodeId) ||
!m_graph.nodes.contains(event.targetNodeId)) {
return;
}
modelObject* sourceObj = m_graph.nodes.value(event.sourceNodeId).legacyObject;
modelObject* targetObj = m_graph.nodes.value(event.targetNodeId).legacyObject;
if (!sourceObj || !targetObj) {
return;
}
if (sourceObj->isDestroyPrepared() || targetObj->isDestroyPrepared()) {
return;
}
sourceObj->recordDataFlowSendForDisplay(
targetObj,
event.sourcePort,
event.value);
targetObj->recordDataFlowReceiveForDisplay(
sourceObj,
event.targetPort,
event.value);
}
FlowV2Executor is the entry point for node business logic.
The Runtime does not handle plugin-specific logic itself; every node is executed through FlowV2ExecutorRegistry.
#pragma once
#include <QString>
#include <QVariant>
#include <QVector>
#include <QMap>
#include <QSharedPointer>
#include "FlowV2Graph.h"
#include "FlowRuntimeState.h"
#include "qtpubliclib_global.h"
class FlowV2Runtime;
struct FlowV2InputValue
{
QString sourceNodeId;
int sourcePort = 0;
QString targetNodeId;
int targetPort = 0;
QVariant value;
};
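/**
* @brief Input for executing a FlowV2 node
*
* When the Runtime schedules a node, it packs the current event context into this structure.
*
* Note:
* This is the input structure of the new runtime; it does not go through the legacy receive(source, channel, value).
*/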
struct FlowV2ExecuteInput
{
FlowContext context;
// add by Momo. Used by subprocess / async / future frame-aware executors.
// Do not store this pointer after executeFlowV2 returns.
FlowRuntimeState* runtimeState = nullptr;
FlowV2Runtime* runtime = nullptr;
QString sourceNodeId;
QString targetNodeId;
int sourcePort = 0;
int targetPort = 0;
QVariant inputValue;
// Once all inputs of a multi-input node have arrived, they are passed to the executor keyed by input port.
QMap<int, QVariant> inputValuesByPort;
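// add by Momo 2026.05.26
// Keeps every Complete input of the current round (inputValuesByPort holds only one value per port).
// Suited to nodes such as logic modules that have one input port but accept several input streams.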
QVector<FlowV2InputValue> inputValues;
};
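/**
* @brief Result of executing a FlowV2 node
*
* Phase 1 mainly uses:
* - Complete
* - Failed
*
* selectedOutputPorts is intended for Judge / Loop nodes: when it is empty every output edge is taken,
* otherwise only edges whose source port is listed.
*/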
struct FlowV2ExecuteResult
{
FlowSignalStatus status = FlowSignalStatus::Complete;
QVariant outputValue;
QVector<int> selectedOutputPorts;
QString errorMessage;
static FlowV2ExecuteResult complete(const QVariant& output = QVariant())
{
FlowV2ExecuteResult result;
result.status = FlowSignalStatus::Complete;
result.outputValue = output;
return result;
}
static FlowV2ExecuteResult failed(const QString& reason)
{
FlowV2ExecuteResult result;
result.status = FlowSignalStatus::Failed;
result.errorMessage = reason;
return result;
}
static FlowV2ExecuteResult pending()
{
FlowV2ExecuteResult result;
result.status = FlowSignalStatus::Pending;
return result;
}
};
/**
* @brief FlowV2 node executor interface
*
* Allows an executor to be registered separately when the plugin class itself should stay untouched.
* For example:
* - FlowV2TipsExecutor
* - FlowV2ComputeExecutor
* - FlowV2JudgeExecutor
*/
class QTPUBLICLIB_EXPORT IFlowV2NodeExecutor
{
public:
virtual ~IFlowV2NodeExecutor() = default;
virtual QString type() const = 0;
virtual FlowV2ExecuteResult execute(
const FlowV2Node& node,
const FlowV2ExecuteInput& input
) = 0;
};
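/**
* @brief FlowV2 executor registry
*
* First priority:
* - If an executor is registered for the node type, call it.
*
* Second priority:
* - If node.legacyObject exists, call legacyObject->executeFlowV2(input).
*
* Third priority:
* - Simulated default execution: return Complete and pass input.inputValue through.
*/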
class QTPUBLICLIB_EXPORT FlowV2ExecutorRegistry
{
public:
static FlowV2ExecutorRegistry& instance();
void registerExecutor(const QSharedPointer<IFlowV2NodeExecutor>& executor);
bool hasExecutor(const QString& type) const;
FlowV2ExecuteResult execute(
const FlowV2Node& node,
const FlowV2ExecuteInput& input
);
private:
FlowV2ExecutorRegistry() = default;
QMap<QString, QSharedPointer<IFlowV2NodeExecutor>> m_executors;
};
#include "FlowV2Executor.h"
#include "dataDefine.h"
#include <QDebug>
FlowV2ExecutorRegistry& FlowV2ExecutorRegistry::instance()
{
static FlowV2ExecutorRegistry registry;
return registry;
}
void FlowV2ExecutorRegistry::registerExecutor(const QSharedPointer<IFlowV2NodeExecutor>& executor)
{
if (!executor) {
return;
}
const QString type = executor->type();
if (type.isEmpty()) {
return;
}
m_executors.insert(type, executor);
}
bool FlowV2ExecutorRegistry::hasExecutor(const QString& type) const
{
return m_executors.contains(type);
}
FlowV2ExecuteResult FlowV2ExecutorRegistry::execute(
const FlowV2Node& node,
const FlowV2ExecuteInput& input)
{
if (m_executors.contains(node.type)) {
return m_executors.value(node.type)->execute(node, input);
}
if (node.legacyObject) {
return node.legacyObject->executeFlowV2(input);
}
qDebug() << "[FlowV2] simulate execute node =" << node.nodeId
<< "type =" << node.type;
return FlowV2ExecuteResult::complete(input.inputValue);
}
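The three-step fallback of FlowV2ExecutorRegistry::execute() can be exercised without Qt using the sketch below. Everything in it is a hypothetical simplification: `Node`, `Registry` and the string-in/string-out executors stand in for FlowV2Node, the registry singleton and FlowV2ExecuteInput / FlowV2ExecuteResult.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified node: `legacy` stands in for legacyObject->executeFlowV2.
struct Node {
    std::string type;
    std::function<std::string(const std::string&)> legacy;  // may be empty
};

// Hypothetical registry mirroring the three-step fallback described above.
class Registry {
public:
    using Executor = std::function<std::string(const Node&, const std::string&)>;

    void registerExecutor(const std::string& type, Executor executor)
    {
        if (type.empty() || !executor) {
            return;  // ignore unusable registrations
        }
        m_executors[type] = std::move(executor);  // a later registration replaces an earlier one
    }

    std::string execute(const Node& node, const std::string& input) const
    {
        auto it = m_executors.find(node.type);
        if (it != m_executors.end()) {
            return it->second(node, input);  // 1. registered executor
        }
        if (node.legacy) {
            return node.legacy(input);       // 2. legacy object
        }
        return input;                        // 3. simulated pass-through
    }

private:
    std::map<std::string, Executor> m_executors;
};
```

The ordering means a plugin can be migrated gradually: registering an executor for a type overrides the legacy path for every node of that type, and nodes with neither an executor nor a legacy object still let the flow continue with their input passed through.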