Dify安装后,直接使用docker打开

在这里插入图片描述
启动后,创建账号登录Dify,创建一个工作流作为测试

在这里插入图片描述
创建流程,sql为输入的参数在这里插入图片描述
代码执行节点,需要接收第一个节点的参数sql,然后再发起请求,请求我们开发的python接口获取数据。并定义输出变量
在这里插入图片描述
需要注意的是,接口的IP需要用接口的IP号,而不是使用localhost,使用localhost报错。

from urllib  import request
import urllib.request
import json
def main(sql: str) -> dict:
    data = {"sql": sql}
    url = 'http://172.30.32.1:5000/query'
    json_data = json.dumps(data)
    byte_data = json_data.encode('utf-8')
    req = urllib.request.Request(url, data=byte_data, headers={'Content-Type': 'application/json'})
    response = urllib.request.urlopen(req)
    response_data = response.read().decode('utf-8')
      
    return {
        "result": response_data,
    }

结束节点需要配置输出变量为上一个节点输出变量
在这里插入图片描述

查询数据的接口使用flask简易框架开发,然后代码执行节点来调用。
在这里插入图片描述

from flask import Flask, request, jsonify
from sqlalchemy import create_engine, text
import pymysql
import json
from werkzeug.exceptions import HTTPException


app = Flask(__name__)

# 使通过jsonify返回的中文显示正常,否则显示为ASCII码
app.config["JSON_AS_ASCII"] = False


# 查询mysql数据库,检索数据
# 接口的访问路径是:http://192.168.100.101:5000/query
# 配置数据库连接
# DATABASE_URI = 'mysql+pymysql://test:tianma%2024@localhost:3306/dify_test'
# engine = create_engine(DATABASE_URI)

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='tianma%2024',
    db='dify_test',
    charset='utf8mb4'
)


@app.errorhandler(Exception)
def handle_exception(e):
    # 如果这是一个HTTP错误,直接将它映射到响应中
    if isinstance(e, HTTPException):
        response = e.get_response()
        response.data = jsonify({'error': e.description}).json
        response.content_type = 'application/json'
        return response
    # 对于其他异常,返回一个500错误和错误信息
    return jsonify({'error': str(e)}), 500


@app.route('/query', methods=['POST'])
def query_database():
    # 获取请求中的SQL语句
    sql_query = request.json.get("sql")
    if not sql_query:
        print(f"参数为空,参数为空!")
        return jsonify({"error":"SQL statement is required"}), 400
    
    
    try:
        # 执行SQL查询
        cursor = connection.cursor()
        cursor.execute(sql_query)
        rows = cursor.fetchall()

        # 将结果转换为字典列表
        column_names = [column[0] for column in cursor.description]
        result_dict = [dict(zip(column_names, row)) for row in rows]
        return jsonify(result_dict)
    
        
    
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    


@app.route('/hello')
def hello_world():
    # 获取请求中的SQL语句
    sql_query = "select * from host limit 1;"
    if sql_query is None:
        return jsonify({"error": "SQL query is required"}), 400
    try:
        # 执行SQL查询
        cursor = connection.cursor()
        cursor.execute(sql_query)
        rows = cursor.fetchall()

        data = [dict(zip([column[0] for column in cursor.description], row)) for row in rows]

        cursor.close()

        return jsonify(data)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

    
if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

后续就可以在这个基础上,结合AI语义识别,生成sql的能力实现MySQL数据库的信息查找,后续展开。

Logo

一站式 AI 云服务平台

更多推荐