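The zero-code platform needs to support sequential, conditional, and loop flows.

The FlowV2 Runtime rework mainly addresses the unreliability of the old flow execution model on complex graph structures.

The old logic worked more like "when a module finishes, it directly notifies the next module", which suits simple single-chain flows. It breaks down easily once a flow contains any of the following: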

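  1. One Start fans out into multiple branches;
  2. Multiple branches eventually merge into the same module;
  3. A judgment module selects only one of its outputs, and the remaining branches must not actually execute;
  4. In a loop flow, the same module is triggered repeatedly over multiple rounds;
  5. Modules such as Tips, Delay, position control, homing, and subflows do not finish immediately but wait for an external callback;
  6. Nodes such as chart modules only consume data and do not necessarily connect onward to EndTool;
  7. EndTool receiving one signal does not mean the whole main flow has finished; it must wait until all active branches have ended.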

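Simply replacing a few module calls therefore cannot meet these requirements. The problem was instead abstracted from "modules notifying each other" to "state transitions on a directed graph". With the business problem clarified, the design adopts graph modeling, reachability analysis, strongly connected components (SCC), a token ledger, and event-queue scheduling, turning flow execution from the old recursive module calls into a state-machine model scheduled centrally by the Runtime.

So the core idea of this rework is:

Stop thinking of flow execution as "modules calling each other"; think of it as "one Runtime scheduling a batch of events that carry context".

In other words, every time a signal travels along an edge to a node, the Runtime wraps it into an event and puts it on a single event queue. The Runtime then decides: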

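  • whether this node can execute right now;
  • whether all of its inputs have arrived;
  • which run the current signal belongs to;
  • which branch token it belongs to;
  • which loop iteration it belongs to;
  • which subflow frame it belongs to;
  • for an unselected branch, whether to only propagate ledger entries;
  • for an asynchronous node, whether to suspend the current token;
  • on reaching a terminal, whether the whole flow can really end.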
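
The idea of wrapping every signal into an event and draining one queue can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual FlowV2Runtime: `SketchEvent` and `SketchRuntime` are hypothetical names, standard library types stand in for the Qt types, and only the "which run does this signal belong to" check is modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical, simplified event: one signal arriving at one node.
struct SketchEvent {
    std::uint64_t runId = 0;    // which click of "Run" produced this signal
    std::uint64_t tokenId = 0;  // which execution path it belongs to
    int iteration = 0;          // which loop round it belongs to
    std::string targetNodeId;   // node the signal is flowing into
};

// Hypothetical scheduler: modules never call each other, they only enqueue.
class SketchRuntime {
public:
    explicit SketchRuntime(std::uint64_t currentRunId)
        : currentRunId_(currentRunId) {}

    void enqueueEvent(const SketchEvent& ev) { queue_.push_back(ev); }

    // Drains the queue in FIFO order and returns the ids of the nodes
    // that were actually dispatched.
    std::vector<std::string> runEventLoop() {
        std::vector<std::string> dispatched;
        while (!queue_.empty()) {
            const SketchEvent ev = queue_.front();
            queue_.pop_front();
            if (ev.runId != currentRunId_) {
                continue;  // stale callback from an earlier run: drop it
            }
            dispatched.push_back(ev.targetNodeId);
        }
        return dispatched;
    }

private:
    std::uint64_t currentRunId_;
    std::deque<SketchEvent> queue_;
};
```

Because every signal passes through the same loop, the remaining checks in the list (join readiness, loop iteration, frame, and so on) would all have one place to live.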

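2. Overall design

FlowV2 first "imprints" the legacy flow object into a directed graph. During graph validation, ordinary reachability questions are answered with stack-based graph traversal.

For example, DFS/BFS determines whether a node is reachable from Start and whether it can eventually reach EndTool or a terminal sink such as chart; loop regions are identified mainly through SCC (strongly connected components).

In other words, DFS/BFS-style traversal answers "is there a path?", while SCC answers "which nodes sit in the same loop region?".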
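
A stack-based reachability check of this kind might look like the sketch below. It assumes a plain adjacency list (`SketchAdjacency`, a hypothetical type) rather than the real FlowV2Graph, and it answers "can this node reach a terminal?" by running the same traversal on the reversed graph.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical adjacency list: node id -> ids of its direct successors.
using SketchAdjacency = std::map<std::string, std::vector<std::string>>;

// Iterative DFS with an explicit stack.
// Returns every node reachable from `from`, including `from` itself.
std::set<std::string> sketchReachable(const SketchAdjacency& adj,
                                      const std::string& from) {
    std::set<std::string> seen;
    std::vector<std::string> stack;
    stack.push_back(from);
    while (!stack.empty()) {
        const std::string node = stack.back();
        stack.pop_back();
        if (!seen.insert(node).second) {
            continue;  // already visited
        }
        const auto it = adj.find(node);
        if (it == adj.end()) {
            continue;  // node has no outgoing edges
        }
        for (const std::string& next : it->second) {
            stack.push_back(next);
        }
    }
    return seen;
}

// Reverses every edge, so "can reach a terminal" becomes
// "is reachable from that terminal" on the reversed graph.
SketchAdjacency sketchReversed(const SketchAdjacency& adj) {
    SketchAdjacency rev;
    for (const auto& entry : adj) {
        for (const std::string& to : entry.second) {
            rev[to].push_back(entry.first);
        }
    }
    return rev;
}
```

Calling `sketchReachable(graph, "Start")` gives the forward set, and `sketchReachable(sketchReversed(graph), "End")` gives the nodes that can reach `End`; a node missing from either set would fail validation.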

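At run time, LoopBack no longer relies entirely on static pre-marking: the Runtime checks the current token's nodePath to decide dynamically whether the token is returning to a node it has already visited.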
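
A minimal sketch of that dynamic check, assuming the token's path is kept as an ordered list of node ids. The function name `sketchIsLoopBack` is hypothetical; the real Runtime may apply further conditions.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Returns true when delivering the token to `targetNodeId` would revisit a
// node already on its path, i.e. the edge acts as a LoopBack for this token.
bool sketchIsLoopBack(const std::vector<std::string>& nodePath,
                      const std::string& targetNodeId) {
    return std::find(nodePath.begin(), nodePath.end(), targetNodeId)
           != nodePath.end();
}
```

The same edge can therefore count as a back edge for one token and as an ordinary edge for another, depending on where each token has already been.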

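The overall FlowV2 design breaks down into four layers built on top of the legacy flow graph object:

    Legacy flow graph object
        ↓
    FlowV2Graph: converts legacy modules and connections into a unified graph structure
        ↓
    FlowRuntimeState: records the token / frame / join / pending ledgers of one run
        ↓
    FlowV2Runtime: drives flow execution with an event queue
        ↓
    FlowV2Executor / legacyObject->executeFlowV2: actually executes the plugin logic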

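Three things matter most:

2.1 Graph describes "what the flow looks like"

`FlowV2Graph` does not execute modules; it only stores nodes and edges.

It is concerned with:

  • which nodes exist;
  • which edges exist;
  • the source / target / port of each edge;
  • where Start is;
  • where End is;
  • which nodes can act as terminals;
  • whether the graph structure is valid;
  • whether a node is reachable from Start;
  • whether a node can eventually reach some terminal;
  • whether loop regions exist.

In short, the Graph is the static structure of the flow.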

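2.2 RuntimeState records "where the run currently is"

`FlowRuntimeState` is the ledger.

It neither finds the next node nor executes plugins; it records all of the context of the current run.

The most important concepts are: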

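| Concept       | Purpose |
| ------------- | ------- |
| runId         | Identifies which click of Run this is, so stale callbacks cannot pollute a new run |
| tokenId       | Identifies one execution path moving across the flow graph |
| parentTokenId | Identifies the parent path the current token was forked from |
| tokenGroupId  | Identifies the group of sibling tokens produced by the same fan-out |
| frameId       | Identifies the subflow / recursion stack frame the signal belongs to |
| joinBatchId   | Identifies the current join batch |
| iteration     | Identifies the current loop iteration |
| pendingJoins  | Records which inputs a multi-input node is still waiting for |
| tokens        | Records the state of every execution path |
| frames        | Records the main flow / subflow call stack |
| leases        | Reserved for lifecycle control of async work, subflows, and loops |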

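The purpose of these fields is that the Runtime no longer looks only at "which node sent the signal", but can also answer:

  • Which run does this signal belong to?
  • Which branch does it belong to?
  • Which loop iteration does it belong to?
  • Which subflow does it belong to?
  • Does it belong to the same join as the other inputs?

In particular, `joinBatchId + iteration` exists to solve join misalignment in loop scenarios.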

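For example:

    Branch A loops 3 times
    Branch B executes only once
    A and B finally merge into C

Without `joinBatchId` and `iteration`, C can easily pair A's round-2 input with B's round-1 input by mistake.

Now every loop round creates a new `joinBatchId`, so inputs from different rounds never mix.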
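
The effect of keying joins by batch and iteration can be illustrated with a reduced ledger. This is only a sketch: `SketchJoinKey` and `SketchJoinLedger` are hypothetical, the key keeps just (node, joinBatchId, iteration) whereas the real `FlowJoinKey` also carries runId, tokenGroupId and frameId, and edges are identified by plain strings.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical reduced join key: (target node, joinBatchId, iteration).
using SketchJoinKey = std::tuple<std::string, std::uint64_t, int>;

class SketchJoinLedger {
public:
    explicit SketchJoinLedger(std::set<std::string> expectedEdges)
        : expected_(std::move(expectedEdges)) {}

    // Records one arrival. Returns true once every expected edge has arrived
    // for this exact (node, batch, iteration) combination.
    bool arrive(const SketchJoinKey& key, const std::string& edgeId) {
        std::set<std::string>& arrived = pending_[key];
        arrived.insert(edgeId);
        if (arrived != expected_) {
            return false;  // still waiting for other inputs of this batch
        }
        pending_.erase(key);  // batch complete: release its bookkeeping
        return true;
    }

    // Number of batches that are still waiting for inputs.
    std::size_t pendingBatchCount() const { return pending_.size(); }

private:
    std::set<std::string> expected_;
    std::map<SketchJoinKey, std::set<std::string>> pending_;
};
```

With this keying, an input from A in batch 2 opens its own pending entry instead of completing the entry that B opened in batch 1, which is exactly the misalignment described above.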

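2.3 Runtime schedules "how events flow" in one place

`FlowV2Runtime` is the actual scheduler.

Instead of letting modules call the next module at will, it advances every step through one fixed sequence of calls:

    enqueueEvent()
    runEventLoop()
    dispatchEvent()
    prepareNodeExecution()
    executeNode()
    completeNode()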

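This has the following benefits:

1. Branches can coexist, and each branch has its own token;
2. Multi-input nodes never execute early;
3. Branches not selected by a judgment module never actually run their plugins;
4. An asynchronous node can suspend its current token without blocking other tokens;
5. A loop can create a new token and a new joinBatch;
6. Terminals such as EndTool / chart are closed out in one place;
7. When the main flow ends, the Runtime checks that all branches have ended, instead of ending as soon as the first branch reaches EndTool.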
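
The last point, the end barrier, can be sketched as below. The names `SketchRun` and `SketchTokenState` are hypothetical, and treating a Waiting token as still blocking the finish is an assumption of this sketch rather than a statement about the real ledger.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

enum class SketchTokenState { Active, Waiting, Finished };

// Hypothetical, reduced run ledger: only what the end barrier needs.
struct SketchRun {
    bool finishRequested = false;
    std::map<std::uint64_t, SketchTokenState> tokens;

    // Assumption: a token blocks the run while it is Active or parked as Waiting.
    int liveTokenCount() const {
        int count = 0;
        for (const auto& entry : tokens) {
            if (entry.second != SketchTokenState::Finished) {
                ++count;
            }
        }
        return count;
    }

    // EndTool only *requests* the finish; the run ends once nothing is live.
    bool canFinish() const {
        return finishRequested && liveTokenCount() == 0;
    }
};
```

The first branch to reach EndTool would set `finishRequested`, and the run would actually end only when the last remaining token is marked Finished.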

------------------------------------------------------------------------------------------------------------------

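FlowRuntimeState.h is the definition file for FlowV2's run ledger.

It neither executes the flow nor finds the next node; it defines every state structure the Runtime needs to keep while running.

You can think of this file as the "state table schema" of FlowV2.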

#pragma once

#include <QString>
#include <QVariant>
#include <QMap>
#include <QDateTime>
#include <QtGlobal>
#include <tuple>

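/**
 * @brief Flow signal status
 *
 * Note:
 * These states only describe the semantics of the current signal.
 * They must not be used on their own to decide whether the main flow has finished.
 */
enum class FlowSignalStatus
{
    Unknown = 0,

    // Finished normally; downstream modules may be triggered
    Complete = 1,

    // The current path failed
    Failed = 2,

    // The current path does not execute this round; bookkeeping only
    Ignore = 3,

    // The node has started but not yet completed.
    // The current token is suspended until an external event resumes it.
    Pending = 4
};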

/**
 * @brief Token state
 *
 * A token can be thought of as "one execution path moving across the flow graph".
 */
enum class FlowTokenState
{
    Active = 0,

    // The token is suspended until an external event resumes it.
    Waiting = 1,

    Finished = 2,
    Cancelled = 3
};

/**
 * @brief Frame state
 *
 * A frame can be thought of as a "subflow / recursion call stack frame".
 */
enum class FlowFrameState
{
    Active = 0,
    Returned = 1,
    Cancelled = 2
};

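/**
 * @brief Lease type
 *
 * A lease can be thought of as a "flow lifecycle lease".
 * As long as a lease has not been released, the main flow must not return to initial.
 */
enum class FlowLeaseType
{
    Unknown = 0,

    // Waiting on a QTimer / hardware callback / motion completion
    Async = 1,

    // Waiting on a subflow / recursion
    Frame = 2,

    // The current round of a loop body has not finished yet
    Loop = 3
};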

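/**
 * @brief Flow context
 *
 * Every flow signal should carry this context.
 *
 * runId:
 *   ID of this click of Run.
 *
 * tokenId:
 *   ID of the current execution path.
 *
 * parentTokenId:
 *   ID of the token this one was forked from; 0 means root token.
 *
 * tokenGroupId:
 *   The group of sibling tokens produced by the same fan-out.
 *   Multi-input joins commonly use it to tell whether inputs belong to the same batch of branches.
 *
 * frameId:
 *   ID of the current subflow / recursion stack frame.
 *
 * parentFrameId:
 *   The parent frame.
 *
 * joinBatchId:
 *   ID of the current join batch.
 *   Resolves the misalignment when A loops several rounds and B does not loop.
 *
 * iteration:
 *   Current loop iteration.
 *
 * depth:
 *   Current recursion depth.
 *
 * eventId:
 *   ID of the event currently being dispatched.
 */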
struct FlowContext
{
    quint64 runId = 0;
    quint64 tokenId = 0;
    quint64 parentTokenId = 0;   // add by Momo. FlowV2 branch token parent.
    quint64 tokenGroupId = 0;

    quint64 frameId = 0;
    quint64 parentFrameId = 0;

    quint64 joinBatchId = 0;

    int iteration = 0;
    int depth = 0;

    quint64 eventId = 0;

    bool isValid() const
    {
        return runId != 0 && tokenId != 0;
    }
};

/**
 * @brief Flow edge key
 *
 * Identifies one concrete connection:
 * fromNodeId:fromPort -> toNodeId:toPort
 */
struct FlowEdgeKey
{
    QString fromNodeId;
    int fromPort = 0;

    QString toNodeId;
    int toPort = 0;

    QString edgeId;

    bool operator<(const FlowEdgeKey& other) const
    {

        return std::tie(fromNodeId, fromPort, toNodeId, toPort, edgeId)
            < std::tie(other.fromNodeId, other.fromPort, other.toNodeId, other.toPort, other.edgeId);

    }
};

/**
 * @brief One flow signal that is actually delivered
 */
struct FlowSignal
{
    FlowContext context;
    FlowEdgeKey edge;

    FlowSignalStatus status = FlowSignalStatus::Unknown;

    QVariant data;
};

/**
 * @brief Multi-input join key
 *
 * Distinguishes the input state of the same module under different
 * run / tokenGroup / frame / joinBatch / iteration combinations.
 */
struct FlowJoinKey
{
    quint64 runId = 0;
    quint64 tokenGroupId = 0;
    quint64 frameId = 0;
    quint64 joinBatchId = 0;

    QString nodeId;

    int iteration = 0;

    bool operator<(const FlowJoinKey& other) const
    {

        return std::tie(runId, tokenGroupId, frameId, joinBatchId, nodeId, iteration)
            < std::tie(other.runId, other.tokenGroupId, other.frameId, other.joinBatchId, other.nodeId, other.iteration);

    }
};

/**
 * @brief One input record of a multi-input node
 */
struct FlowInputRecord
{
    FlowSignalStatus status = FlowSignalStatus::Unknown;
    FlowContext context;
    QVariant data;

    QDateTime arrivedTime;
};

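/**
 * @brief Join state of a multi-input node under the current context
 *
 * expectedInputs:
 *   The inputs this particular join should wait for.
 *
 * arrivedInputs:
 *   The inputs that have arrived so far.
 */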
struct FlowJoinState
{
    QMap<FlowEdgeKey, bool> expectedInputs;
    QMap<FlowEdgeKey, FlowInputRecord> arrivedInputs;

    bool isReady() const
    {
        if (expectedInputs.isEmpty()) {

            return true;

        }

        for (auto it = expectedInputs.begin(); it != expectedInputs.end(); ++it) {

            if (!arrivedInputs.contains(it.key())) {
                return false;
            }

        }

        return true;
    }

    FlowSignalStatus reduceStatus() const
    {
        bool hasComplete = false;
        bool hasFailed = false;

        for (auto it = arrivedInputs.begin(); it != arrivedInputs.end(); ++it) {

            if (it.value().status == FlowSignalStatus::Failed) {
                hasFailed = true;
            }

            if (it.value().status == FlowSignalStatus::Complete) {
                hasComplete = true;
            }

        }

        if (hasFailed) {

            return FlowSignalStatus::Failed;

        }

        if (hasComplete) {

            return FlowSignalStatus::Complete;

        }

        return FlowSignalStatus::Ignore;
    }
};

/**
 * @brief Token record
 */
struct FlowTokenRecord
{
    quint64 tokenId = 0;
    quint64 parentTokenId = 0;   // add by Momo. 0 means root token.
    quint64 tokenGroupId = 0;
    quint64 frameId = 0;

    FlowTokenState state = FlowTokenState::Active;

    QString currentNodeId;

    QDateTime createTime;
    QDateTime finishTime;
};

/**
 * @brief Subflow / recursion frame record
 */
struct FlowFrameRecord
{
    quint64 frameId = 0;
    quint64 parentFrameId = 0;

    quint64 callerTokenId = 0;

    int depth = 0;

    FlowFrameState state = FlowFrameState::Active;

    QMap<QString, QVariant> inputs;
    QMap<QString, QVariant> locals;

    QVariant returnValue;

    QDateTime createTime;
    QDateTime finishTime;
};

/**
 * @brief Lease record
 *
 * Indicates that an async task / subflow / loop instance
 * has not finished yet.
 */
struct FlowLeaseRecord
{
    quint64 leaseId = 0;

    FlowLeaseType type = FlowLeaseType::Unknown;

    FlowContext context;

    QString ownerNodeId;
    QString reason;

    bool released = false;

    QDateTime createTime;
    QDateTime releaseTime;
};

/**
 * @brief Complete ledger of a single main-flow run
 */
struct FlowRunState
{
    quint64 runId = 0;

    bool running = false;

    bool failed = false;
    QString failureReason;

    FlowContext rootContext;

    /**
     * @brief Whether an EndTool has already requested the finish.
     *
     * Note:
     * finishRequested == true does not mean the run returns to initial immediately.
     * pendingEvents / runningReceives / leases / tokens / frames must all be drained first.
     */
    bool finishRequested = false;

    /**
     * @brief Number of events that have been posted but not yet picked up by their target node.
     */
    int pendingEvents = 0;

    /**
     * @brief Number of nodes currently executing receive().
     */
    int runningReceives = 0;

    /**
     * @brief All tokens of the current run.
     */
    QMap<quint64, FlowTokenRecord> tokens;

    /**
     * @brief All frames of the current run.
     */
    QMap<quint64, FlowFrameRecord> frames;

    /**
     * @brief All leases of the current run.
     */
    QMap<quint64, FlowLeaseRecord> leases;

    /**
     * @brief All multi-input join states of the current run that are still waiting.
     */
    QMap<FlowJoinKey, FlowJoinState> pendingJoins;

    QDateTime startTime;
    QDateTime finishTime;

    // Counts tokens in the Active state only; Waiting tokens are not included.
    int activeTokenCount() const
    {
        int count = 0;

        for (auto it = tokens.begin(); it != tokens.end(); ++it) {

            if (it.value().state == FlowTokenState::Active) {
                ++count;
            }

        }

        return count;
    }

    int activeBranchTokenCount() const
    {
        int count = 0;

        for (auto it = tokens.begin(); it != tokens.end(); ++it) {
            const FlowTokenRecord& token = it.value();

            // parentTokenId != 0 means this is a branch token created by the Start fan-out.
            // The root token only represents the entry of this run and does not take part in the End barrier wait.
            if (token.parentTokenId != 0 &&
                token.state == FlowTokenState::Active) {
                ++count;
            }
        }

        return count;
    }

    bool canFinishStage1() const
    {
        return finishRequested
            && !failed
            && activeBranchTokenCount() == 0
            && activeFrameCount() <= 1;
    }


    int activeFrameCount() const
    {
        int count = 0;

        for (auto it = frames.begin(); it != frames.end(); ++it) {

            if (it.value().state == FlowFrameState::Active) {
                ++count;
            }

        }

        return count;
    }

    int activeLeaseCount() const
    {
        int count = 0;

        for (auto it = leases.begin(); it != leases.end(); ++it) {

            if (!it.value().released) {
                ++count;
            }

        }

        return count;
    }

    bool isIdle() const
    {
        return pendingEvents == 0

            && runningReceives == 0
            && activeTokenCount() == 0
            && activeFrameCount() == 0
            && activeLeaseCount() == 0
            && pendingJoins.isEmpty();

    }

    bool canInitial() const
    {
        return finishRequested && isIdle();
    }

    void clear()
    {
        runId = 0;
        running = false;
        finishRequested = false;

        pendingEvents = 0;
        runningReceives = 0;

        tokens.clear();
        frames.clear();
        leases.clear();
        pendingJoins.clear();

        startTime = QDateTime();
        finishTime = QDateTime();

        failed = false;
        failureReason.clear();
        rootContext = FlowContext();
    }
};

/**
 * @brief Global FlowRuntime state
 *
 * Recommended as a member variable of modelObjectmainprocess:
 *
 * class modelObjectmainprocess {
 * private:
 *     FlowRuntimeState _flowRuntime;
 * };
 */
struct FlowRuntimeState
{
    FlowRunState currentRun;

    quint64 nextRunId = 0;
    quint64 nextTokenId = 0;
    quint64 nextTokenGroupId = 0;
    quint64 nextFrameId = 0;
    quint64 nextJoinBatchId = 0;
    quint64 nextEventId = 0;
    quint64 nextLeaseId = 0;

    int maxDepth = 32;
    int maxLoopIteration = 1000;
    int maxTotalEvents = 100000;

    void resetAll()
    {

        currentRun.clear();

        nextRunId = 0;
        nextTokenId = 0;
        nextTokenGroupId = 0;
        nextFrameId = 0;
        nextJoinBatchId = 0;
        nextEventId = 0;
        nextLeaseId = 0;

    }

    FlowContext createRootContext()
    {

        currentRun.clear();

        currentRun.runId = ++nextRunId;
        currentRun.running = true;
        currentRun.finishRequested = false;
        currentRun.startTime = QDateTime::currentDateTime();

        const quint64 rootTokenId = ++nextTokenId;
        const quint64 rootGroupId = ++nextTokenGroupId;
        const quint64 rootFrameId = ++nextFrameId;
        const quint64 rootJoinBatchId = ++nextJoinBatchId;

        FlowTokenRecord token;
        token.tokenId = rootTokenId;
        token.tokenGroupId = rootGroupId;
        token.frameId = rootFrameId;
        token.state = FlowTokenState::Active;
        token.createTime = QDateTime::currentDateTime();
        token.parentTokenId = 0;

        currentRun.tokens.insert(rootTokenId, token);

        FlowFrameRecord frame;
        frame.frameId = rootFrameId;
        frame.parentFrameId = 0;
        frame.depth = 0;
        frame.state = FlowFrameState::Active;
        frame.createTime = QDateTime::currentDateTime();

        currentRun.frames.insert(rootFrameId, frame);

        FlowContext ctx;
        ctx.runId = currentRun.runId;
        ctx.tokenId = rootTokenId;
        ctx.tokenGroupId = rootGroupId;
        ctx.frameId = rootFrameId;
        ctx.parentFrameId = 0;
        ctx.joinBatchId = rootJoinBatchId;
        ctx.iteration = 0;
        ctx.depth = 0;
        ctx.eventId = 0;
        ctx.parentTokenId = 0;

        currentRun.rootContext = ctx;

        return ctx;

    }


    // Handles the parent/child hierarchy of subflows
    FlowContext createChildFrameContext(const FlowContext& parentContext, const QString& callerNodeId)
    {
        if (!parentContext.isValid() || parentContext.runId != currentRun.runId) {
            return FlowContext();
        }

        if (parentContext.depth + 1 > maxDepth) {
            return FlowContext();
        }

        const quint64 childFrameId = ++nextFrameId;
        const quint64 childRootTokenId = ++nextTokenId;
        const quint64 childTokenGroupId = ++nextTokenGroupId;
        const quint64 childJoinBatchId = ++nextJoinBatchId;

        FlowFrameRecord frame;
        frame.frameId = childFrameId;
        frame.parentFrameId = parentContext.frameId;
        frame.callerTokenId = parentContext.tokenId;
        frame.depth = parentContext.depth + 1;
        frame.state = FlowFrameState::Active;
        frame.createTime = QDateTime::currentDateTime();

        currentRun.frames.insert(childFrameId, frame);

        FlowTokenRecord token;
        token.tokenId = childRootTokenId;
        token.parentTokenId = parentContext.tokenId;
        token.tokenGroupId = childTokenGroupId;
        token.frameId = childFrameId;
        token.state = FlowTokenState::Active;
        token.currentNodeId = callerNodeId;
        token.createTime = QDateTime::currentDateTime();

        currentRun.tokens.insert(childRootTokenId, token);

        FlowContext ctx;
        ctx.runId = parentContext.runId;
        ctx.tokenId = childRootTokenId;
        ctx.parentTokenId = parentContext.tokenId;
        ctx.tokenGroupId = childTokenGroupId;
        ctx.frameId = childFrameId;
        ctx.parentFrameId = parentContext.frameId;
        ctx.joinBatchId = childJoinBatchId;
        ctx.iteration = 0;
        ctx.depth = parentContext.depth + 1;
        ctx.eventId = ++nextEventId;

        return ctx;
    }

    void finishFrame(const FlowContext& context)
    {
        auto it = currentRun.frames.find(context.frameId);
        if (it == currentRun.frames.end()) {
            return;
        }

        if (it.value().state == FlowFrameState::Active) {
            it.value().state = FlowFrameState::Returned;
            it.value().finishTime = QDateTime::currentDateTime();
        }
    }

    int activeChildTokenCount(quint64 parentTokenId) const
    {
        int count = 0;

        for (auto it = currentRun.tokens.begin(); it != currentRun.tokens.end(); ++it) {
            const FlowTokenRecord& token = it.value();

            if (token.state != FlowTokenState::Active &&
                token.state != FlowTokenState::Waiting) {
                continue;
            }

            quint64 currentParentId = token.parentTokenId;

            while (currentParentId != 0) {
                if (currentParentId == parentTokenId) {
                    ++count;
                    break;
                }

                auto parentIt = currentRun.tokens.find(currentParentId);
                if (parentIt == currentRun.tokens.end()) {
                    break;
                }

                currentParentId = parentIt.value().parentTokenId;
            }
        }

        return count;
    }



    FlowContext createBranchContext(const FlowContext& parentContext, int branchIndex)
    {
        Q_UNUSED(branchIndex);

        if (!parentContext.isValid() || parentContext.runId != currentRun.runId) {
            return FlowContext();
        }

        FlowContext ctx = parentContext;
        ctx.parentTokenId = parentContext.tokenId;
        ctx.tokenId = ++nextTokenId;
        ctx.eventId = ++nextEventId;

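        // Note:
        // Do not reset iteration.
        // An ordinary fan-out executes within the current round, so it must inherit parentContext.iteration.
        // Do not create a new joinBatchId.
        // A fan-out followed by a join in the same round should still belong to the same joinBatchId.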

        FlowTokenRecord token;
        token.tokenId = ctx.tokenId;
        token.parentTokenId = ctx.parentTokenId;
        token.tokenGroupId = ctx.tokenGroupId;
        token.frameId = ctx.frameId;
        token.state = FlowTokenState::Active;
        token.createTime = QDateTime::currentDateTime();

        currentRun.tokens.insert(ctx.tokenId, token);

        return ctx;
    }

    FlowContext createLoopContext(
        const FlowContext& parentContext,
        int loopIndex,
        bool ignoreMaxLoopIteration = false)
    {
        Q_UNUSED(loopIndex);

        if (!parentContext.isValid() || parentContext.runId != currentRun.runId) {
            return FlowContext();
        }

        if (!ignoreMaxLoopIteration &&
            parentContext.iteration + 1 > maxLoopIteration) {
            return FlowContext();
        }

        FlowContext ctx = parentContext;
        ctx.parentTokenId = parentContext.tokenId;
        ctx.tokenId = ++nextTokenId;
        ctx.eventId = ++nextEventId;

        ctx.iteration = parentContext.iteration + 1;
        ctx.joinBatchId = ++nextJoinBatchId;

        FlowTokenRecord token;
        token.tokenId = ctx.tokenId;
        token.parentTokenId = ctx.parentTokenId;
        token.tokenGroupId = ctx.tokenGroupId;
        token.frameId = ctx.frameId;
        token.state = FlowTokenState::Active;
        token.createTime = QDateTime::currentDateTime();

        currentRun.tokens.insert(ctx.tokenId, token);

        return ctx;
    }

    void finishToken(const FlowContext& context)
    {
        auto it = currentRun.tokens.find(context.tokenId);
        if (it == currentRun.tokens.end()) {
            return;
        }

        if (it.value().state == FlowTokenState::Active) {
            it.value().state = FlowTokenState::Finished;
            it.value().finishTime = QDateTime::currentDateTime();
        }
    }

    void cancelToken(const FlowContext& context)
    {
        auto it = currentRun.tokens.find(context.tokenId);
        if (it == currentRun.tokens.end()) {
            return;
        }

        if (it.value().state == FlowTokenState::Active) {
            it.value().state = FlowTokenState::Cancelled;
            it.value().finishTime = QDateTime::currentDateTime();
        }
    }

    void requestFinish()
    {
        currentRun.finishRequested = true;
        currentRun.running = false;
        currentRun.finishTime = QDateTime::currentDateTime();
    }

    void markFailed(const QString& reason)
    {
        currentRun.failed = true;
        currentRun.failureReason = reason;
        currentRun.finishRequested = true;
        currentRun.running = false;
        currentRun.finishTime = QDateTime::currentDateTime();

        for (auto it = currentRun.tokens.begin(); it != currentRun.tokens.end(); ++it) {
            if (it.value().state == FlowTokenState::Active ||
                it.value().state == FlowTokenState::Waiting) {
                it.value().state = FlowTokenState::Cancelled;
                it.value().finishTime = currentRun.finishTime;
            }
        }
    }

    bool hasFailed() const
    {
        return currentRun.failed;
    }

    int activeBranchTokenCount() const
    {
        return currentRun.activeBranchTokenCount();
    }

    bool canFinishStage1() const
    {
        return currentRun.canFinishStage1();
    }

};
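
The difference between `createBranchContext` and `createLoopContext` above comes down to which fields are inherited and which are renewed. The Qt-free sketch below restates just that rule; `SketchContext` and `SketchIds` are hypothetical reductions that omit the token records, frames, validity checks and loop limits of the real code.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reduced context: only the fields whose handling differs.
struct SketchContext {
    std::uint64_t tokenId = 0;
    std::uint64_t parentTokenId = 0;
    std::uint64_t joinBatchId = 0;
    int iteration = 0;
};

// Hypothetical id allocator mirroring the nextTokenId / nextJoinBatchId counters.
struct SketchIds {
    std::uint64_t nextTokenId = 0;
    std::uint64_t nextJoinBatchId = 0;

    // Fan-out: new token, same iteration and same join batch as the parent.
    SketchContext branch(const SketchContext& parent) {
        SketchContext ctx = parent;
        ctx.parentTokenId = parent.tokenId;
        ctx.tokenId = ++nextTokenId;
        return ctx;
    }

    // Loop round: new token, iteration + 1, and a fresh join batch.
    SketchContext loop(const SketchContext& parent) {
        SketchContext ctx = branch(parent);
        ctx.iteration = parent.iteration + 1;
        ctx.joinBatchId = ++nextJoinBatchId;
        return ctx;
    }
};
```

Sibling branches of one fan-out therefore meet at a join under the same batch, while each new loop round gets a batch of its own.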

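FlowV2Graph stores and validates the flow graph structure.

It does not run the flow; it only describes:

  • Which nodes exist?
  • Which edges exist?
  • Where is Start?
  • Where is End?
  • Which nodes can reach a terminal?
  • Which regions are loops?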
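
For the last question, one common way to find loop regions is Tarjan's strongly connected components algorithm, which the `strongConnect` declaration below appears to follow. The sketch here is an independent, Qt-free illustration: `SketchAdj` and `SketchScc` are hypothetical names, and a component with more than one node is treated as a loop region.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical adjacency list: node id -> ids of its direct successors.
using SketchAdj = std::map<std::string, std::vector<std::string>>;

class SketchScc {
public:
    // `adj` must outlive this object; it is stored by reference.
    explicit SketchScc(const SketchAdj& adj) : adj_(adj) {}

    std::vector<std::set<std::string>> run() {
        for (const auto& entry : adj_) {
            if (!index_.count(entry.first)) {
                visit(entry.first);
            }
        }
        return components_;
    }

private:
    void visit(const std::string& v) {
        index_[v] = low_[v] = counter_++;
        stack_.push_back(v);
        onStack_.insert(v);

        const auto it = adj_.find(v);
        if (it != adj_.end()) {
            for (const std::string& w : it->second) {
                if (!index_.count(w)) {
                    visit(w);  // tree edge: recurse, then pull up the low-link
                    low_[v] = std::min(low_[v], low_[w]);
                } else if (onStack_.count(w)) {
                    low_[v] = std::min(low_[v], index_[w]);  // back edge
                }
            }
        }

        if (low_[v] != index_[v]) {
            return;  // v is not the root of a component
        }
        std::set<std::string> component;
        std::string w;
        do {
            w = stack_.back();
            stack_.pop_back();
            onStack_.erase(w);
            component.insert(w);
        } while (w != v);
        components_.push_back(component);
    }

    const SketchAdj& adj_;
    int counter_ = 0;
    std::map<std::string, int> index_;
    std::map<std::string, int> low_;
    std::vector<std::string> stack_;
    std::set<std::string> onStack_;
    std::vector<std::set<std::string>> components_;
};
```

Each returned set with two or more nodes is a region in which every node can reach every other node, which is what makes it a candidate loop region for further validation.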
#pragma once

#include <QString>
#include <QVector>
#include <QMap>
#include <QVariantMap>
#include <QSet>

#include "FlowRuntimeState.h"
#include "qtpubliclib_global.h"

class modelObject;
class modelObjectprocess;

enum class FlowV2EdgeKind
{
    Normal = 0,

    // Used by the later loop stage.
    // Stage1 does not allow LoopBack.
    LoopBack = 1
};

struct FlowV2Node
{
    QString nodeId;
    QString type;
    QString displayName;
    QVariantMap config;

    bool isStart = false;
    bool isEnd = false;

    // Serves only as the source of the legacy graph structure and as the entry point for later plugin adaptation.
    // FlowV2Runtime Stage1 does not call legacyObject->receive/send.
    modelObject* legacyObject = nullptr;
};

struct FlowV2Edge
{
    QString edgeId;

    QString sourceNodeId;
    int sourcePort = 0;

    QString targetNodeId;
    int targetPort = 0;

    FlowV2EdgeKind kind = FlowV2EdgeKind::Normal;
};

enum class FlowV2BranchState
{
    Pending = 0,
    Running = 1,
    Finished = 2,
    Failed = 3
};

struct FlowV2Branch
{
    quint64 branchId = 0;
    FlowContext context;

    QVector<QString> nodePath;
    int currentIndex = 0;

    FlowV2BranchState state = FlowV2BranchState::Pending;
};

struct FlowV2ValidateResult
{
    bool ok = false;
    QString error;
};

class QTPUBLICLIB_EXPORT FlowV2Graph
{
public:
    QMap<QString, FlowV2Node> nodes;
    QVector<FlowV2Edge> edges;

    QString findStartNodeId() const;
    QString findEndNodeId() const;

    QVector<FlowV2Edge> incomingEdges(const QString& nodeId) const;
    QVector<FlowV2Edge> outgoingEdges(const QString& nodeId) const;

    FlowV2ValidateResult validateStage1Dag() const;
    QVector<FlowV2Branch> buildBranchesFromStart(QString* errorMessage = nullptr) const;

    void markLoopBackEdges(); // added for loop support

    bool isTerminalSinkNode(const QString& nodeId) const; // added for multiple terminals

    bool hasContinuousManualLoopRegion() const; // added for dynamic back edges

private:
    bool hasCycle(QString* cycleNodeId = nullptr) const;
    bool hasCycleDfs(
        const QString& nodeId,
        QSet<QString>& visiting,
        QSet<QString>& visited,
        QString* cycleNodeId) const;

    QSet<QString> reachableFromStart(const QString& startId) const;
    QSet<QString> nodesCanReachEnd(const QString& endId) const;
    QSet<QString> nodesCanReachTerminal() const; // added for multiple terminals

    // added for loop support
    void markLoopBackEdgesDfs(
        const QString& nodeId,
        QSet<QString>& visiting,
        QSet<QString>& visited);

    QVector<QSet<QString>> stronglyConnectedComponents() const;

    void strongConnect(
        const QString& nodeId,
        int& index,
        QMap<QString, int>& indexMap,
        QMap<QString, int>& lowMap,
        QVector<QString>& stack,
        QSet<QString>& onStack,
        QVector<QSet<QString>>& components) const;

    bool canReachNode(
        const QString& fromNodeId,
        const QString& targetNodeId) const;

    // added for dynamic back edges
    bool isThrottleNode(const QString& nodeId) const;
    bool canReachEndTool(const QString& fromNodeId) const;

    FlowV2ValidateResult validateLoopRegions() const;
    bool componentHasCycle(const QSet<QString>& component) const;
    bool componentHasThrottleNode(const QSet<QString>& component) const;
};

class QTPUBLICLIB_EXPORT FlowV2GraphBuilder
{
public:
    static FlowV2Graph fromLegacyProcess(modelObjectprocess* process);

private:
    static int targetPortFor(modelObject* source, modelObject* target);
};

    #include "FlowV2Graph.h"
    #include "dataDefine.h"
    
    #include <QSet>
    #include <QStringList>
    #include <QDebug>
    
    namespace {
    
        bool isStartType(const QString& type)
        {
            return type.contains(QStringLiteral("starttool"), Qt::CaseInsensitive);
        }
    
        bool isEndType(const QString& type)
        {
            return type.contains(QStringLiteral("endtool"), Qt::CaseInsensitive);
        }
    
        QString nodeDisplayName(modelObject* obj)
        {
            if (!obj) {
                return QString();
            }
    
            const QString winText = obj->getWinTxt();
            if (!winText.isEmpty()) {
                return winText;
            }
    
            return obj->getIdname();
        }
    
        QString makeEdgeId(const QString& sourceId, int sourcePort, const QString& targetId, int targetPort)
        {
            return QStringLiteral("%1:%2->%3:%4")
                .arg(sourceId)
                .arg(sourcePort)
                .arg(targetId)
                .arg(targetPort);
        }
    
    } // namespace
    
    QString FlowV2Graph::findStartNodeId() const
    {
        QString result;
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            if (it.value().isStart) {
                if (!result.isEmpty()) {
                    return QString();
                }
    
                result = it.key();
            }
        }
    
        return result;
    }
    
    QString FlowV2Graph::findEndNodeId() const
    {
        QString result;
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            if (it.value().isEnd) {
                if (!result.isEmpty()) {
                    return QString();
                }
    
                result = it.key();
            }
        }
    
        return result;
    }
    
    QVector<FlowV2Edge> FlowV2Graph::incomingEdges(const QString& nodeId) const
    {
        QVector<FlowV2Edge> result;
    
        for (const FlowV2Edge& edge : edges) {
            if (edge.targetNodeId == nodeId) {
                result.push_back(edge);
            }
        }
    
        return result;
    }
    
    QVector<FlowV2Edge> FlowV2Graph::outgoingEdges(const QString& nodeId) const
    {
        QVector<FlowV2Edge> result;
    
        for (const FlowV2Edge& edge : edges) {
            if (edge.sourceNodeId == nodeId) {
                result.push_back(edge);
            }
        }
    
        return result;
    }
    
    FlowV2ValidateResult FlowV2Graph::validateStage1Dag() const
    {
        FlowV2ValidateResult result;
    
        const QString startId = findStartNodeId();
        if (startId.isEmpty()) {
            result.error = QStringLiteral("FlowV2 Stage1 requires exactly one starttool.");
            return result;
        }
    
       /* const QString endId = findEndNodeId();
        if (endId.isEmpty()) {
            result.error = QStringLiteral("FlowV2 Stage1 requires exactly one endtool.");
            return result;
        }*/
    
        // added for multiple terminals
        const QString endId = findEndNodeId();
    
        bool hasTerminalSink = false;
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
    
            if (isTerminalSinkNode(nodeId)) {
                hasTerminalSink = true;
                break;
            }
        }
    
        if (!hasTerminalSink) {
            result.error = QStringLiteral("FlowV2 requires at least one terminal node, such as endtool or chart.");
            return result;
        }
    
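        // 1. Both endpoints of every edge must exist.
        // Normal edges take part in DAG validation; LoopBack edges are allowed but excluded from the ordinary acyclicity check.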
        for (const FlowV2Edge& edge : edges) {
            if (!nodes.contains(edge.sourceNodeId)) {
                result.error = QStringLiteral("edge source node not found: %1").arg(edge.sourceNodeId);
                return result;
            }
    
            if (!nodes.contains(edge.targetNodeId)) {
                result.error = QStringLiteral("edge target node not found: %1").arg(edge.targetNodeId);
                return result;
            }
    
            if (edge.sourceNodeId == edge.targetNodeId) {
                result.error = QStringLiteral("self-loop is not allowed in FlowV2: %1")
                    .arg(edge.sourceNodeId);
                return result;
            }
        }
    
        // 2. Basic Start / End constraints.
        if (!incomingEdges(startId).isEmpty()) {
            result.error = QStringLiteral("starttool must not have input edges.");
            return result;
        }
    
        if (outgoingEdges(startId).isEmpty()) {
            result.error = QStringLiteral("starttool must have at least one output edge.");
            return result;
        }
    
        /*if (!outgoingEdges(endId).isEmpty()) {
            result.error = QStringLiteral("endtool must not have output edges.");
            return result;
        }
    
        if (incomingEdges(endId).isEmpty()) {
            result.error = QStringLiteral("endtool must have at least one input edge.");
            return result;
        }*/
    
        // EndTool is no longer the only terminal.
        // It is validated only if an endtool exists;
        // a flow that ends at a terminal sink such as chart may have no endtool.
        if (!endId.isEmpty()) {
            if (!outgoingEdges(endId).isEmpty()) {
                result.error = QStringLiteral("endtool must not have output edges.");
                return result;
            }
    
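            // The endtool is required to have an input only when it takes part in the flow.
            // If the flow uses chart as its terminal, an isolated or absent endtool should not block the run.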
            if (incomingEdges(endId).isEmpty()) {
                bool hasOtherTerminalSink = false;
    
                for (auto it = nodes.begin(); it != nodes.end(); ++it) {
                    const QString nodeId = it.key();
    
                    if (nodeId == endId) {
                        continue;
                    }
    
                    if (isTerminalSinkNode(nodeId)) {
                        hasOtherTerminalSink = true;
                        break;
                    }
                }
    
                if (!hasOtherTerminalSink) {
                    result.error = QStringLiteral("endtool must have at least one input edge.");
                    return result;
                }
            }
        }
    
    
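        // 3. Ordinary nodes may have N inputs and M outputs, but need at least one input and, unless they are terminal sinks, at least one output.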
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
            const FlowV2Node& node = it.value();
    
            if (node.isStart || node.isEnd) {
                continue;
            }
    
            const int inCount = incomingEdges(nodeId).size();
            const int outCount = outgoingEdges(nodeId).size();
    
            if (inCount < 1) {
                result.error = QStringLiteral("middle node must have at least one input: %1")
                    .arg(nodeId);
                return result;
            }
    
            if (outCount < 1 && !isTerminalSinkNode(nodeId)) {
                result.error = QStringLiteral("middle node must have at least one output: %1")
                    .arg(nodeId);
                return result;
            }
        }
    
        // 4. Loops are allowed, but every loop region must have an exit that reaches a terminal node.
        const FlowV2ValidateResult loopRegionResult = validateLoopRegions();
        if (!loopRegionResult.ok) {
            return loopRegionResult;
        }
    
        // 5. Every node must be reachable from Start.
        /*const QSet<QString> reachable = reachableFromStart(startId);
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            if (!reachable.contains(it.key())) {
                result.error = QStringLiteral("node is not reachable from starttool: %1")
                    .arg(it.key());
                return result;
            }
        }*/
    
        const QSet<QString> reachable = reachableFromStart(startId);
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
            const FlowV2Node& node = it.value();
    
            // An isolated endtool is skipped; it is only allowed when a terminal sink such as chart exists.
            if (node.isEnd &&
                incomingEdges(nodeId).isEmpty() &&
                outgoingEdges(nodeId).isEmpty()) {
                continue;
            }
    
            if (!reachable.contains(nodeId)) {
                result.error = QStringLiteral("node is not reachable from starttool: %1")
                    .arg(nodeId);
                return result;
            }
        }
    
        // 6. Every node must be able to reach a terminal node (EndTool or a terminal sink such as chart).
       /* const QSet<QString> canReachEnd = nodesCanReachEnd(endId);
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            if (!canReachEnd.contains(it.key())) {
                result.error = QStringLiteral("node cannot reach endtool: %1")
                    .arg(it.key());
                return result;
            }
        }*/
    
        const QSet<QString> canReachTerminal = nodesCanReachTerminal();
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
            const FlowV2Node& node = it.value();
    
            if (node.isEnd &&
                incomingEdges(nodeId).isEmpty() &&
                outgoingEdges(nodeId).isEmpty()) {
                continue;
            }
    
            if (!canReachTerminal.contains(nodeId)) {
                result.error = QStringLiteral("node cannot reach terminal node: %1")
                    .arg(nodeId);
                return result;
            }
        }
    
        result.ok = true;
        return result;
    }
    
    
    QVector<FlowV2Branch> FlowV2Graph::buildBranchesFromStart(QString* errorMessage) const
    {
        QVector<FlowV2Branch> result;
    
        if (errorMessage) {
            errorMessage->clear();
        }
    
        const QString startId = findStartNodeId();
        const QString endId = findEndNodeId();
    
        if (startId.isEmpty() || endId.isEmpty()) {
            if (errorMessage) {
                *errorMessage = QStringLiteral("missing starttool or endtool.");
            }
            return result;
        }
    
        const QVector<FlowV2Edge> startOutputs = outgoingEdges(startId);
    
        for (const FlowV2Edge& firstEdge : startOutputs) {
            FlowV2Branch branch;
            branch.nodePath.push_back(startId);
    
            QSet<QString> visited;
            visited.insert(startId);
    
            FlowV2Edge currentEdge = firstEdge;
    
            while (true) {
                const QString currentNodeId = currentEdge.targetNodeId;
    
                if (!nodes.contains(currentNodeId)) {
                    if (errorMessage) {
                        *errorMessage = QStringLiteral("edge target node not found: %1").arg(currentNodeId);
                    }
                    return QVector<FlowV2Branch>();
                }
    
                if (visited.contains(currentNodeId)) {
                    if (errorMessage) {
                        *errorMessage = QStringLiteral("cycle detected in FlowV2 Stage1 branch at node: %1").arg(currentNodeId);
                    }
                    return QVector<FlowV2Branch>();
                }
    
                visited.insert(currentNodeId);
                branch.nodePath.push_back(currentNodeId);
    
                if (currentNodeId == endId) {
                    break;
                }
    
                const QVector<FlowV2Edge> outputs = outgoingEdges(currentNodeId);
                if (outputs.size() != 1) {
                    if (errorMessage) {
                        *errorMessage = QStringLiteral("middle node must have exactly one output in Stage1: %1").arg(currentNodeId);
                    }
                    return QVector<FlowV2Branch>();
                }
    
                currentEdge = outputs.first();
            }
    
            result.push_back(branch);
        }
    
        return result;
    }
    
    FlowV2Graph FlowV2GraphBuilder::fromLegacyProcess(modelObjectprocess* process)
    {
        FlowV2Graph graph;
    
        if (!process) {
            return graph;
        }
    
        const QMap<QString, modelObject*> children = process->getChild();
    
        for (auto it = children.begin(); it != children.end(); ++it) {
            modelObject* obj = it.value();
            if (!obj) {
                continue;
            }
    
            FlowV2Node node;
            node.nodeId = obj->getIdname();
            node.type = obj->getType();
            node.displayName = nodeDisplayName(obj);
            node.isStart = isStartType(node.type);
            node.isEnd = isEndType(node.type);
            node.legacyObject = obj;
    
            graph.nodes.insert(node.nodeId, node);
        }
    
        QSet<QString> edgeIds;
    
        for (auto it = children.begin(); it != children.end(); ++it) {
            modelObject* source = it.value();
            if (!source) {
                continue;
            }
    
            const QString sourceId = source->getIdname();
    
            for (auto outIt = source->nextModules.begin(); outIt != source->nextModules.end(); ++outIt) {
                const int sourcePort = outIt.key();
    
                for (modelObject* target : outIt.value()) {
                    if (!target) {
                        continue;
                    }
    
                    const QString targetId = target->getIdname();
                    const int targetPort = targetPortFor(source, target);
                    const QString edgeId = makeEdgeId(sourceId, sourcePort, targetId, targetPort);
    
                    if (edgeIds.contains(edgeId)) {
                        continue;
                    }
    
                    FlowV2Edge edge;
                    edge.edgeId = edgeId;
                    edge.sourceNodeId = sourceId;
                    edge.sourcePort = sourcePort;
                    edge.targetNodeId = targetId;
                    edge.targetPort = targetPort;
    
                    graph.edges.push_back(edge);
                    edgeIds.insert(edgeId);
                }
            }
        }
    
        /*return graph;*/
        graph.markLoopBackEdges();
    
        return graph;
    }
    
    int FlowV2GraphBuilder::targetPortFor(modelObject* source, modelObject* target)
    {
        if (!source || !target) {
            return 0;
        }
    
        for (auto it = target->inModule.begin(); it != target->inModule.end(); ++it) {
            if (it.value().contains(source)) {
                return it.key();
            }
        }
    
        return 0;
    }
    
    bool FlowV2Graph::hasCycle(QString* cycleNodeId) const
    {
        QSet<QString> visiting;
        QSet<QString> visited;
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            if (hasCycleDfs(nodeId, visiting, visited, cycleNodeId)) {
                return true;
            }
        }
    
        return false;
    }
    
    bool FlowV2Graph::hasCycleDfs(
        const QString& nodeId,
        QSet<QString>& visiting,
        QSet<QString>& visited,
        QString* cycleNodeId) const
    {
        visiting.insert(nodeId);
    
        const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
    
        for (const FlowV2Edge& edge : outputs) {
            // In Stage1 every edge should be Normal.
            // Later loop stages may skip LoopBack edges here and validate loop semantics separately.
            if (edge.kind == FlowV2EdgeKind::LoopBack) {
                continue;
            }
    
            const QString nextNodeId = edge.targetNodeId;
    
            if (visiting.contains(nextNodeId)) {
                if (cycleNodeId) {
                    *cycleNodeId = nextNodeId;
                }
                return true;
            }
    
            if (!visited.contains(nextNodeId)) {
                if (hasCycleDfs(nextNodeId, visiting, visited, cycleNodeId)) {
                    return true;
                }
            }
        }
    
        visiting.remove(nodeId);
        visited.insert(nodeId);
    
        return false;
    }
    
    QSet<QString> FlowV2Graph::reachableFromStart(const QString& startId) const
    {
        QSet<QString> visited;
        QVector<QString> stack;
        stack.push_back(startId);
    
        while (!stack.isEmpty()) {
            const QString nodeId = stack.takeLast();
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            visited.insert(nodeId);
    
            const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
            for (const FlowV2Edge& edge : outputs) {
                if (!visited.contains(edge.targetNodeId)) {
                    stack.push_back(edge.targetNodeId);
                }
            }
        }
    
        return visited;
    }
    
    
    
    QSet<QString> FlowV2Graph::nodesCanReachEnd(const QString& endId) const
    {
        QSet<QString> visited;
        QVector<QString> stack;
        stack.push_back(endId);
    
        while (!stack.isEmpty()) {
            const QString nodeId = stack.takeLast();
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            visited.insert(nodeId);
    
            const QVector<FlowV2Edge> inputs = incomingEdges(nodeId);
            for (const FlowV2Edge& edge : inputs) {
                if (edge.kind == FlowV2EdgeKind::LoopBack) {
                    continue;
                }
    
                if (!visited.contains(edge.sourceNodeId)) {
                    stack.push_back(edge.sourceNodeId);
                }
            }
        }
    
        return visited;
    }
    
    QSet<QString> FlowV2Graph::nodesCanReachTerminal() const
    {
        QSet<QString> visited;
        QVector<QString> stack;
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
    
            if (isTerminalSinkNode(nodeId)) {
                stack.push_back(nodeId);
            }
        }
    
        while (!stack.isEmpty()) {
            const QString nodeId = stack.takeLast();
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            visited.insert(nodeId);
    
            const QVector<FlowV2Edge> inputs = incomingEdges(nodeId);
            for (const FlowV2Edge& edge : inputs) {
                if (edge.kind == FlowV2EdgeKind::LoopBack) {
                    continue;
                }
    
                if (!visited.contains(edge.sourceNodeId)) {
                    stack.push_back(edge.sourceNodeId);
                }
            }
        }
    
        return visited;
    }
    
    
    //void FlowV2Graph::markLoopBackEdges()
    //{
    //    for (FlowV2Edge& edge : edges) {
    //        edge.kind = FlowV2EdgeKind::Normal;
    //    }
    //
    //    const QString endId = findEndNodeId();
    //    if (endId.isEmpty()) {
    //        return;
    //    }
    //
    //    const QVector<QSet<QString>> components = stronglyConnectedComponents();
    //
    //    for (const QSet<QString>& component : components) {
    //        if (component.size() <= 1) {
    //            continue;
    //        }
    //
    //        QSet<QString> exitNodes;
    //
    //        // Find the "exit nodes" of the loop region:
    //        // a node with an edge inside the SCC and also an edge leaving the SCC that can reach End.
    //        for (const FlowV2Edge& edge : edges) {
    //            if (!component.contains(edge.sourceNodeId)) {
    //                continue;
    //            }
    //
    //            if (component.contains(edge.targetNodeId)) {
    //                continue;
    //            }
    //
    //            if (canReachNode(edge.targetNodeId, endId)) {
    //                exitNodes.insert(edge.sourceNodeId);
    //            }
    //        }
    //
    //        if (exitNodes.isEmpty()) {
    //            qWarning() << "[FlowV2][Graph] loop region has no exit node. nodes =" << component;
    //            continue;
    //        }
    //
    //        // Only edges from an exit node that stay inside the SCC are LoopBack.
    //        // All other edges inside the SCC remain Normal.
    //        for (FlowV2Edge& edge : edges) {
    //            if (!exitNodes.contains(edge.sourceNodeId)) {
    //                continue;
    //            }
    //
    //            if (!component.contains(edge.targetNodeId)) {
    //                continue;
    //            }
    //
    //            edge.kind = FlowV2EdgeKind::LoopBack;
    //
    //            qDebug() << "[FlowV2][Graph] mark LoopBack edge by SCC ="
    //                << edge.sourceNodeId << "->" << edge.targetNodeId
    //                << "edgeId =" << edge.edgeId;
    //        }
    //    }
    //}
    
    void FlowV2Graph::markLoopBackEdges()
    {
        // LoopBack edges are no longer guessed by a static DFS.
        // The Runtime decides LoopBack dynamically from the current token's nodePath.
        for (FlowV2Edge& edge : edges) {
            edge.kind = FlowV2EdgeKind::Normal;
        }
    }
    
    void FlowV2Graph::markLoopBackEdgesDfs(
        const QString& nodeId,
        QSet<QString>& visiting,
        QSet<QString>& visited)
    {
        visiting.insert(nodeId);
    
        for (FlowV2Edge& edge : edges) {
            if (edge.sourceNodeId != nodeId) {
                continue;
            }
    
            if (edge.kind == FlowV2EdgeKind::LoopBack) {
                continue;
            }
    
            const QString nextNodeId = edge.targetNodeId;
    
            if (visiting.contains(nextNodeId)) {
                edge.kind = FlowV2EdgeKind::LoopBack;
    
                qDebug() << "[FlowV2][Graph] mark LoopBack edge ="
                    << edge.sourceNodeId << "->" << edge.targetNodeId
                    << "edgeId =" << edge.edgeId;
    
                continue;
            }
    
            if (!visited.contains(nextNodeId)) {
                markLoopBackEdgesDfs(nextNodeId, visiting, visited);
            }
        }
    
        visiting.remove(nodeId);
        visited.insert(nodeId);
    }
    
    
    QVector<QSet<QString>> FlowV2Graph::stronglyConnectedComponents() const
    {
        QVector<QSet<QString>> components;
    
        QMap<QString, int> indexMap;
        QMap<QString, int> lowMap;
        QVector<QString> stack;
        QSet<QString> onStack;
    
        int index = 0;
    
        for (auto it = nodes.begin(); it != nodes.end(); ++it) {
            const QString nodeId = it.key();
    
            if (!indexMap.contains(nodeId)) {
                strongConnect(
                    nodeId,
                    index,
                    indexMap,
                    lowMap,
                    stack,
                    onStack,
                    components);
            }
        }
    
        return components;
    }
    
    void FlowV2Graph::strongConnect(
        const QString& nodeId,
        int& index,
        QMap<QString, int>& indexMap,
        QMap<QString, int>& lowMap,
        QVector<QString>& stack,
        QSet<QString>& onStack,
        QVector<QSet<QString>>& components) const
    {
        indexMap.insert(nodeId, index);
        lowMap.insert(nodeId, index);
        ++index;
    
        stack.push_back(nodeId);
        onStack.insert(nodeId);
    
        const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
    
        for (const FlowV2Edge& edge : outputs) {
            const QString nextNodeId = edge.targetNodeId;
    
            if (!indexMap.contains(nextNodeId)) {
                strongConnect(
                    nextNodeId,
                    index,
                    indexMap,
                    lowMap,
                    stack,
                    onStack,
                    components);
    
                lowMap[nodeId] = qMin(lowMap.value(nodeId), lowMap.value(nextNodeId));
            }
            else if (onStack.contains(nextNodeId)) {
                lowMap[nodeId] = qMin(lowMap.value(nodeId), indexMap.value(nextNodeId));
            }
        }
    
        if (lowMap.value(nodeId) != indexMap.value(nodeId)) {
            return;
        }
    
        QSet<QString> component;
    
        while (!stack.isEmpty()) {
            const QString current = stack.takeLast();
            onStack.remove(current);
            component.insert(current);
    
            if (current == nodeId) {
                break;
            }
        }
    
        components.push_back(component);
    }
    
    bool FlowV2Graph::canReachNode(
        const QString& fromNodeId,
        const QString& targetNodeId) const
    {
        if (fromNodeId.isEmpty() || targetNodeId.isEmpty()) {
            return false;
        }
    
        QSet<QString> visited;
        QVector<QString> stack;
        stack.push_back(fromNodeId);
    
        while (!stack.isEmpty()) {
            const QString nodeId = stack.takeLast();
    
            if (nodeId == targetNodeId) {
                return true;
            }
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            visited.insert(nodeId);
    
            const QVector<FlowV2Edge> outputs = outgoingEdges(nodeId);
            for (const FlowV2Edge& edge : outputs) {
                if (!visited.contains(edge.targetNodeId)) {
                    stack.push_back(edge.targetNodeId);
                }
            }
        }
    
        return false;
    }
    
    bool FlowV2Graph::isTerminalSinkNode(const QString& nodeId) const
    {
        if (!nodes.contains(nodeId)) {
            return false;
        }
    
        const FlowV2Node node = nodes.value(nodeId);
        const QString type = node.type.toLower();
        const QString displayName = node.displayName.toLower();
    
        if (node.isEnd) {
            return true;
        }
    
        // The chart module is a terminal consumer node:
        // it receives data and plots it, and does not need to connect on to endtool.
        if (type == QStringLiteral("chart") ||
            type.contains(QStringLiteral("chart")) 
            //|| displayName.contains(QStringLiteral("图表"))
            ) {
            return true;
        }
    
        return false;
    }
    
    //bool FlowV2Graph::isThrottleNode(const QString& nodeId) const
    //{
    //    if (!nodes.contains(nodeId)) {
    //        return false;
    //    }
    //
    //    const QString type = nodes.value(nodeId).type.toLower();
    //
    //    return type.contains(QStringLiteral("delay"))
    //        || type.contains(QStringLiteral("signal"));
    //}
    
    bool FlowV2Graph::isThrottleNode(const QString& nodeId) const
    {
        if (!nodes.contains(nodeId)) {
            return false;
        }
    
        const FlowV2Node node = nodes.value(nodeId);
        const QString type = node.type.toLower();
        const QString name = node.displayName.toLower();
    
        const auto containsThrottleKeyword = [](const QString& text) -> bool {
            return text.contains(QStringLiteral("delay"))
                || text.contains(QStringLiteral("signal"))
                || text.contains(QStringLiteral("mathgenerate"))
                || text.contains(QStringLiteral("math generate"))
                || text.contains(QStringLiteral("point to point"))
                || text.contains(QStringLiteral("point_to_point"))
                || text.contains(QStringLiteral("position"))
                || text.contains(QStringLiteral("resettoorigin"))
                || text.contains(QStringLiteral("reset to origin"))
                || text.contains(QStringLiteral("calibrate"))
               /* || text.contains(QStringLiteral("延迟"))
                || text.contains(QStringLiteral("信号"))
                || text.contains(QStringLiteral("位置"))
                || text.contains(QStringLiteral("归零"))*/
                ;
            };
    
        return containsThrottleKeyword(type) || containsThrottleKeyword(name);
    }
    
    bool FlowV2Graph::canReachEndTool(const QString& fromNodeId) const
    {
        const QString endId = findEndNodeId();
        if (endId.isEmpty()) {
            return false;
        }
    
        return canReachNode(fromNodeId, endId);
    }
    
    bool FlowV2Graph::hasContinuousManualLoopRegion() const
    {
        const QVector<QSet<QString>> components = stronglyConnectedComponents();
    
        const QSet<QString> canReachTerminal = nodesCanReachTerminal();
    
        for (const QSet<QString>& component : components) {
            if (component.size() <= 1) {
                continue;
            }
    
            bool hasTerminalExit = false;
            bool hasEndExit = false;
            bool hasThrottle = false;
    
            for (const QString& nodeId : component) {
                if (isThrottleNode(nodeId)) {
                    hasThrottle = true;
                }
            }
    
            for (const FlowV2Edge& edge : edges) {
                if (!component.contains(edge.sourceNodeId)) {
                    continue;
                }
    
                if (component.contains(edge.targetNodeId)) {
                    continue;
                }
    
                if (canReachTerminal.contains(edge.targetNodeId)) {
                    hasTerminalExit = true;
                }
    
                if (canReachEndTool(edge.targetNodeId)) {
                    hasEndExit = true;
                }
            }
    
            // Has a terminal sink exit, no endtool exit, and a throttle node inside the loop:
            // treat it as a continuous flow that runs until the user stops it manually.
            if (hasTerminalExit && !hasEndExit && hasThrottle) {
                qDebug() << "[FlowV2][Graph] continuous manual loop region =" << component;
                return true;
            }
        }
    
        return false;
    }
    
    bool FlowV2Graph::componentHasCycle(const QSet<QString>& component) const
    {
        if (component.size() > 1) {
            return true;
        }
    
        if (component.size() == 1) {
            const QString nodeId = *component.begin();
    
            for (const FlowV2Edge& edge : edges) {
                if (edge.sourceNodeId == nodeId &&
                    edge.targetNodeId == nodeId) {
                    return true;
                }
            }
        }
    
        return false;
    }
    
    //bool FlowV2Graph::componentHasThrottleNode(const QSet<QString>& component) const
    //{
    //    for (const QString& nodeId : component) {
    //        if (!nodes.contains(nodeId)) {
    //            continue;
    //        }
    //
    //        const QString type = nodes.value(nodeId).type.toLower();
    //
    //        if (type.contains(QStringLiteral("delay")) ||
    //            type.contains(QStringLiteral("signal"))) {
    //            return true;
    //        }
    //    }
    //
    //    return false;
    //}
    bool FlowV2Graph::componentHasThrottleNode(const QSet<QString>& component) const
    {
        for (const QString& nodeId : component) {
            if (isThrottleNode(nodeId)) {
                return true;
            }
        }
    
        return false;
    }
    
    FlowV2ValidateResult FlowV2Graph::validateLoopRegions() const
    {
        FlowV2ValidateResult result;
        result.ok = true;
    
        const QVector<QSet<QString>> components = stronglyConnectedComponents();
        const QSet<QString> canReachTerminalSet = nodesCanReachTerminal();
    
        for (const QSet<QString>& component : components) {
            if (!componentHasCycle(component)) {
                continue;
            }
    
            bool hasExitToTerminal = false;
            bool hasExitToEnd = false;
            bool hasThrottle = componentHasThrottleNode(component);
    
            for (const FlowV2Edge& edge : edges) {
                if (!component.contains(edge.sourceNodeId)) {
                    continue;
                }
    
                // Still inside the loop region, so this edge is not an exit.
                if (component.contains(edge.targetNodeId)) {
                    continue;
                }
    
                // An exit is valid as long as its target can reach End or a terminal sink such as chart.
                if (canReachTerminalSet.contains(edge.targetNodeId)) {
                    hasExitToTerminal = true;
                }
    
                if (canReachEndTool(edge.targetNodeId)) {
                    hasExitToEnd = true;
                }
            }
    
            if (!hasExitToTerminal) {
                result.ok = false;
                result.error = QStringLiteral("loop region has no exit to endtool or terminal sink.");
                return result;
            }
    
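            // With no End exit and only a terminal sink such as chart, the region is a continuous,
            // manually stopped flow. A throttle node such as delay/signal used to be mandatory here;
            // the check below is now relaxed to a warning.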
            /*if (!hasExitToEnd && !hasThrottle) {
                result.ok = false;
                result.error = QStringLiteral("continuous loop region requires delay/signal throttle node.");
                return result;
            }*/
            if (!hasThrottle) {
                qWarning() << "[FlowV2][Graph] loop region has no explicit throttle node,"
                    << "but it has an exit path, so allow it.";
            }
    
            qDebug() << "[FlowV2][Graph] valid loop region =" << component
                << "hasExitToEnd =" << hasExitToEnd
                << "hasThrottle =" << hasThrottle;
        }
    
        return result;
    }
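
The loop-region check above combines Tarjan's SCC algorithm with a scan for edges that leave each component. The sketch below reproduces that idea in miniature, using standard-library containers instead of Qt types so that it compiles on its own. `MiniGraph`, `TarjanState`, `loopRegions`, and `hasEdgeLeavingRegion` are hypothetical names invented for this illustration. The exit check is also simplified: it only asks whether some edge leaves the region, whereas `validateLoopRegions` additionally requires the edge's target to reach a terminal node.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical miniature graph: node id -> ids of its successor nodes.
using MiniGraph = std::map<std::string, std::vector<std::string>>;

// Bookkeeping for Tarjan's algorithm (assumed layout, not the FlowV2 types).
struct TarjanState {
    int nextIndex = 0;
    std::map<std::string, int> index;
    std::map<std::string, int> low;
    std::vector<std::string> stack;
    std::set<std::string> onStack;
    std::vector<std::set<std::string>> components;
};

void strongConnect(const MiniGraph& g, const std::string& v, TarjanState& s)
{
    s.index[v] = s.low[v] = s.nextIndex++;
    s.stack.push_back(v);
    s.onStack.insert(v);

    const auto it = g.find(v);
    if (it != g.end()) {
        for (const std::string& w : it->second) {
            if (!s.index.count(w)) {
                strongConnect(g, w, s);                      // tree edge
                s.low[v] = std::min(s.low[v], s.low[w]);
            } else if (s.onStack.count(w)) {
                s.low[v] = std::min(s.low[v], s.index[w]);   // edge back into the stack
            }
        }
    }

    if (s.low[v] != s.index[v]) {
        return;  // v is not the root of a component
    }

    std::set<std::string> component;
    while (true) {
        assert(!s.stack.empty());
        const std::string w = s.stack.back();
        s.stack.pop_back();
        s.onStack.erase(w);
        component.insert(w);
        if (w == v) {
            break;
        }
    }
    s.components.push_back(component);
}

// Returns only the components that contain a cycle:
// more than one node, or a single node with a self-edge.
std::vector<std::set<std::string>> loopRegions(const MiniGraph& g)
{
    TarjanState s;
    for (const auto& entry : g) {
        if (!s.index.count(entry.first)) {
            strongConnect(g, entry.first, s);
        }
    }

    std::vector<std::set<std::string>> loops;
    for (const auto& c : s.components) {
        bool cyclic = c.size() > 1;
        if (!cyclic) {
            const std::string& only = *c.begin();
            const auto it = g.find(only);
            cyclic = it != g.end() &&
                std::find(it->second.begin(), it->second.end(), only) != it->second.end();
        }
        if (cyclic) {
            loops.push_back(c);
        }
    }
    return loops;
}

// Simplified exit test: does any edge start inside the region and end outside it?
bool hasEdgeLeavingRegion(const MiniGraph& g, const std::set<std::string>& region)
{
    for (const std::string& v : region) {
        const auto it = g.find(v);
        if (it == g.end()) {
            continue;
        }
        for (const std::string& w : it->second) {
            if (!region.count(w)) {
                return true;
            }
        }
    }
    return false;
}
```

For a graph `start -> a -> b -> end` with a back edge `b -> a`, `loopRegions` returns the single region `{a, b}`, and `hasEdgeLeavingRegion` is true for it because of the edge `b -> end`.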

`FlowV2Runtime` is the core scheduling class of FlowV2.

It ties together the graph, the runtimeState, and the executor.

It can be thought of as FlowV2's event dispatcher plus state machine.
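
To make the "event dispatcher" idea concrete, here is a minimal sketch of a queue-driven runtime, written with standard-library types only. `MiniEvent` and `MiniRuntime` are hypothetical stand-ins invented for this illustration, not the real `FlowV2Event` and `FlowV2Runtime`. The sketch deliberately leaves out tokens, joins, Ignore propagation, asynchronous nodes, and single-step mode; it only shows that nodes never call each other directly and that every signal goes through one queue.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, heavily simplified stand-in for FlowV2Event.
struct MiniEvent {
    std::string sourceNodeId;
    std::string targetNodeId;
    int value;
};

// Hypothetical stand-in for FlowV2Runtime: one queue, one dispatch loop.
class MiniRuntime {
public:
    using NodeFn = std::function<int(int)>;

    void addNode(const std::string& id, NodeFn fn) { m_nodes[id] = std::move(fn); }
    void addEdge(const std::string& from, const std::string& to) { m_edges[from].push_back(to); }

    // Seeds the queue with one event for the start node, then drains the queue.
    void start(const std::string& startNodeId, int initialValue)
    {
        enqueue(MiniEvent{std::string(), startNodeId, initialValue});
        runEventLoop();
    }

    const std::vector<std::string>& trace() const { return m_trace; }
    const std::vector<int>& terminalValues() const { return m_terminalValues; }

private:
    void enqueue(const MiniEvent& event) { m_pending.push(event); }

    void runEventLoop()
    {
        while (!m_pending.empty()) {
            const MiniEvent event = m_pending.front();
            m_pending.pop();
            dispatch(event);
        }
    }

    void dispatch(const MiniEvent& event)
    {
        assert(!event.targetNodeId.empty());
        m_trace.push_back(event.targetNodeId);

        // Nodes without a registered function pass the value through unchanged.
        const auto nodeIt = m_nodes.find(event.targetNodeId);
        const int output =
            (nodeIt != m_nodes.end()) ? nodeIt->second(event.value) : event.value;

        // A node without outgoing edges is treated as a terminal.
        const auto edgeIt = m_edges.find(event.targetNodeId);
        if (edgeIt == m_edges.end()) {
            m_terminalValues.push_back(output);
            return;
        }

        // Results are never delivered by direct calls; they become new events.
        for (const std::string& next : edgeIt->second) {
            enqueue(MiniEvent{event.targetNodeId, next, output});
        }
    }

    std::map<std::string, NodeFn> m_nodes;
    std::map<std::string, std::vector<std::string>> m_edges;
    std::queue<MiniEvent> m_pending;
    std::vector<std::string> m_trace;
    std::vector<int> m_terminalValues;
};
```

With edges `start -> double`, `start -> inc`, `double -> end`, and `inc -> end`, the `end` node is dispatched twice in this sketch. That is the situation the real Runtime has to handle with join and token bookkeeping before it may declare the whole run finished.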

    #pragma once
    
    #include "FlowV2Graph.h"
    #include "FlowRuntimeState.h"
    #include "FlowV2Executor.h"
    
    #include <QQueue>
    #include <QMap>
    #include <QVariant>
    #include <QDateTime>

    #include <functional>  // std::function, used by the finish/fail callbacks
    
    #include "qtpubliclib_global.h"
    
    struct FlowV2Event
    {
        FlowContext context;
    
        QString sourceNodeId;
        QString targetNodeId;
    
        int sourcePort = 0;
        int targetPort = 0;
    
        QVariant value;
    
        // add by Momo 2026.05.25
        // In FlowV2, Ignore means: do not execute the plugin, only propagate through the ledger.
        FlowSignalStatus status = FlowSignalStatus::Complete;
    };
    
    // Pending state of an asynchronous node
    struct FlowV2PendingNode
    {
        FlowV2Event event;
        FlowV2ExecuteInput input;
    
        QString nodeId;
        QDateTime startTime;
    };
    
    enum class FlowV2InputPolicy
    {
        AnyCompleteAfterReady = 0
    };
    
    enum class FlowV2RunDecision
    {
        Wait = 0,
        Run = 1,
        Skip = 2,
        Fail = 3
    };
    
    class QTPUBLICLIB_EXPORT  FlowV2Runtime
    {
    public:
        FlowV2Runtime() = default;
    
        bool start(const FlowV2Graph& graph, FlowRuntimeState& runtimeState);
        QString lastError() const { return m_lastError; }
    
        bool startSubFlow(
            const FlowV2Graph& graph,
            FlowRuntimeState& runtimeState,
            const FlowContext& parentContext,
            const QString& callerNodeId
        );
    
        // Asynchronous completion support
        void completePendingNode(
            const FlowContext& context,
            const QVariant& outputValue = QVariant());
    
        void failPendingNode(
            const FlowContext& context,
            const QString& reason);
    
        void requestStop();
    
        bool hasPendingNodes() const { return !m_pendingNodes.isEmpty(); }
    
        // Single-stepping a subflow proceeds depth-first
        bool hasPendingEvents() const
        {
            return !m_pendingEvents.isEmpty();
        }
    
        bool isFinished() const
        {
            return m_finishedEmitted
                || m_stopRequested
                || !m_lastError.isEmpty();
        }
    
    
        bool hasError() const { return !m_lastError.isEmpty(); }
    
        void setFinishCallback(const std::function<void()>& callback)
        {
            m_finishCallback = callback;
        }
    
        void setFailCallback(const std::function<void(const QString&)>& callback)
        {
            m_failCallback = callback;
        }
    
        // Single-step execution
        void setStepMode(bool enabled)
        {
            m_stepMode = enabled;
        }
    
        bool isStepMode() const
        {
            return m_stepMode;
        }
    
        bool stepOnce();
        bool continueRun();
    
    
    private:
        void enqueueEvent(const FlowV2Event& event);
        void runEventLoop();
    
        bool dispatchEvent(const FlowV2Event& event);
        void executeNode(const FlowV2Event& event, const FlowV2ExecuteInput& input);
        /*void completeNode(const FlowV2Event& event, const QVariant& outputValue);*/
        
        void reachEnd(const FlowV2Event& event);
    
        void tryFinishRun();
        void fail(const QString& reason);
    
        FlowV2Edge edgeBetween(const QString& sourceNodeId, const QString& targetNodeId) const;
    
    
        bool startInternal(
            const FlowV2Graph& graph,
            FlowRuntimeState& runtimeState,
            const FlowContext& rootContext,
            bool isSubFlow
        );
    
    
        void completeNode(
            const FlowV2Event& event,
            const FlowV2ExecuteResult& result);
    
        FlowV2RunDecision prepareNodeExecution(
            const FlowV2Event& event,
            FlowV2ExecuteInput& input);
    
        QVector<FlowV2Edge> selectedOutputEdges(
            const QString& nodeId,
            const FlowV2ExecuteResult& result) const;
    
        void propagateIgnoreEvent(const FlowV2Event& event);
    
        void propagateIgnoreFromEdge(
            const FlowV2Edge& edge,
            const FlowContext& parentContext);
    
        
    
        FlowJoinKey makeJoinKey(const FlowV2Event& event) const;
        FlowEdgeKey makeEdgeKey(const FlowV2Event& event) const;
        void finishJoinedInputTokens(const FlowJoinState& joinState, quint64 keepTokenId);
    
        // Loop support
        bool enqueueSelectedEdge(
            const FlowV2Edge& edge,
            const FlowContext& parentContext,
            const QVariant& value,
            bool forceNewBranch);
    
        void registerExpectedInput(const FlowV2Event& event);
    
        // Wait for all input edges to arrive
        QVector<FlowV2Edge> candidateInputEdgesForEvent(const FlowV2Event& event) const;
    
        bool sameJoinScope(const FlowContext& a, const FlowContext& b) const;
    
        bool canReachByNormalEdges(
            const QString& fromNodeId,
            const QString& targetNodeId) const;
    
        bool canReachByAnyEdges(
            const QString& fromNodeId,
            const QString& targetNodeId,
            const QString& ignoredEdgeId = QString()) const;
    
        bool isCycleFormingEdge(const FlowV2Edge& edge) const;
    
        bool missingInputCanStillArrive(
            const FlowV2Edge& missingEdge,
            const FlowContext& context) const;
    
        bool hasMissingInputThatCanStillArrive(
            const FlowV2Event& event,
            const FlowJoinState& joinState) const;
    
        FlowEdgeKey makeEdgeKeyFromEdge(const FlowV2Edge& edge) const;
    
        // Multiple terminal nodes
        bool isTerminalSinkNode(const QString& nodeId) const;
        void reachTerminal(const FlowV2Event& event);
    
        // Dynamic back-edge detection
        bool wouldRevisitNode(
            const FlowContext& context,
            const QString& targetNodeId) const;
    
        bool isRuntimeLoopBackEdge(
            const FlowV2Edge& edge,
            const FlowContext& context) const;
    
        bool isFeedbackInputEdge(const FlowV2Edge& edge) const;
    
        // Async support
        void suspendToken(
            const FlowV2Event& event,
            const FlowV2ExecuteInput& input);
    
    private:
        FlowV2Graph m_graph;
        FlowRuntimeState* m_runtimeState = nullptr;
    
        FlowContext m_rootContext;
    
        QQueue<FlowV2Event> m_pendingEvents;
        QMap<quint64, FlowV2Branch> m_branches;
    
        QString m_startNodeId;
        QString m_endNodeId;
    
        QString m_lastError;
    
        bool m_finishedEmitted = false;
        bool m_isSubFlow = false;
    
        // add by Momo.
        // true means the graph contains a continuous "terminal sink + loop" flow that only stops when the user stops it manually.
        bool m_isContinuousManualLoop = false;
    
        // Async support
        QMap<quint64, FlowV2PendingNode> m_pendingNodes;
        bool m_stopRequested = false;
        std::function<void()> m_finishCallback;
        std::function<void(const QString&)> m_failCallback;
    
    
        bool m_runningEventLoop = false;
        bool m_eventLoopRerunRequested = false;
    
        // add by Momo 2026.06.02
        // Native FlowV2 single-step control.
        // When m_stepMode is true, runEventLoop does not drain the event queue in one pass.
        bool m_stepMode = false;
    
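        // -1 means continuous execution; >0 is the number of real nodes this step may still execute;
        // 0 means the step is paused (startInternal sets it when pausing at the first frontier).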
        int m_stepBudget = -1;
    
        QString m_lastHighlightedNodeId; // Node highlighted during single-step execution
    
        // add by Momo 2026.06.02
        // Data-flow display
        void clearDataFlowDisplayRecords();
    
        void recordDataFlowForDisplay(const FlowV2Event& event);
    };
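
The `m_runningEventLoop`, `m_eventLoopRerunRequested` and `m_stepBudget` members declared above suggest a re-entrancy-guarded loop with an optional step budget. The sketch below illustrates that idea in plain standard C++, under stated assumptions: `MiniEventLoop`, `Handler`, `enqueue`, `run` and `pendingCount` are hypothetical names, events are plain strings rather than `FlowV2Event`, and the stop/fail checks of the real runtime are left out.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for the runtime's event loop.
class MiniEventLoop {
public:
    // The handler returns true when a "real node" was executed for the event.
    using Handler = std::function<bool(MiniEventLoop&, const std::string&)>;

    explicit MiniEventLoop(Handler handler) : m_handler(std::move(handler)) {}

    void enqueue(const std::string& event) { m_pending.push(event); }

    // stepBudget < 0: drain the queue.
    // stepBudget > 0: pause after that many executed nodes.
    void run(int stepBudget = -1) {
        if (m_running) {
            // Re-entrant call (for example from inside a handler):
            // ask the outer loop for another pass instead of recursing.
            m_rerunRequested = true;
            return;
        }

        m_running = true;
        do {
            m_rerunRequested = false;
            while (!m_pending.empty()) {
                const std::string event = m_pending.front();
                m_pending.pop();

                const bool executed = m_handler(*this, event);
                if (executed && stepBudget > 0 && --stepBudget == 0) {
                    m_running = false;  // pause; remaining events stay queued
                    return;
                }
            }
        } while (m_rerunRequested);
        m_running = false;
    }

    std::size_t pendingCount() const { return m_pending.size(); }

private:
    Handler m_handler;
    std::queue<std::string> m_pending;
    bool m_running = false;
    bool m_rerunRequested = false;
};
```

A handler that enqueues a follow-up event and calls `run()` again does not recurse: the nested call only sets the rerun flag, and the outer loop picks up the new event. With `run(1)`, one click executes one node and leaves the rest of the queue untouched.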

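The implementation that follows decides per event whether a node should run, wait, be skipped, or fail, based on which input edges were expected and which have arrived. The following is a minimal sketch of that join decision in standard C++, not the author's code: `MiniJoinState`, `onInputArrived`, `SignalStatus` and `RunDecision` are hypothetical names, `isReady()` is assumed to mean "every expected input has arrived", and the extra "can a missing input still arrive" check is omitted.

```cpp
#include <map>
#include <set>
#include <string>

enum class SignalStatus { Complete, Ignore, Failed };
enum class RunDecision { Run, Wait, Skip, Fail };

// Hypothetical simplified join ledger for a single node.
struct MiniJoinState {
    std::set<std::string> expectedInputs;               // edge ids that were enqueued
    std::map<std::string, SignalStatus> arrivedInputs;  // edge id -> status on arrival

    // Assumption: ready means every expected edge has an arrival record.
    bool isReady() const {
        for (const std::string& edge : expectedInputs) {
            if (arrivedInputs.count(edge) == 0) {
                return false;
            }
        }
        return !expectedInputs.empty();
    }
};

RunDecision onInputArrived(MiniJoinState& join,
                           const std::string& edgeId,
                           SignalStatus status)
{
    join.expectedInputs.insert(edgeId);  // fallback registration
    join.arrivedInputs[edgeId] = status;

    if (!join.isReady()) {
        return RunDecision::Wait;
    }

    bool hasComplete = false;
    bool hasFailed = false;
    for (const auto& entry : join.arrivedInputs) {
        if (entry.second == SignalStatus::Complete) {
            hasComplete = true;
        } else if (entry.second == SignalStatus::Failed) {
            hasFailed = true;
        }
    }

    if (hasFailed) {
        return RunDecision::Fail;   // any failed input fails the join
    }
    if (!hasComplete) {
        return RunDecision::Skip;   // only Ignore inputs: propagate Ignore
    }
    return RunDecision::Run;        // at least one Complete input
}
```

With two expected edges, an Ignore on the first edge yields `Wait`, and a Complete on the second yields `Run`. If both arrive as Ignore, the node is skipped and only the ledger is propagated.
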
    #include "FlowV2Runtime.h"
    
    #include <QDateTime>
    #include <QDebug>
    #include <QMetaObject>
    #include <QStringList>
    
    #include "dataDefine.h"
    
    namespace {
    
        void markFlowV2NodeState(
            const FlowV2Graph& graph,
            const QString& nodeId,
            int state)
        {
            if (!graph.nodes.contains(nodeId)) {
                return;
            }
    
            modelObject* obj = graph.nodes.value(nodeId).legacyObject;
            if (!obj || obj->isDestroyPrepared()) {
                return;
            }
    
            // setmodelState() calls setNodeShowColor() internally.
            obj->setmodelState(state);
        }
    
        void clearFlowV2NodeHighlight(
            const FlowV2Graph& graph,
            const QString& nodeId)
        {
            if (nodeId.isEmpty()) {
                return;
            }
    
            if (!graph.nodes.contains(nodeId)) {
                return;
            }
    
            modelObject* obj = graph.nodes.value(nodeId).legacyObject;
            if (!obj || obj->isDestroyPrepared()) {
                return;
            }
    
            obj->setmodelState(STATE_FINISHED);
        }
    
    }
    
    //bool FlowV2Runtime::start(const FlowV2Graph& graph, FlowRuntimeState& runtimeState)
    //{
    //    m_graph = graph;
    //    m_runtimeState = &runtimeState;
    //    m_lastError.clear();
    //    m_pendingEvents.clear();
    //    m_branches.clear();
    //    m_finishedEmitted = false;
    //
    //    const FlowV2ValidateResult validateResult = m_graph.validateStage1Dag();
    //    if (!validateResult.ok) {
    //        fail(validateResult.error);
    //        return false;
    //    }
    //
    //    m_startNodeId = m_graph.findStartNodeId();
    //    m_endNodeId = m_graph.findEndNodeId();
    //
    //    m_rootContext = m_runtimeState->createRootContext();
    //
    //    const QVector<FlowV2Edge> startOutputs = m_graph.outgoingEdges(m_startNodeId);
    //
    //    qDebug() << "start runId =" << m_rootContext.runId
    //        << "initialBranchCount =" << startOutputs.size();
    //
    //    int branchIndex = 0;
    //
    //    for (const FlowV2Edge& edge : startOutputs) {
    //        ++branchIndex;
    //
    //        FlowContext branchContext = m_runtimeState->createBranchContext(m_rootContext, branchIndex);
    //        if (!branchContext.isValid()) {
    //            fail(QStringLiteral("failed to create branch context."));
    //            return false;
    //        }
    //
    //        FlowV2Branch branch;
    //        branch.branchId = branchContext.tokenId;
    //        branch.context = branchContext;
    //        branch.currentIndex = 0;
    //        branch.state = FlowV2BranchState::Pending;
    //        branch.nodePath.push_back(m_startNodeId);
    //        branch.nodePath.push_back(edge.targetNodeId);
    //
    //        m_branches.insert(branch.branchId, branch);
    //
    //        qDebug() << "initial branch" << branch.branchId
    //            << "edge =" << edge.sourceNodeId << "->" << edge.targetNodeId;
    //
    //        FlowV2Event event;
    //        event.context = branchContext;
    //        event.sourceNodeId = edge.sourceNodeId;
    //        event.targetNodeId = edge.targetNodeId;
    //        event.sourcePort = edge.sourcePort;
    //        event.targetPort = edge.targetPort;
    //        event.value = QVariant();
    //
    //        enqueueEvent(event);
    //    }
    //
    //    // The root token only marks the start of this run; it does not take part in the End barrier.
    //    m_runtimeState->finishToken(m_rootContext);
    //
    //    runEventLoop();
    //    tryFinishRun();
    //
    //    return m_lastError.isEmpty();
    //}
    
    bool FlowV2Runtime::startInternal(
        const FlowV2Graph& graph,
        FlowRuntimeState& runtimeState,
        const FlowContext& rootContext,
        bool isSubFlow)
    {
        m_graph = graph;
    
        // add by Momo 2026.06.02
        // Clear the data-flow display cache from the previous run before a new FlowV2 run starts.
        clearDataFlowDisplayRecords();
    
        m_isContinuousManualLoop = m_graph.hasContinuousManualLoopRegion(); // Dynamic back-edge support
        qDebug() << "continuousManualLoop ="
            << m_isContinuousManualLoop;
        m_runtimeState = &runtimeState;
        m_rootContext = rootContext;
        m_isSubFlow = isSubFlow;
    
        m_lastError.clear();
        m_pendingEvents.clear();
        m_branches.clear();
        m_finishedEmitted = false;
        m_pendingNodes.clear(); // Clear pending async nodes
        m_stopRequested = false;
    
        const FlowV2ValidateResult validateResult = m_graph.validateStage1Dag();
        if (!validateResult.ok) {
            fail(validateResult.error);
            return false;
        }
    
        m_startNodeId = m_graph.findStartNodeId();
        m_endNodeId = m_graph.findEndNodeId();
    
        const QVector<FlowV2Edge> startOutputs = m_graph.outgoingEdges(m_startNodeId);
    
        qDebug() << "[FlowV2]"
            << (m_isSubFlow ? "[subflow]" : "[main]")
            << "start runId =" << m_rootContext.runId
            << "frameId =" << m_rootContext.frameId
            << "parentFrameId =" << m_rootContext.parentFrameId
            << "initialBranchCount =" << startOutputs.size();
    
        int branchIndex = 0;
    
        for (const FlowV2Edge& edge : startOutputs) {
            ++branchIndex;
    
            FlowContext branchContext = m_runtimeState->createBranchContext(m_rootContext, branchIndex);
            if (!branchContext.isValid()) {
                fail(QStringLiteral("failed to create branch context."));
                return false;
            }
    
            FlowV2Branch branch;
            branch.branchId = branchContext.tokenId;
            branch.context = branchContext;
            branch.currentIndex = 0;
            branch.state = FlowV2BranchState::Pending;
            branch.nodePath.push_back(m_startNodeId);
            branch.nodePath.push_back(edge.targetNodeId);
    
            m_branches.insert(branch.branchId, branch);
    
            qDebug() << "[FlowV2]"
                << (m_isSubFlow ? "[subflow]" : "[main]")
                << "initial branch" << branch.branchId
                << "frameId =" << branchContext.frameId
                << "edge =" << edge.sourceNodeId << "->" << edge.targetNodeId;
    
            FlowV2Event event;
            event.context = branchContext;
            event.sourceNodeId = edge.sourceNodeId;
            event.targetNodeId = edge.targetNodeId;
            event.sourcePort = edge.sourcePort;
            event.targetPort = edge.targetPort;
            event.value = QVariant();
    
            enqueueEvent(event);
        }
    
        // The root token only marks the start of the current frame; it does not take part in this frame's End barrier.
        m_runtimeState->finishToken(m_rootContext);
    
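        // In step mode, the first click only completes Start and marks Start's downstream nodes as pending.
        // Note: do not call runEventLoop() here, otherwise the nodes after Start would execute on the first click.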
        if (m_stepMode) {
            m_stepBudget = 0;
    
            qDebug() << "[FlowV2]"
                << (m_isSubFlow ? "[subflow]" : "[main]")
                << "step start paused at first frontier. pendingEvents ="
                << m_pendingEvents.size();
    
            return m_lastError.isEmpty();
        }
    
        runEventLoop();
        tryFinishRun();
    
        return m_lastError.isEmpty();
    }
    
    bool FlowV2Runtime::start(const FlowV2Graph& graph, FlowRuntimeState& runtimeState)
    {
        FlowContext rootContext = runtimeState.createRootContext();
        return startInternal(graph, runtimeState, rootContext, false);
    }
    
    bool FlowV2Runtime::startSubFlow(
        const FlowV2Graph& graph,
        FlowRuntimeState& runtimeState,
        const FlowContext& parentContext,
        const QString& callerNodeId)
    {
        FlowContext childContext =
            runtimeState.createChildFrameContext(parentContext, callerNodeId);
    
        if (!childContext.isValid()) {
            m_lastError = QStringLiteral("failed to create child frame context. caller=%1")
                .arg(callerNodeId);
    
            runtimeState.markFailed(m_lastError);
            return false;
        }
    
        return startInternal(graph, runtimeState, childContext, true);
    }
    
    // Single-step execution
    bool FlowV2Runtime::stepOnce()
    {
        if (!m_runtimeState) {
            return false;
        }
    
        if (m_stopRequested || m_finishedEmitted || !m_lastError.isEmpty()) {
            return false;
        }
    
        // While an async node is still pending, do not force progress.
        // After completePendingNode() finishes the async node, the follow-up events stay in the queue.
        if (m_pendingEvents.isEmpty()) {
            tryFinishRun();
            return m_lastError.isEmpty();
        }
    
        m_stepMode = true;
    
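        // Step semantics: one click executes exactly one real node.
        // Do not execute a batch sized by the current pendingEvents count, otherwise synchronous nodes flash past.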
        m_stepBudget = 1;
        
        runEventLoop();
    
        return m_lastError.isEmpty();
    }
    
    bool FlowV2Runtime::continueRun()
    {
        if (!m_runtimeState) {
            return false;
        }
    
        if (m_stopRequested || m_finishedEmitted || !m_lastError.isEmpty()) {
            return false;
        }
    
        m_stepMode = false;
        m_stepBudget = -1;
    
        runEventLoop();
    
        return m_lastError.isEmpty();
    }
    
    void FlowV2Runtime::enqueueEvent(const FlowV2Event& event)
    {
        if (!m_runtimeState) {
            return;
        }
    
        if (m_stopRequested || m_finishedEmitted) {
            return;
        }
    
        registerExpectedInput(event);
    
        ++m_runtimeState->currentRun.pendingEvents;
        m_pendingEvents.enqueue(event);
    
        // In step mode, a Complete event in the queue represents the "next node to execute".
        // Ignore/Skip branches should not be highlighted.
        if (m_stepMode && event.status == FlowSignalStatus::Complete) {
            markFlowV2NodeState(m_graph, event.targetNodeId, STATE_PAUSE);
        }
    }
    
    void FlowV2Runtime::registerExpectedInput(const FlowV2Event& event)
    {
        if (!m_runtimeState) {
            return;
        }
    
        if (!event.context.isValid()) {
            return;
        }
    
        if (event.targetNodeId.isEmpty()) {
            return;
        }
    
        // End does not use the ordinary join.
        // End is closed out by the active token count.
        if (event.targetNodeId == m_endNodeId) {
            return;
        }
    
        if (isTerminalSinkNode(event.targetNodeId)) {
            return;
        }
    
        const FlowJoinKey joinKey = makeJoinKey(event);
        FlowJoinState& joinState = m_runtimeState->currentRun.pendingJoins[joinKey];
    
        const FlowEdgeKey edgeKey = makeEdgeKey(event);
        joinState.expectedInputs.insert(edgeKey, true);
    
        qDebug() << "[FlowV2] register expected input node =" << event.targetNodeId
            << "edge =" << edgeKey.fromNodeId << ":" << edgeKey.fromPort
            << "->" << edgeKey.toNodeId << ":" << edgeKey.toPort
            << "runId =" << event.context.runId
            << "frameId =" << event.context.frameId
            << "joinBatchId =" << event.context.joinBatchId
            << "iteration =" << event.context.iteration
            << "expected =" << joinState.expectedInputs.size();
    }
    
    
    void FlowV2Runtime::runEventLoop()
    {
        if (m_runningEventLoop) {
            m_eventLoopRerunRequested = true;
            return;
        }
    
        m_runningEventLoop = true;
    
        do {
            m_eventLoopRerunRequested = false;
    
            while (!m_pendingEvents.isEmpty()) {
                if (m_stopRequested || m_finishedEmitted || !m_runtimeState) {
                    m_pendingEvents.clear();
                    m_runningEventLoop = false;
                    return;
                }
    
                FlowV2Event event = m_pendingEvents.dequeue();
    
                if (m_runtimeState && m_runtimeState->currentRun.pendingEvents > 0) {
                    --m_runtimeState->currentRun.pendingEvents;
                }
    
                // Single-step support: dispatchEvent() reports whether a real node was executed.
                const bool executedNode = dispatchEvent(event);
    
                if (m_stepMode && executedNode && m_stepBudget > 0) {
                    --m_stepBudget;
    
                    if (m_stepBudget <= 0) {
                        m_runningEventLoop = false;
    
                        if (!m_stopRequested
                            && !m_finishedEmitted
                            && m_runtimeState
                            && m_lastError.isEmpty()) {
                            tryFinishRun();
                        }
    
                        return;
                    }
                }
                
    
                if (!m_lastError.isEmpty()
                    || m_stopRequested
                    || m_finishedEmitted
                    || !m_runtimeState) {
                    m_runningEventLoop = false;
                    return;
                }
            }
    
        } while (m_eventLoopRerunRequested
            && !m_stopRequested
            && !m_finishedEmitted
            && m_runtimeState
            && m_lastError.isEmpty());
    
        m_runningEventLoop = false;
    
        if (!m_stopRequested
            && !m_finishedEmitted
            && m_runtimeState
            && m_lastError.isEmpty()) {
            tryFinishRun();
        }
    }
    
    bool FlowV2Runtime::dispatchEvent(const FlowV2Event& event)
    {
        if (!m_runtimeState) {
            fail(QStringLiteral("runtime state is null."));
            return false;
        }
    
        if (!event.context.isValid()) {
            fail(QStringLiteral("invalid event context."));
            return false;
        }
    
        if (!m_graph.nodes.contains(event.targetNodeId)) {
            fail(QStringLiteral("target node not found: %1").arg(event.targetNodeId));
            return false;
        }
    
        if (!m_branches.contains(event.context.tokenId)) {
            fail(QStringLiteral("branch token not found: %1").arg(event.context.tokenId));
            return false;
        }
    
        FlowV2Branch& branch = m_branches[event.context.tokenId];
        branch.state = FlowV2BranchState::Running;
    
        // This is only recorded as a debug path; it is no longer used to decide validity.
        if (branch.nodePath.isEmpty() || branch.nodePath.last() != event.targetNodeId) {
            branch.nodePath.push_back(event.targetNodeId);
        }
    
        /*FlowV2ExecuteInput input;
        const bool ready = prepareNodeExecution(event, input);
    
        if (!ready) {
            return;
        }
    
        executeNode(event, input);*/
    
        // add by Momo 2026.06.02
        // FlowV2 no longer goes through the old notifyObject_receive/send_to_next_modelobject path,
        // so the event is recorded here into modelObject's data-flow display cache.
        recordDataFlowForDisplay(event);
    
    
        FlowV2ExecuteInput input;
        const FlowV2RunDecision decision = prepareNodeExecution(event, input);
    
        if (decision == FlowV2RunDecision::Wait) {
            return false;
        }
    
        if (decision == FlowV2RunDecision::Fail) {
            fail(QStringLiteral("FlowV2 input failed at node: %1").arg(event.targetNodeId));
            return false;
        }
    
        if (decision == FlowV2RunDecision::Skip) {
            propagateIgnoreEvent(event);
            return false;
        }
    
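        // In a multi-input join, the last event to arrive may be an Ignore,
        // but the token that should continue downstream belongs to one of the Complete inputs.
        // So input.context is used here to correct the execution context.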
        FlowV2Event executeEvent = event;
        executeEvent.context = input.context;
    
        executeNode(executeEvent, input);
    
        return true; // A real node was executed (used by single-step mode)
    }
    
    
    void FlowV2Runtime::executeNode(const FlowV2Event& event, const FlowV2ExecuteInput& input)
    {
        const FlowV2Node node = m_graph.nodes.value(event.targetNodeId);
    
        // Bind the highlight to the node while single-stepping
        if (m_stepMode &&
            !m_lastHighlightedNodeId.isEmpty() &&
            m_lastHighlightedNodeId != node.nodeId) {
            clearFlowV2NodeHighlight(m_graph, m_lastHighlightedNodeId);
        }
        markFlowV2NodeState(m_graph, node.nodeId, STATE_EXECUTING);
        if (m_stepMode) {
            m_lastHighlightedNodeId = node.nodeId;
        }
    
        qDebug() << "[FlowV2] execute node =" << node.nodeId
            << "type =" << node.type
            << "branch =" << event.context.tokenId;
    
        FlowV2ExecuteResult result =
            FlowV2ExecutorRegistry::instance().execute(node, input);
    
        // In step mode, the plugin may set itself to FINISHED internally.
        // After the plugin returns, the Runtime re-highlights the node that was just executed.
        if (m_stepMode && result.status != FlowSignalStatus::Failed) {
            markFlowV2NodeState(m_graph, node.nodeId, STATE_EXECUTING);
            m_lastHighlightedNodeId = node.nodeId;
        }
    
    
        if (result.status == FlowSignalStatus::Failed) {
    
            markFlowV2NodeState(m_graph, node.nodeId, STATE_FINISHED);
            const QString reason = result.errorMessage.isEmpty()
                ? QStringLiteral("FlowV2 node execute failed: %1").arg(node.nodeId)
                : result.errorMessage;
    
            fail(reason);
            return;
        }
    
        if (result.status == FlowSignalStatus::Pending) {
            suspendToken(event, input);
            return;
        }
    
        if (result.status == FlowSignalStatus::Ignore) {
            if (!m_stepMode) {
                markFlowV2NodeState(m_graph, node.nodeId, STATE_FINISHED);
            }
            propagateIgnoreEvent(event);
            return;
        }
    
        if (!m_stepMode) {
            markFlowV2NodeState(m_graph, node.nodeId, STATE_FINISHED);
        }
    
        completeNode(event, result);
    }
    
    void FlowV2Runtime::completeNode(
        const FlowV2Event& event,
        const FlowV2ExecuteResult& result)
    {
        if (event.targetNodeId == m_endNodeId) {
            reachEnd(event);
            return;
        }
    
        const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(event.targetNodeId);
    
        if (outputs.isEmpty()) {
            if (m_graph.isTerminalSinkNode(event.targetNodeId)) {
                reachTerminal(event);
                return;
            }
    
            fail(QStringLiteral("node has no output and is not terminal: %1").arg(event.targetNodeId));
            return;
        }
    
        const QVector<FlowV2Edge> selectedOutputs =
            selectedOutputEdges(event.targetNodeId, result);
    
        /*if (outputs.isEmpty()) {
            fail(QStringLiteral("node has no output and is not endtool: %1").arg(event.targetNodeId));
            return;
        }*/
    
        // Multiple terminal nodes
        if (isTerminalSinkNode(event.targetNodeId)) {
            reachTerminal(event);
            return;
        }
    
        if (selectedOutputs.isEmpty()) {
            fail(QStringLiteral("node selected no output: %1").arg(event.targetNodeId));
            return;
        }
    
        // Unselected output edges go through Ignore propagation.
        for (const FlowV2Edge& edge : outputs) {
            bool selected = false;
    
            for (const FlowV2Edge& selectedEdge : selectedOutputs) {
                if (selectedEdge.edgeId == edge.edgeId) {
                    selected = true;
                    break;
                }
            }
    
            if (!selected) {
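                // An unselected cycle-forming edge means "do not enter the next iteration / do not continue the loop",
                // so Ignore must not be propagated along it.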
                if (isCycleFormingEdge(edge)) {
                    qDebug() << "[FlowV2] ignore unselected cycle-forming edge ="
                        << edge.sourceNodeId << "->" << edge.targetNodeId
                        << "edgeId =" << edge.edgeId;
                    continue;
                }
    
                propagateIgnoreFromEdge(edge, event.context);
            }
        }
    
        // Exactly one selected output: the current token continues.
        if (selectedOutputs.size() == 1) {
            const FlowV2Edge edge = selectedOutputs.first();
    
            qDebug() << "[FlowV2] complete node =" << event.targetNodeId
                << "next =" << edge.targetNodeId
                << "branch =" << event.context.tokenId
                << "edgeKind =" << static_cast<int>(edge.kind);
    
            const bool createdNewToken =
                enqueueSelectedEdge(edge, event.context, result.outputValue, false);
    
            if (!m_lastError.isEmpty()) {
                return;
            }
    
            // A LoopBack creates a new token, so the current token ends here.
            if (createdNewToken) {
                m_runtimeState->finishToken(event.context);
    
                auto it = m_branches.find(event.context.tokenId);
                if (it != m_branches.end()) {
                    it.value().state = FlowV2BranchState::Finished;
                }
            }
    
            return;
        }
    
        // Multiple selected outputs: fan-out.
        qDebug() << "[FlowV2] fan-out node =" << event.targetNodeId
            << "selectedOutCount =" << selectedOutputs.size()
            << "parentBranch =" << event.context.tokenId;
    
        for (const FlowV2Edge& edge : selectedOutputs) {
            enqueueSelectedEdge(edge, event.context, result.outputValue, true);
    
            if (!m_lastError.isEmpty()) {
                return;
            }
        }
    
        m_runtimeState->finishToken(event.context);
    
        auto it = m_branches.find(event.context.tokenId);
        if (it != m_branches.end()) {
            it.value().state = FlowV2BranchState::Finished;
        }
    }
    
    
    void FlowV2Runtime::reachEnd(const FlowV2Event& event)
    {
        if (!m_runtimeState) {
            return;
        }
    
        FlowV2Branch& branch = m_branches[event.context.tokenId];
    
        branch.state = FlowV2BranchState::Finished;
    
        if (branch.nodePath.isEmpty() || branch.nodePath.last() != event.targetNodeId) {
            branch.nodePath.push_back(event.targetNodeId);
        }
    
        branch.currentIndex = branch.nodePath.size() - 1;
    
        m_runtimeState->finishToken(event.context);
    
        qDebug() << "[FlowV2]"
            << (m_isSubFlow ? "[subflow]" : "[main]")
            << "branch reach End. branch =" << event.context.tokenId
            << "frameId =" << event.context.frameId
            << "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
    
        tryFinishRun();
    }
    
    
    void FlowV2Runtime::tryFinishRun()
    {
        if (!m_runtimeState) {
            return;
        }
    
        if (m_finishedEmitted) {
            return;
        }
    
        if (m_runtimeState->hasFailed()) {
            return;
        }
    
        // Async check: pending async nodes block completion.
        if (!m_pendingNodes.isEmpty()) {
            return;
        }
    
        // Only count child tokens derived from the current frame/root token.
        if (m_runtimeState->activeChildTokenCount(m_rootContext.tokenId) > 0) {
            return;
        }
    
        if (m_isSubFlow) {
            m_runtimeState->finishFrame(m_rootContext);
            m_finishedEmitted = true;
    
            qDebug() << "[FlowV2][subflow] all branches finished. runId ="
                << m_rootContext.runId
                << "frameId =" << m_rootContext.frameId
                << "parentFrameId =" << m_rootContext.parentFrameId;
    
            if (m_finishCallback) {
                m_finishCallback();
            }
    
            return;
        }
    
        m_runtimeState->requestFinish();
    
        /*if (m_runtimeState->canFinishStage1()) {
            m_finishedEmitted = true;
    
            qDebug() << "[FlowV2][main] all branches finished. runId ="
                << m_runtimeState->currentRun.runId;
    
            if (m_finishCallback) {
                m_finishCallback();
            }
        }*/
    
        if (m_runtimeState->canFinishStage1()) {
            m_finishedEmitted = true;
    
            qDebug() << "[FlowV2][main] all branches finished. runId ="
                << m_runtimeState->currentRun.runId;
    
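            // FlowV2: EndTool may only finish the main flow after every branch has reached EndTool.
            if (m_graph.nodes.contains(m_endNodeId)) {
                modelObject* endObj = m_graph.nodes.value(m_endNodeId).legacyObject;
    
                if (endObj && !endObj->isDestroyPrepared()) {
                    const bool invoked = QMetaObject::invokeMethod(
                        endObj,
                        "finishFlowV2MainProcess",
                        Qt::DirectConnection
                    );
    
                    // invokeMethod() returns false when the member cannot be invoked.
                    if (!invoked) {
                        qWarning() << "[FlowV2][main] failed to invoke finishFlowV2MainProcess on End node ="
                            << m_endNodeId;
                    }
                }
            }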
    
            if (m_finishCallback) {
                m_finishCallback();
            }
        }
    }
    
    
    FlowV2Edge FlowV2Runtime::edgeBetween(const QString& sourceNodeId, const QString& targetNodeId) const
    {
        for (const FlowV2Edge& edge : m_graph.edges) {
            if (edge.sourceNodeId == sourceNodeId &&
                edge.targetNodeId == targetNodeId) {
                return edge;
            }
        }
    
        return FlowV2Edge();
    }
    
    FlowJoinKey FlowV2Runtime::makeJoinKey(const FlowV2Event& event) const
    {
        FlowJoinKey key;
        key.runId = event.context.runId;
        key.tokenGroupId = event.context.tokenGroupId;
        key.frameId = event.context.frameId;
        key.joinBatchId = event.context.joinBatchId;
        key.nodeId = event.targetNodeId;
        key.iteration = event.context.iteration;
        return key;
    }
    
    FlowEdgeKey FlowV2Runtime::makeEdgeKey(const FlowV2Event& event) const
    {
        FlowEdgeKey key;
        key.fromNodeId = event.sourceNodeId;
        key.fromPort = event.sourcePort;
        key.toNodeId = event.targetNodeId;
        key.toPort = event.targetPort;
        key.edgeId = QStringLiteral("%1:%2->%3:%4")
            .arg(event.sourceNodeId)
            .arg(event.sourcePort)
            .arg(event.targetNodeId)
            .arg(event.targetPort);
        return key;
    }
    
    void FlowV2Runtime::finishJoinedInputTokens(const FlowJoinState& joinState, quint64 keepTokenId)
    {
        if (!m_runtimeState) {
            return;
        }
    
        for (auto it = joinState.arrivedInputs.begin(); it != joinState.arrivedInputs.end(); ++it) {
            const FlowContext ctx = it.value().context;
    
            if (ctx.tokenId == keepTokenId) {
                continue;
            }
    
            m_runtimeState->finishToken(ctx);
        }
    }
    
    FlowV2RunDecision FlowV2Runtime::prepareNodeExecution(
        const FlowV2Event& event,
        FlowV2ExecuteInput& input)
    {
        input.context = event.context;
        input.runtimeState = m_runtimeState;
        input.runtime = this; // Needed by async nodes
        input.sourceNodeId = event.sourceNodeId;
        input.targetNodeId = event.targetNodeId;
        input.sourcePort = event.sourcePort;
        input.targetPort = event.targetPort;
        input.inputValue = event.value;
        input.inputValuesByPort.clear();
        input.inputValues.clear();
    
        if (isTerminalSinkNode(event.targetNodeId)) {
            input.inputValuesByPort.insert(event.targetPort, event.value);
    
            FlowV2InputValue item;
            item.sourceNodeId = event.sourceNodeId;
            item.sourcePort = event.sourcePort;
            item.targetNodeId = event.targetNodeId;
            item.targetPort = event.targetPort;
            item.value = event.value;
            input.inputValues.push_back(item);
    
            if (event.status == FlowSignalStatus::Failed) {
                return FlowV2RunDecision::Fail;
            }
    
            if (event.status == FlowSignalStatus::Ignore) {
                return FlowV2RunDecision::Skip;
            }
    
            return FlowV2RunDecision::Run;
        }
    
        // End is a special barrier.
        if (event.targetNodeId == m_endNodeId) {
            input.inputValuesByPort.insert(event.targetPort, event.value);
    
            if (event.status == FlowSignalStatus::Failed) {
                return FlowV2RunDecision::Fail;
            }
    
            if (event.status == FlowSignalStatus::Ignore) {
                return FlowV2RunDecision::Skip;
            }
    
            return FlowV2RunDecision::Run;
        }
    
        const FlowJoinKey joinKey = makeJoinKey(event);
        FlowJoinState& joinState = m_runtimeState->currentRun.pendingJoins[joinKey];
    
        const FlowEdgeKey currentEdgeKey = makeEdgeKey(event);
    
        // Fallback:
        // Normally expectedInputs has already been registered in enqueueEvent().
        // If an event bypasses enqueueEvent() and is dispatched directly, it is registered here instead.
        if (!joinState.expectedInputs.contains(currentEdgeKey)) {
            joinState.expectedInputs.insert(currentEdgeKey, true);
        }
    
        FlowInputRecord record;
        record.status = event.status;
        record.context = event.context;
        record.data = event.value;
        record.arrivedTime = QDateTime::currentDateTime();
    
        // 1. Record the current event as arrived
        joinState.arrivedInputs.insert(currentEdgeKey, record);
    
        // 2. First wait for the inputs explicitly registered via registerExpectedInput()
        if (!joinState.isReady()) {
            return FlowV2RunDecision::Wait;
        }
    
        // 3. Then check whether any static candidate input could still arrive in the same iteration.
        //    Feedback inputs do not block.
        if (hasMissingInputThatCanStillArrive(event, joinState)) {
            qDebug() << "[FlowV2] wait active upstream node =" << event.targetNodeId
                << "arrived =" << joinState.arrivedInputs.size()
                << "joinBatchId =" << event.context.joinBatchId
                << "iteration =" << event.context.iteration;
    
            return FlowV2RunDecision::Wait;
        }
    
        if (!joinState.isReady()) {
            qDebug() << "[FlowV2] wait activated inputs node =" << event.targetNodeId
                << "arrived =" << joinState.arrivedInputs.size()
                << "expected =" << joinState.expectedInputs.size()
                << "joinBatchId =" << event.context.joinBatchId
                << "iteration =" << event.context.iteration;
    
            return FlowV2RunDecision::Wait;
        }
    
        bool hasComplete = false;
        bool hasFailed = false;
        bool hasIgnore = false;
    
        FlowContext firstCompleteContext;
    
        for (auto it = joinState.arrivedInputs.begin(); it != joinState.arrivedInputs.end(); ++it) {
            const FlowInputRecord inputRecord = it.value();
    
            if (inputRecord.status == FlowSignalStatus::Complete) {
                hasComplete = true;
    
                if (!firstCompleteContext.isValid()) {
                    firstCompleteContext = inputRecord.context;
                }
            }
            else if (inputRecord.status == FlowSignalStatus::Failed) {
                hasFailed = true;
            }
            else if (inputRecord.status == FlowSignalStatus::Ignore) {
                hasIgnore = true;
            }
        }
    
        if (hasFailed) {
            m_runtimeState->currentRun.pendingJoins.remove(joinKey);
    
            qWarning() << "[FlowV2] input failed at join node =" << event.targetNodeId;
            return FlowV2RunDecision::Fail;
        }
    
        if (!hasComplete && hasIgnore) {
            finishJoinedInputTokens(joinState, event.context.tokenId);
            m_runtimeState->currentRun.pendingJoins.remove(joinKey);
    
            return FlowV2RunDecision::Skip;
        }
    
        for (auto it = joinState.arrivedInputs.begin(); it != joinState.arrivedInputs.end(); ++it) {
            const FlowEdgeKey edgeKey = it.key();
            const FlowInputRecord inputRecord = it.value();
    
            if (inputRecord.status == FlowSignalStatus::Complete) {
                input.inputValuesByPort.insert(edgeKey.toPort, inputRecord.data);
    
                FlowV2InputValue item;
                item.sourceNodeId = edgeKey.fromNodeId;
                item.sourcePort = edgeKey.fromPort;
                item.targetNodeId = edgeKey.toNodeId;
                item.targetPort = edgeKey.toPort;
                item.value = inputRecord.data;
    
                input.inputValues.push_back(item);
            }
        }
    
        if (firstCompleteContext.isValid()) {
            input.context = firstCompleteContext;
        }
    
        finishJoinedInputTokens(joinState, input.context.tokenId);
        m_runtimeState->currentRun.pendingJoins.remove(joinKey);
    
        qDebug() << "[FlowV2] activated join ready node =" << event.targetNodeId
            << "completeInputCount =" << input.inputValuesByPort.size()
            << "hasIgnore =" << hasIgnore
            << "keepBranch =" << input.context.tokenId
            << "joinBatchId =" << input.context.joinBatchId
            << "iteration =" << input.context.iteration;
    
        return FlowV2RunDecision::Run;
    }
    
    
    QVector<FlowV2Edge> FlowV2Runtime::selectedOutputEdges(
        const QString& nodeId,
        const FlowV2ExecuteResult& result) const
    {
        const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(nodeId);
    
        // Empty selectedOutputPorts: an ordinary node that fans out unconditionally, so every output edge is taken.
        if (result.selectedOutputPorts.isEmpty()) {
            return outputs;
        }
    
        QVector<FlowV2Edge> selected;
    
        for (const FlowV2Edge& edge : outputs) {
            if (result.selectedOutputPorts.contains(edge.sourcePort)) {
                selected.push_back(edge);
            }
        }
    
        return selected;
    }
    
    
    bool FlowV2Runtime::enqueueSelectedEdge(
        const FlowV2Edge& edge,
        const FlowContext& parentContext,
        const QVariant& value,
        bool forceNewBranch)
    {
        FlowContext nextContext = parentContext;
        bool createdNewToken = false;
    
        const bool runtimeLoopBack = isRuntimeLoopBackEdge(edge, parentContext);
    
        if (runtimeLoopBack) {
            nextContext = m_runtimeState->createLoopContext(
                parentContext,
                edge.sourcePort,
                m_isContinuousManualLoop);
    
            createdNewToken = true;
    
            if (!nextContext.isValid()) {
                fail(QStringLiteral("failed to create loop context. edge=%1, iteration=%2")
                    .arg(edge.edgeId)
                    .arg(parentContext.iteration + 1));
                return false;
            }
    
            qDebug() << "[FlowV2] runtime loop-back edge =" << edge.sourceNodeId
                << "->" << edge.targetNodeId
                << "parentToken =" << parentContext.tokenId
                << "nextToken =" << nextContext.tokenId
                << "iteration =" << nextContext.iteration
                << "frameId =" << nextContext.frameId;
        }
        else if (forceNewBranch) {
            nextContext = m_runtimeState->createBranchContext(parentContext, edge.sourcePort);
            createdNewToken = true;
    
            if (!nextContext.isValid()) {
                fail(QStringLiteral("failed to create branch context. edge=%1").arg(edge.edgeId));
                return false;
            }
        }
    
        if (createdNewToken) {
            FlowV2Branch branch;
            branch.branchId = nextContext.tokenId;
            branch.context = nextContext;
            branch.currentIndex = 0;
            branch.state = FlowV2BranchState::Pending;
            branch.nodePath.push_back(edge.sourceNodeId);
            branch.nodePath.push_back(edge.targetNodeId);
    
            m_branches.insert(branch.branchId, branch);
        }
    
        FlowV2Event nextEvent;
        nextEvent.context = nextContext;
        nextEvent.sourceNodeId = edge.sourceNodeId;
        nextEvent.targetNodeId = edge.targetNodeId;
        nextEvent.sourcePort = edge.sourcePort;
        nextEvent.targetPort = edge.targetPort;
        nextEvent.value = value;
        nextEvent.status = FlowSignalStatus::Complete;
    
        enqueueEvent(nextEvent);
    
        return createdNewToken;
    }
    
    void FlowV2Runtime::propagateIgnoreFromEdge(
        const FlowV2Edge& edge,
        const FlowContext& parentContext)
    {
    
        // Ignore must not enter a cycle edge.
        // Otherwise an unselected branch would carry Ignore into the loop body and its active token could never be closed out.
        if (isCycleFormingEdge(edge)) {
            qDebug() << "[FlowV2] ignore propagation skip cycle-forming edge ="
                << edge.sourceNodeId << "->" << edge.targetNodeId
                << "edgeId =" << edge.edgeId;
            return;
        }
    
        if (isRuntimeLoopBackEdge(edge, parentContext)) {
            qDebug() << "[FlowV2] ignore propagation skip runtime LoopBack edge ="
                << edge.sourceNodeId << "->" << edge.targetNodeId
                << "edgeId =" << edge.edgeId;
            return;
        }
    
    
        FlowContext childContext =
            m_runtimeState->createBranchContext(parentContext, edge.sourcePort);
    
        if (!childContext.isValid()) {
            fail(QStringLiteral("failed to create ignore branch context. edge=%1").arg(edge.edgeId));
            return;
        }
    
        FlowV2Branch childBranch;
        childBranch.branchId = childContext.tokenId;
        childBranch.context = childContext;
        childBranch.currentIndex = 0;
        childBranch.state = FlowV2BranchState::Pending;
        childBranch.nodePath.push_back(edge.sourceNodeId);
        childBranch.nodePath.push_back(edge.targetNodeId);
    
        m_branches.insert(childBranch.branchId, childBranch);
    
        FlowV2Event ignoreEvent;
        ignoreEvent.context = childContext;
        ignoreEvent.sourceNodeId = edge.sourceNodeId;
        ignoreEvent.targetNodeId = edge.targetNodeId;
        ignoreEvent.sourcePort = edge.sourcePort;
        ignoreEvent.targetPort = edge.targetPort;
        ignoreEvent.status = FlowSignalStatus::Ignore;
        ignoreEvent.value = QVariant();
    
        qDebug() << "[FlowV2] ignore edge =" << edge.sourceNodeId
            << "->" << edge.targetNodeId
            << "port =" << edge.sourcePort
            << "ignoreBranch =" << childContext.tokenId
            << "parent =" << parentContext.tokenId;
    
        enqueueEvent(ignoreEvent);
    }
    
    
    void FlowV2Runtime::propagateIgnoreEvent(const FlowV2Event& event)
    {
        if (!m_runtimeState) {
            return;
        }
    
        if (event.targetNodeId == m_endNodeId) {
            m_runtimeState->finishToken(event.context);
    
            auto it = m_branches.find(event.context.tokenId);
            if (it != m_branches.end()) {
                it.value().state = FlowV2BranchState::Finished;
            }
    
            qDebug() << "[FlowV2]"
                << (m_isSubFlow ? "[subflow]" : "[main]")
                << "ignore branch reach End. branch =" << event.context.tokenId
                << "frameId =" << event.context.frameId
                << "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
    
            tryFinishRun();
            return;
        }
    
        const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(event.targetNodeId);
    
        QVector<FlowV2Edge> normalOutputs;
        for (const FlowV2Edge& edge : outputs) {
            // Ignore never enters an edge that structurally closes a cycle.
            if (isCycleFormingEdge(edge)) {
                qDebug() << "[FlowV2] ignore propagation skip cycle-forming edge ="
                    << edge.sourceNodeId << "->" << edge.targetNodeId
                    << "edgeId =" << edge.edgeId;
                continue;
            }
    
            normalOutputs.push_back(edge);
        }
    
        if (normalOutputs.isEmpty()) {
            m_runtimeState->finishToken(event.context);
    
            auto it = m_branches.find(event.context.tokenId);
            if (it != m_branches.end()) {
                it.value().state = FlowV2BranchState::Finished;
            }
    
            qDebug() << "[FlowV2] ignore branch stopped before cycle. branch ="
                << event.context.tokenId
                << "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
    
            tryFinishRun();
            return;
        }
    
        qDebug() << "[FlowV2] propagate ignore node =" << event.targetNodeId
            << "outCount =" << normalOutputs.size()
            << "branch =" << event.context.tokenId;
    
        if (normalOutputs.size() == 1) {
            const FlowV2Edge edge = normalOutputs.first();
    
            FlowV2Event nextEvent;
            nextEvent.context = event.context;
            nextEvent.sourceNodeId = edge.sourceNodeId;
            nextEvent.targetNodeId = edge.targetNodeId;
            nextEvent.sourcePort = edge.sourcePort;
            nextEvent.targetPort = edge.targetPort;
            nextEvent.value = QVariant();
            nextEvent.status = FlowSignalStatus::Ignore;
    
            enqueueEvent(nextEvent);
            return;
        }
    
        int childIndex = 0;
    
        for (const FlowV2Edge& edge : normalOutputs) {
            ++childIndex;
    
            FlowContext childContext = m_runtimeState->createBranchContext(event.context, childIndex);
            if (!childContext.isValid()) {
                fail(QStringLiteral("failed to create ignore child branch at node: %1").arg(event.targetNodeId));
                return;
            }
    
            FlowV2Branch childBranch;
            childBranch.branchId = childContext.tokenId;
            childBranch.context = childContext;
            childBranch.currentIndex = 0;
            childBranch.state = FlowV2BranchState::Pending;
            childBranch.nodePath.push_back(event.targetNodeId);
            childBranch.nodePath.push_back(edge.targetNodeId);
    
            m_branches.insert(childBranch.branchId, childBranch);
    
            FlowV2Event nextEvent;
            nextEvent.context = childContext;
            nextEvent.sourceNodeId = edge.sourceNodeId;
            nextEvent.targetNodeId = edge.targetNodeId;
            nextEvent.sourcePort = edge.sourcePort;
            nextEvent.targetPort = edge.targetPort;
            nextEvent.value = QVariant();
            nextEvent.status = FlowSignalStatus::Ignore;
    
            enqueueEvent(nextEvent);
        }
    
        m_runtimeState->finishToken(event.context);
    
        auto it = m_branches.find(event.context.tokenId);
        if (it != m_branches.end()) {
            it.value().state = FlowV2BranchState::Finished;
        }
    }
    
    
    FlowEdgeKey FlowV2Runtime::makeEdgeKeyFromEdge(const FlowV2Edge& edge) const
    {
        FlowEdgeKey key;
        key.fromNodeId = edge.sourceNodeId;
        key.fromPort = edge.sourcePort;
        key.toNodeId = edge.targetNodeId;
        key.toPort = edge.targetPort;
        key.edgeId = edge.edgeId;
        return key;
    }
    
    bool FlowV2Runtime::sameJoinScope(const FlowContext& a, const FlowContext& b) const
    {
        return a.runId == b.runId
            && a.frameId == b.frameId
            && a.joinBatchId == b.joinBatchId
            && a.iteration == b.iteration;
    }
    
    bool FlowV2Runtime::canReachByNormalEdges(
        const QString& fromNodeId,
        const QString& targetNodeId) const
    {
        if (fromNodeId == targetNodeId) {
            return true;
        }
    
        QSet<QString> visited;
        QVector<QString> stack;
        stack.push_back(fromNodeId);
    
        while (!stack.isEmpty()) {
            const QString nodeId = stack.takeLast();
    
            if (nodeId == targetNodeId) {
                return true;
            }
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            visited.insert(nodeId);
    
            const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(nodeId);
            for (const FlowV2Edge& edge : outputs) {
                if (edge.kind == FlowV2EdgeKind::LoopBack) {
                    continue;
                }
    
                if (!visited.contains(edge.targetNodeId)) {
                    stack.push_back(edge.targetNodeId);
                }
            }
        }
    
        return false;
    }
    
    
    bool FlowV2Runtime::canReachByAnyEdges(
        const QString& fromNodeId,
        const QString& targetNodeId,
        const QString& ignoredEdgeId) const
    {
        if (fromNodeId.isEmpty() || targetNodeId.isEmpty()) {
            return false;
        }
    
        if (fromNodeId == targetNodeId) {
            return true;
        }
    
        QSet<QString> visited;
        QVector<QString> stack;
        stack.push_back(fromNodeId);
    
        while (!stack.isEmpty()) {
            const QString nodeId = stack.takeLast();
    
            if (nodeId == targetNodeId) {
                return true;
            }
    
            if (visited.contains(nodeId)) {
                continue;
            }
    
            visited.insert(nodeId);
    
            const QVector<FlowV2Edge> outputs = m_graph.outgoingEdges(nodeId);
            for (const FlowV2Edge& edge : outputs) {
                if (!ignoredEdgeId.isEmpty() && edge.edgeId == ignoredEdgeId) {
                    continue;
                }
    
                if (!visited.contains(edge.targetNodeId)) {
                    stack.push_back(edge.targetNodeId);
                }
            }
        }
    
        return false;
    }
    
    bool FlowV2Runtime::isCycleFormingEdge(const FlowV2Edge& edge) const
    {
        // If target can still walk back to source (searching without this edge itself),
        // then source -> target is structurally a cycle-forming edge.
        return canReachByAnyEdges(
            edge.targetNodeId,
            edge.sourceNodeId,
            edge.edgeId);
    }
    
    
    
    QVector<FlowV2Edge> FlowV2Runtime::candidateInputEdgesForEvent(
        const FlowV2Event& event) const
    {
        const QVector<FlowV2Edge> allInputs =
            m_graph.incomingEdges(event.targetNodeId);
    
        QVector<FlowV2Edge> candidates;
    
        // End does not use the ordinary join.
        if (event.targetNodeId == m_endNodeId) {
            return candidates;
        }
    
        for (const FlowV2Edge& edge : allInputs) {
            // A LoopBack incoming edge is the entry to the next round, so it does not take part in this round's ordinary join.
            if (edge.kind == FlowV2EdgeKind::LoopBack) {
                continue;
            }
    
            candidates.push_back(edge);
        }
    
        return candidates;
    }
    
    bool FlowV2Runtime::missingInputCanStillArrive(
        const FlowV2Edge& missingEdge,
        const FlowContext& context) const
    {
        // 1. Paths still waiting in the event queue.
        for (const FlowV2Event& pendingEvent : m_pendingEvents) {
            if (!sameJoinScope(pendingEvent.context, context)) {
                continue;
            }
    
            if (canReachByNormalEdges(
                pendingEvent.targetNodeId,
                missingEdge.sourceNodeId)) {
                return true;
            }
        }
    
        // 2. Paths that have started executing but are now Pending,
        //    for example Tips / Delay / SubProcess / motion control.
        for (auto it = m_pendingNodes.begin(); it != m_pendingNodes.end(); ++it) {
            const FlowV2PendingNode& pendingNode = it.value();
    
            if (!sameJoinScope(pendingNode.event.context, context)) {
                continue;
            }
    
            if (canReachByNormalEdges(
                pendingNode.nodeId,
                missingEdge.sourceNodeId)) {
                qDebug() << "[FlowV2] wait possible pending upstream node ="
                    << missingEdge.sourceNodeId
                    << "pendingNode =" << pendingNode.nodeId
                    << "token =" << pendingNode.event.context.tokenId;
    
                return true;
            }
        }
    
        return false;
    }
    
    
    bool FlowV2Runtime::hasMissingInputThatCanStillArrive(
        const FlowV2Event& event,
        const FlowJoinState& joinState) const
    {
        const QVector<FlowV2Edge> candidates =
            candidateInputEdgesForEvent(event);
    
        for (const FlowV2Edge& edge : candidates) {
            const FlowEdgeKey edgeKey = makeEdgeKeyFromEdge(edge);
    
            if (joinState.arrivedInputs.contains(edgeKey)) {
                continue;
            }
    
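            // add by Momo
            // A recursive feedback input must not block the current round.
            //
            // Example:
            // tipstool_2 -> globalvalue_1
            // If globalvalue_1 can reach tipstool_2,
            // then tipstool_2 only produces this input in a later loop round,
            // so the current globalvalue_1 should not wait for it.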
            if (isFeedbackInputEdge(edge)) {
                qDebug() << "[FlowV2] missing feedback input ignored node =" << event.targetNodeId
                    << "missingEdge =" << edge.sourceNodeId << ":" << edge.sourcePort
                    << "->" << edge.targetNodeId << ":" << edge.targetPort;
                continue;
            }
    
    
            if (missingInputCanStillArrive(edge, event.context)) {
                qDebug() << "[FlowV2] wait possible future input node =" << event.targetNodeId
                    << "missingEdge =" << edge.sourceNodeId << ":" << edge.sourcePort
                    << "->" << edge.targetNodeId << ":" << edge.targetPort;
    
                return true;
            }
        }
    
        return false;
    }
    
    bool FlowV2Runtime::isTerminalSinkNode(const QString& nodeId) const
    {
        return m_graph.isTerminalSinkNode(nodeId);
    }
    
    void FlowV2Runtime::reachTerminal(const FlowV2Event& event)
    {
        if (!m_runtimeState) {
            return;
        }
    
        FlowV2Branch& branch = m_branches[event.context.tokenId];
    
        branch.state = FlowV2BranchState::Finished;
    
        if (branch.nodePath.isEmpty() || branch.nodePath.last() != event.targetNodeId) {
            branch.nodePath.push_back(event.targetNodeId);
        }
    
        branch.currentIndex = branch.nodePath.size() - 1;
    
        m_runtimeState->finishToken(event.context);
    
        qDebug() << "[FlowV2]"
            << (m_isSubFlow ? "[subflow]" : "[main]")
            << "branch reach terminal. node =" << event.targetNodeId
            << "branch =" << event.context.tokenId
            << "frameId =" << event.context.frameId
            << "activeChild =" << m_runtimeState->activeChildTokenCount(m_rootContext.tokenId);
    
        tryFinishRun();
    }
    
    bool FlowV2Runtime::wouldRevisitNode(
        const FlowContext& context,
        const QString& targetNodeId) const
    {
        auto it = m_branches.find(context.tokenId);
        if (it == m_branches.end()) {
            return false;
        }
    
        const FlowV2Branch& branch = it.value();
        return branch.nodePath.contains(targetNodeId);
    }
    
    bool FlowV2Runtime::isRuntimeLoopBackEdge(
        const FlowV2Edge& edge,
        const FlowContext& context) const
    {
        if (edge.kind == FlowV2EdgeKind::LoopBack) {
            return true;
        }
    
        return wouldRevisitNode(context, edge.targetNodeId);
    }
    
    bool FlowV2Runtime::isFeedbackInputEdge(const FlowV2Edge& edge) const
    {
        // edge.sourceNodeId -> edge.targetNodeId
        //
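        // If target can reach source again,
        // this input comes from target's downstream flow, i.e. it is a recursive/loop feedback input.
        // It must not block target's execution in the current round.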
        return canReachByAnyEdges(
            edge.targetNodeId,
            edge.sourceNodeId,
            edge.edgeId);
    }
    
    void FlowV2Runtime::suspendToken(
        const FlowV2Event& event,
        const FlowV2ExecuteInput& input)
    {
        if (!m_runtimeState) {
            return;
        }
    
        FlowV2PendingNode pending;
        pending.event = event;
        pending.input = input;
        pending.nodeId = event.targetNodeId;
        pending.startTime = QDateTime::currentDateTime();
    
        // add by momo 2026.06.01: guard against a duplicate insert
        if (m_pendingNodes.contains(event.context.tokenId)) {
            qWarning() << "[FlowV2] token already pending =" << event.context.tokenId
                << "node =" << event.targetNodeId;
            return;
        }
        m_pendingNodes.insert(event.context.tokenId, pending);
    
        auto branchIt = m_branches.find(event.context.tokenId);
        if (branchIt != m_branches.end()) {
            branchIt.value().state = FlowV2BranchState::Pending;
        }
    
        auto tokenIt = m_runtimeState->currentRun.tokens.find(event.context.tokenId);
        if (tokenIt != m_runtimeState->currentRun.tokens.end()) {
            tokenIt.value().state = FlowTokenState::Waiting;
        }
    
        qDebug() << "[FlowV2] token pending node =" << event.targetNodeId
            << "token =" << event.context.tokenId
            << "frameId =" << event.context.frameId
            << "joinBatchId =" << event.context.joinBatchId
            << "iteration =" << event.context.iteration;
    }
    
    void FlowV2Runtime::completePendingNode(
        const FlowContext& context,
        const QVariant& outputValue)
    {
        if (!m_runtimeState) {
            return;
        }
    
        if (m_stopRequested || m_finishedEmitted) {
            return;
        }
    
        if (context.runId != m_runtimeState->currentRun.runId) {
            qWarning() << "[FlowV2] completePendingNode ignored. stale runId ="
                << context.runId
                << "current =" << m_runtimeState->currentRun.runId;
            return;
        }
    
        auto pendingIt = m_pendingNodes.find(context.tokenId);
        if (pendingIt == m_pendingNodes.end()) {
            qWarning() << "[FlowV2] completePendingNode ignored. token not pending ="
                << context.tokenId;
            return;
        }
    
        FlowV2PendingNode pending = pendingIt.value();
        m_pendingNodes.erase(pendingIt);
    
        // In step mode keep the node highlighted; otherwise mark it finished.
        if (!m_stepMode) {
            markFlowV2NodeState(m_graph, pending.nodeId, STATE_FINISHED);
        }
        else {
            markFlowV2NodeState(m_graph, pending.nodeId, STATE_EXECUTING);
            m_lastHighlightedNodeId = pending.nodeId;
        }
    
    
        auto tokenIt = m_runtimeState->currentRun.tokens.find(context.tokenId);
        if (tokenIt != m_runtimeState->currentRun.tokens.end()) {
            tokenIt.value().state = FlowTokenState::Active;
        }
    
        auto branchIt = m_branches.find(context.tokenId);
        if (branchIt != m_branches.end()) {
            branchIt.value().state = FlowV2BranchState::Running;
        }
    
        // Important: do not fall back to pending.input.inputValue.
        // Whatever the Pending node outputs is what gets passed on.
        // For Tips / Delay, completePendingNode(context, QVariant()) means no business value is output.
        FlowV2ExecuteResult result = FlowV2ExecuteResult::complete(outputValue);
    
        qDebug() << "[FlowV2] complete pending node =" << pending.nodeId
            << "token =" << context.tokenId
            << "outputValid =" << outputValue.isValid()
            << "outputType =" << outputValue.typeName();
    
        completeNode(pending.event, result);
    
        if (m_stopRequested || m_finishedEmitted || !m_runtimeState || !m_lastError.isEmpty()) {
            return;
        }
    
        // In step mode, completing an async node only enqueues the follow-up events;
        // it does not keep running. The next "step" click advances the flow.
        if (m_stepMode) {
            tryFinishRun();
            return;
        }
    
        runEventLoop();
    
        if (!m_stopRequested && !m_finishedEmitted && m_runtimeState && m_lastError.isEmpty()) {
            tryFinishRun();
        }
    }
    
    void FlowV2Runtime::failPendingNode(
        const FlowContext& context,
        const QString& reason)
    {
        auto pendingIt = m_pendingNodes.find(context.tokenId);
        if (pendingIt != m_pendingNodes.end()) {
            m_pendingNodes.erase(pendingIt);
        }
    
        fail(reason);
    }
    
    void FlowV2Runtime::fail(const QString& reason)
    {
        if (!m_lastError.isEmpty()) {
            return;
        }
    
        m_lastError = reason;
    
        m_pendingEvents.clear();
        m_pendingNodes.clear();
    
        if (m_runtimeState) {
            m_runtimeState->markFailed(reason);
        }
    
        for (auto it = m_branches.begin(); it != m_branches.end(); ++it) {
            if (it.value().state == FlowV2BranchState::Running ||
                it.value().state == FlowV2BranchState::Pending) {
                it.value().state = FlowV2BranchState::Failed;
            }
        }
    
        qWarning() << "[FlowV2][FAILED]" << reason;
    
        if (m_failCallback) {
            m_failCallback(reason);
        }
    
        clearFlowV2NodeHighlight(m_graph, m_lastHighlightedNodeId);
        m_lastHighlightedNodeId.clear();
    }
    
    
    void FlowV2Runtime::requestStop()
    {
        m_stopRequested = true;
    
        m_pendingEvents.clear();
        m_pendingNodes.clear();
    
        if (m_runtimeState) {
            for (auto it = m_runtimeState->currentRun.tokens.begin();
                it != m_runtimeState->currentRun.tokens.end();
                ++it) {
                if (it.value().state == FlowTokenState::Active ||
                    it.value().state == FlowTokenState::Waiting) {
                    it.value().state = FlowTokenState::Cancelled;
                }
            }
        }
    
        for (auto it = m_branches.begin(); it != m_branches.end(); ++it) {
            if (it.value().state == FlowV2BranchState::Running ||
                it.value().state == FlowV2BranchState::Pending) {
                it.value().state = FlowV2BranchState::Finished;
            }
        }
    
        qDebug() << "runtime stop requested.";
    
        for (auto it = m_branches.begin(); it != m_branches.end(); ++it) {
            const FlowV2Branch& branch = it.value();
            if (!branch.nodePath.isEmpty()) {
                markFlowV2NodeState(m_graph, branch.nodePath.last(), STATE_FINISHED);
            }
        }
    
        clearFlowV2NodeHighlight(m_graph, m_lastHighlightedNodeId);
        m_lastHighlightedNodeId.clear();
    }
    
    // add by Momo 2026.06.02: data-flow display
    void FlowV2Runtime::clearDataFlowDisplayRecords()
    {
        QSet<modelObject*> visited;
    
        for (auto it = m_graph.nodes.begin(); it != m_graph.nodes.end(); ++it) {
            modelObject* obj = it.value().legacyObject;
            if (!obj || obj->isDestroyPrepared()) {
                continue;
            }
    
            if (visited.contains(obj)) {
                continue;
            }
    
            visited.insert(obj);
            obj->clearDataFlowDisplayCache();
        }
    }
    
    void FlowV2Runtime::recordDataFlowForDisplay(const FlowV2Event& event)
    {
        // Ignore is FlowV2's inactive-edge semantics, not business data.
        // By default the data-flow display records only real Complete data, so unselected branches do not pollute it.
        if (event.status != FlowSignalStatus::Complete) {
            return;
        }
    
        if (!m_graph.nodes.contains(event.sourceNodeId) ||
            !m_graph.nodes.contains(event.targetNodeId)) {
            return;
        }
    
        modelObject* sourceObj = m_graph.nodes.value(event.sourceNodeId).legacyObject;
        modelObject* targetObj = m_graph.nodes.value(event.targetNodeId).legacyObject;
    
        if (!sourceObj || !targetObj) {
            return;
        }
    
        if (sourceObj->isDestroyPrepared() || targetObj->isDestroyPrepared()) {
            return;
        }
    
        sourceObj->recordDataFlowSendForDisplay(
            targetObj,
            event.sourcePort,
            event.value);
    
        targetObj->recordDataFlowReceiveForDisplay(
            sourceObj,
            event.targetPort,
            event.value);
    }
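
The join logic near the top of the listing above reduces the arrived inputs to a single decision. The following is a minimal, Qt-free sketch of that reduction only, assuming simplified stand-in types (`JoinState`, `decideJoin`, and string edge ids are hypothetical names, not the project's API). It deliberately leaves out the extra "can a missing input still arrive" wait step and all token bookkeeping.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

enum class Status { Complete, Failed, Ignore };
enum class Decision { Run, Skip, Fail, Wait };

// Hypothetical, simplified join ledger: edge ids are plain strings.
struct JoinState {
    std::set<std::string> expected;         // edges that must report in
    std::map<std::string, Status> arrived;  // edges that already reported

    // Ready once every expected edge has an arrived record.
    bool isReady() const
    {
        for (const std::string& edgeId : expected) {
            if (arrived.count(edgeId) == 0) {
                return false;
            }
        }
        return true;
    }
};

Decision decideJoin(const JoinState& join)
{
    if (!join.isReady()) {
        return Decision::Wait;  // some expected input has not arrived yet
    }

    bool hasComplete = false;
    bool hasFailed = false;
    bool hasIgnore = false;
    for (const auto& entry : join.arrived) {
        switch (entry.second) {
        case Status::Complete: hasComplete = true; break;
        case Status::Failed:   hasFailed = true;   break;
        case Status::Ignore:   hasIgnore = true;   break;
        }
    }

    if (hasFailed) {
        return Decision::Fail;  // any failed input fails the join
    }
    if (!hasComplete && hasIgnore) {
        return Decision::Skip;  // every input came from an unselected branch
    }
    return Decision::Run;       // at least one real input: execute the node
}
```

The order of the checks matters: a failure wins over everything, and a node is skipped only when none of its inputs carried real data. A join that mixes Complete and Ignore inputs still runs, which is what lets a merge node sit behind a judge node.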

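`FlowV2Executor` is the entry point for a node's business logic.

The Runtime does not handle plugin-specific logic itself; every node execution goes through the `FlowV2ExecutorRegistry`.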
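
To make the dispatch idea concrete, here is a minimal, Qt-free sketch of a registry with the three-level fallback that the header comment in the listing after this section documents (registered executor, then legacy object, then a default Complete). The types `Registry`, `Node`, `Result`, and the `legacyExecute` callback are hypothetical simplifications: the real code uses `QSharedPointer<IFlowV2NodeExecutor>` and `legacyObject->executeFlowV2(input)`.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical, simplified stand-ins for FlowV2ExecuteResult / FlowV2Node.
enum class Status { Complete, Failed };

struct Result {
    Status status = Status::Complete;
    std::string output;
};

using Executor = std::function<Result(const std::string& input)>;

struct Node {
    std::string type;
    Executor legacyExecute;  // empty when the node has no legacy object
};

class Registry {
public:
    void registerExecutor(const std::string& type, Executor executor)
    {
        m_executors[type] = std::move(executor);
    }

    bool hasExecutor(const std::string& type) const
    {
        return m_executors.count(type) != 0;
    }

    Result execute(const Node& node, const std::string& input) const
    {
        // 1. A registered executor for this node type wins.
        const auto it = m_executors.find(node.type);
        if (it != m_executors.end()) {
            return it->second(input);
        }
        // 2. Otherwise fall back to the node's legacy object, if any.
        if (node.legacyExecute) {
            return node.legacyExecute(input);
        }
        // 3. Otherwise simulate execution and report Complete.
        return Result{};
    }

private:
    std::map<std::string, Executor> m_executors;
};
```

Because the Runtime only ever calls `execute`, a node type can be moved from its legacy object to a dedicated executor by registering one, without touching the scheduling code.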

    #pragma once
    
    #include <QString>
    #include <QVariant>
    #include <QVector>
    #include <QMap>
    #include <QSharedPointer>
    
    #include "FlowV2Graph.h"
    #include "FlowRuntimeState.h"
    
    #include "qtpubliclib_global.h"
    
    class FlowV2Runtime;
    
    
    struct FlowV2InputValue
    {
        QString sourceNodeId;
        int sourcePort = 0;
    
        QString targetNodeId;
        int targetPort = 0;
    
        QVariant value;
    };
    
    
    /**
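     * @brief Execution input for a FlowV2 node
     *
     * When the Runtime schedules a node, it packs the current event context into this structure.
     *
     * Note:
     * This is the input structure of the new runtime; it does not go through the legacy receive(source, channel, value).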
     */
    struct FlowV2ExecuteInput
    {
        FlowContext context;
    
        // add by Momo. Used by subprocess / async / future frame-aware executors.
        // Do not store this pointer after executeFlowV2 returns.
        FlowRuntimeState* runtimeState = nullptr;
    
        FlowV2Runtime* runtime = nullptr;
    
        QString sourceNodeId;
        QString targetNodeId;
    
        int sourcePort = 0;
        int targetPort = 0;
    
        QVariant inputValue;
    
        // Once all inputs of a multi-input node have arrived, they are passed to the executor keyed by input port.
        QMap<int, QVariant> inputValuesByPort;
    
        // add by Momo 2026.05.26
        // Keeps every Complete input of the current round.
        // Suited to nodes such as logic modules that have one input port but accept several input streams.
        QVector<FlowV2InputValue> inputValues;
    };
    
    /**
     * @brief Execution result of a FlowV2 node
     *
     * The first phase mainly uses:
     * - Complete
     * - Failed
     *
     * selectedOutputPorts is reserved for the later Judge / Loop support.
     */
    struct FlowV2ExecuteResult
    {
        FlowSignalStatus status = FlowSignalStatus::Complete;
    
        QVariant outputValue;
    
        QVector<int> selectedOutputPorts;
    
        QString errorMessage;
    
        static FlowV2ExecuteResult complete(const QVariant& output = QVariant())
        {
            FlowV2ExecuteResult result;
            result.status = FlowSignalStatus::Complete;
            result.outputValue = output;
            return result;
        }
    
        static FlowV2ExecuteResult failed(const QString& reason)
        {
            FlowV2ExecuteResult result;
            result.status = FlowSignalStatus::Failed;
            result.errorMessage = reason;
            return result;
        }
    
        static FlowV2ExecuteResult pending()
        {
            FlowV2ExecuteResult result;
            result.status = FlowSignalStatus::Pending;
            return result;
        }
    };
    
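    /**
     * @brief Interface for FlowV2 node executors.
     *
     * If a plugin class should not be modified directly, a standalone executor
     * can be registered for it instead.
     * For example:
     * - FlowV2TipsExecutor
     * - FlowV2ComputeExecutor
     * - FlowV2JudgeExecutor
     */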
    class QTPUBLICLIB_EXPORT IFlowV2NodeExecutor
    {
    public:
        virtual ~IFlowV2NodeExecutor() = default;
    
        virtual QString type() const = 0;
    
        virtual FlowV2ExecuteResult execute(
            const FlowV2Node& node,
            const FlowV2ExecuteInput& input
        ) = 0;
    };
    
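    /**
     * @brief Registry of FlowV2 executors.
     *
     * Priority 1:
     * - If an executor is registered for the node's type, call that executor.
     *
     * Priority 2:
     * - If node.legacyObject exists, call legacyObject->executeFlowV2(input).
     *
     * Priority 3:
     * - Otherwise simulate execution and return Complete directly.
     */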
    class QTPUBLICLIB_EXPORT FlowV2ExecutorRegistry
    {
    public:
        static FlowV2ExecutorRegistry& instance();
    
        void registerExecutor(const QSharedPointer<IFlowV2NodeExecutor>& executor);
    
        bool hasExecutor(const QString& type) const;
    
        FlowV2ExecuteResult execute(
            const FlowV2Node& node,
            const FlowV2ExecuteInput& input
        );
    
    private:
        FlowV2ExecutorRegistry() = default;
    
    private:
        QMap<QString, QSharedPointer<IFlowV2NodeExecutor>> m_executors;
    };
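
The header comment above suggests registering a standalone executor when the plugin class itself should stay untouched. As a rough illustration of that idea, here is a minimal sketch of what such an executor might look like for an asynchronous node. It is not the project's code: `ExecStatus`, `ExecResult`, `SketchNode`, `SketchInput`, `ISketchExecutor` and `DelayExecutorSketch` are hypothetical stand-ins built on the standard library so the example compiles without Qt, and the "Delay" type string and the empty-value check are assumptions made purely for illustration.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for FlowSignalStatus / FlowV2ExecuteResult.
enum class ExecStatus { Complete, Failed, Pending };

struct ExecResult {
    ExecStatus status = ExecStatus::Complete;
    std::string output;
    std::string error;

    static ExecResult complete(const std::string& out = std::string()) {
        ExecResult r;
        r.output = out;
        return r;
    }
    static ExecResult failed(const std::string& why) {
        ExecResult r;
        r.status = ExecStatus::Failed;
        r.error = why;
        return r;
    }
    static ExecResult pending() {
        ExecResult r;
        r.status = ExecStatus::Pending;
        return r;
    }
};

// Hypothetical stand-ins for FlowV2Node / FlowV2ExecuteInput.
struct SketchNode { std::string nodeId; std::string type; };
struct SketchInput { std::string value; };

// Same shape as IFlowV2NodeExecutor: a type key plus an execute() entry point.
class ISketchExecutor {
public:
    virtual ~ISketchExecutor() = default;
    virtual std::string type() const = 0;
    virtual ExecResult execute(const SketchNode& node, const SketchInput& input) = 0;
};

// Assumed behaviour of an async node: reject bad input immediately,
// otherwise report Pending and let an external callback finish the work later.
class DelayExecutorSketch : public ISketchExecutor {
public:
    std::string type() const override { return "Delay"; }

    ExecResult execute(const SketchNode&, const SketchInput& input) override {
        if (input.value.empty()) {
            return ExecResult::failed("missing delay value");
        }
        // A real executor would start its timer here; this sketch only
        // reports that the node has not finished yet.
        return ExecResult::pending();
    }
};
```

The point of the sketch is the return contract: the executor never blocks, and a Pending status tells the caller that completion will arrive through some later callback rather than through the return value.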

    // Implementation of the declarations in FlowV2Executor.h
    #include "FlowV2Executor.h"
    #include "dataDefine.h"
    
    #include <QDebug>
    
    FlowV2ExecutorRegistry& FlowV2ExecutorRegistry::instance()
    {
        static FlowV2ExecutorRegistry registry;
        return registry;
    }
    
    void FlowV2ExecutorRegistry::registerExecutor(const QSharedPointer<IFlowV2NodeExecutor>& executor)
    {
        if (!executor) {
            return;
        }
    
        const QString type = executor->type();
        if (type.isEmpty()) {
            return;
        }
    
        m_executors.insert(type, executor);
    }
    
    bool FlowV2ExecutorRegistry::hasExecutor(const QString& type) const
    {
        return m_executors.contains(type);
    }
    
    FlowV2ExecuteResult FlowV2ExecutorRegistry::execute(
        const FlowV2Node& node,
        const FlowV2ExecuteInput& input)
    {
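        // Priority 1: an executor registered for this node type (single lookup).
        const auto it = m_executors.constFind(node.type);
        if (it != m_executors.constEnd()) {
            return it.value()->execute(node, input);
        }
    
        // Priority 2: fall back to the legacy plugin object attached to the node.
        if (node.legacyObject) {
            return node.legacyObject->executeFlowV2(input);
        }
    
        // Priority 3: nothing to run; simulate execution and pass the input through.
        qDebug() << "[FlowV2] simulate execute node =" << node.nodeId
            << "type =" << node.type;
    
        return FlowV2ExecuteResult::complete(input.inputValue);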
    }
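
To make the three-level fallback in `FlowV2ExecutorRegistry::execute` easier to try out in isolation, the following is a small Qt-free sketch of the same lookup order. Everything in it is an assumption for illustration: `DispatchResult`, `DispatchNode` and `DispatchRegistry` are hypothetical names, executors are modelled as `std::function` objects instead of `QSharedPointer<IFlowV2NodeExecutor>`, and the `legacy` member stands in for `node.legacyObject->executeFlowV2(input)`.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for FlowV2ExecuteResult (only the output is modelled).
struct DispatchResult {
    std::string output;
};

inline DispatchResult makeResult(const std::string& output) {
    DispatchResult r;
    r.output = output;
    return r;
}

// Hypothetical stand-in for FlowV2Node.
struct DispatchNode {
    std::string type;
    // Stand-in for the legacy object; an empty function means "no legacy object".
    std::function<DispatchResult(const std::string&)> legacy;
};

class DispatchRegistry {
public:
    using Executor = std::function<DispatchResult(const DispatchNode&, const std::string&)>;

    // Mirrors registerExecutor: silently ignore empty type keys and null executors.
    void registerExecutor(const std::string& type, Executor executor) {
        if (type.empty() || !executor) {
            return;
        }
        executors_[type] = std::move(executor);
    }

    DispatchResult execute(const DispatchNode& node, const std::string& input) const {
        // Priority 1: executor registered for the node type.
        const auto it = executors_.find(node.type);
        if (it != executors_.end()) {
            return it->second(node, input);
        }
        // Priority 2: legacy object attached to the node.
        if (node.legacy) {
            return node.legacy(input);
        }
        // Priority 3: simulated execution, input passed through unchanged.
        return makeResult(input);
    }

private:
    std::map<std::string, Executor> executors_;
};
```

One consequence of this ordering is that a registered executor always shadows the legacy object for nodes of the same type, so a module can be migrated to the new runtime by registering an executor for it without removing its legacy implementation first.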
