Dify + python flask 接口,实现通过工作流,操作MySQL数据库(一)
Dify 工作流操作本地数据库
·
Dify安装后,直接使用docker打开

启动后,创建账号登录Dify,创建一个工作流作为测试

创建流程,sql为输入的参数
代码执行节点,需要接收第一个节点的参数sql,然后再发起请求,请求我们开发的python接口获取数据。并定义输出变量
需要注意的是,接口的IP需要用接口的IP号,而不是使用localhost,使用localhost报错。
from urllib import request
import urllib.request
import json
def main(sql: str) -> dict:
data = {"sql": sql}
url = 'http://172.30.32.1:5000/query'
json_data = json.dumps(data)
byte_data = json_data.encode('utf-8')
req = urllib.request.Request(url, data=byte_data, headers={'Content-Type': 'application/json'})
response = urllib.request.urlopen(req)
response_data = response.read().decode('utf-8')
return {
"result": response_data,
}
结束节点需要配置输出变量为上一个节点输出变量
查询数据的接口使用flask简易框架开发,然后代码执行节点来调用。
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, text
import pymysql
import json
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
# 使通过jsonify返回的中文显示正常,否则显示为ASCII码
app.config["JSON_AS_ASCII"] = False
# 查询mysql数据库,检索数据
# 接口的访问路径是:http://192.168.100.101:5000/query
# 配置数据库连接
# DATABASE_URI = 'mysql+pymysql://test:tianma%2024@localhost:3306/dify_test'
# engine = create_engine(DATABASE_URI)
connection = pymysql.connect(
host='localhost',
port=3306,
user='root',
password='tianma%2024',
db='dify_test',
charset='utf8mb4'
)
@app.errorhandler(Exception)
def handle_exception(e):
# 如果这是一个HTTP错误,直接将它映射到响应中
if isinstance(e, HTTPException):
response = e.get_response()
response.data = jsonify({'error': e.description}).json
response.content_type = 'application/json'
return response
# 对于其他异常,返回一个500错误和错误信息
return jsonify({'error': str(e)}), 500
@app.route('/query', methods=['POST'])
def query_database():
# 获取请求中的SQL语句
sql_query = request.json.get("sql")
if not sql_query:
print(f"参数为空,参数为空!")
return jsonify({"error":"SQL statement is required"}), 400
try:
# 执行SQL查询
cursor = connection.cursor()
cursor.execute(sql_query)
rows = cursor.fetchall()
# 将结果转换为字典列表
column_names = [column[0] for column in cursor.description]
result_dict = [dict(zip(column_names, row)) for row in rows]
return jsonify(result_dict)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/hello')
def hello_world():
# 获取请求中的SQL语句
sql_query = "select * from host limit 1;"
if sql_query is None:
return jsonify({"error": "SQL query is required"}), 400
try:
# 执行SQL查询
cursor = connection.cursor()
cursor.execute(sql_query)
rows = cursor.fetchall()
data = [dict(zip([column[0] for column in cursor.description], row)) for row in rows]
cursor.close()
return jsonify(data)
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
后续就可以在这个基础上,结合AI语义识别,生成sql的能力实现MySQL数据库的信息查找,后续展开。
更多推荐




所有评论(0)