网站首页 > 资源文章 正文
上一篇:Dify工具使用全场景:彩票自动兑奖管理系统 (应用篇·第1期)
上一篇通过上传彩票调用大模型,可以按规则生成包含彩票信息的json串,如下信息。
{
"point": "save",
"params": {
"销售站点": "测试站点",
"期号": "2024001",
"玩法": "双色球",
"投注时间": "2024-01-01 10:00:00",
"开奖时间": "2024-01-01 21:00:00",
"验票码": "ABC123",
"金额": "10元",
"公益贡献": "1元",
"微信订阅号": "lottery_test",
"红球和蓝球对": [
{
"红球": [1, 2, 3, 4, 5, 6],
"蓝球": 7
}
]
}
}
接下来,要把json信息入库保存,因为要把彩票信息管理起来,了解自己平时写的号码。
设计表结构存储
# 数据库表结构
"""
CREATE TABLE lottery_records (
id INT AUTO_INCREMENT PRIMARY KEY,
sales_site VARCHAR(255),
issue_number VARCHAR(50),
lottery_type VARCHAR(50),
betting_time DATETIME,
draw_time DATETIME,
verification_code VARCHAR(100),
amount DECIMAL(10,2),
charity_contribution DECIMAL(10,2),
wechat_subscription VARCHAR(100)
);
CREATE TABLE lottery_numbers (
id INT AUTO_INCREMENT PRIMARY KEY,
lottery_record_id INT,
red_balls VARCHAR(100),
blue_ball VARCHAR(10),
FOREIGN KEY (lottery_record_id) REFERENCES lottery_records(id)
);
"""
WEB服务提供
这里需要写一段代码,用于实现web服务。
核心代码如下:
from pydantic import BaseModel
class LotteryData(BaseModel):
point: str
params: dict
from fastapi import APIRouter, Request, Header, HTTPException
import json
import logging
import os
import time
router = APIRouter()
import mysql.connector
from mysql.connector import Error
# MySQL数据库配置
db_config = {
'host': '***.***.***.****',
'user': 'root',
'password': '3333',
'database': 'dbname'
}
# 配置日志
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, 'lottery.log')),
logging.StreamHandler()
]
)
@router.post('/save')
async def save_lottery(data: LotteryData, authorization: str = Header(None)):
start_time = time.time()
logging.info(f"收到保存请求: {data.point}, 参数: {data.params}")
# 验证API密钥
expected_api_key = "123456" # 设置您的API密钥
if not authorization:
logging.warning("请求缺少authorization header")
raise HTTPException(status_code=401, detail="No authorization header")
auth_scheme, _, api_key = authorization.partition(' ')
if auth_scheme.lower() != "bearer" or api_key != expected_api_key:
logging.warning(f"无效的认证信息: {auth_scheme}")
raise HTTPException(status_code=401, detail="Unauthorized")
# 从params中获取实际的彩票数据
content = data.params
result = json_to_sql(content)
end_time = time.time()
logging.info(f"请求处理完成,耗时: {end_time - start_time:.2f}秒,结果: {result}")
return result
def json_to_sql(data):
start_time = time.time()
logging.info(f"开始处理彩票数据: {data}")
try:
# 连接到MySQL数据库
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()
logging.info(f"数据库连接成功,耗时: {time.time() - start_time:.2f}秒")
# 主表插入语句
main_sql = """
INSERT INTO lottery_records (
sales_site, issue_number, lottery_type, betting_time, draw_time,
verification_code, amount, charity_contribution, wechat_subscription
) VALUES (
%(sales_site)s, %(issue_number)s, %(lottery_type)s, %(betting_time)s,
%(draw_time)s, %(verification_code)s, %(amount)s, %(charity_contribution)s,
%(wechat_subscription)s
)
"""
# 准备主表数据
main_data = {
'sales_site': data['销售站点'],
'issue_number': data['期号'],
'lottery_type': data['玩法'],
'betting_time': data['投注时间'],
'draw_time': data['开奖时间'],
'verification_code': data['验票码'],
'amount': float(data['金额'].replace('元', '')),
'charity_contribution': float(data['公益贡献'].replace('元', '')),
'wechat_subscription': data['微信订阅号']
}
# 执行主表插入
logging.info(f"准备执行主表插入,SQL: {main_sql}, 参数: {main_data}")
cursor.execute(main_sql, main_data)
lottery_record_id = cursor.lastrowid
logging.info(f"主表数据插入成功,ID: {lottery_record_id}, 耗时: {time.time() - start_time:.2f}秒")
# 号码表插入语句
numbers_sql = """
INSERT INTO lottery_numbers (lottery_record_id, red_balls, blue_ball)
VALUES (%(lottery_record_id)s, %(red_balls)s, %(blue_ball)s)
"""
# 插入所有号码记录
for ball_pair in data['红球和蓝球对']:
numbers_data = {
'lottery_record_id': lottery_record_id,
'red_balls': ','.join(map(str, ball_pair['红球'])),
'blue_ball': ball_pair['蓝球']
}
logging.info(f"准备执行号码表插入,SQL: {numbers_sql}, 参数: {numbers_data}")
cursor.execute(numbers_sql, numbers_data)
logging.info(f"号码数据插入成功: 红球 {numbers_data['red_balls']}, 蓝球 {numbers_data['blue_ball']}")
# 提交事务
conn.commit()
end_time = time.time()
logging.info(f"事务提交成功,总耗时: {end_time - start_time:.2f}秒")
return {"status": "success", "message": "数据已成功保存", "execution_time": f"{end_time - start_time:.2f}秒"}
except Error as e:
error_time = time.time()
if conn.is_connected():
conn.rollback()
logging.error(f"数据库操作失败,执行回滚: {str(e)},耗时: {error_time - start_time:.2f}秒")
return {"status": "error", "message": str(e), "execution_time": f"{error_time - start_time:.2f}秒"}
finally:
if 'cursor' in locals():
cursor.close()
logging.info("数据库游标已关闭")
if 'conn' in locals() and conn.is_connected():
conn.close()
logging.info("数据库连接已关闭")
启动类
from fastapi import FastAPI
from word import router as word_router
from receive import router as receive_router
from JsonToSave import router as json_router
import os
import uvicorn
app = FastAPI()
# 注册子路由
app.include_router(json_router, prefix="/json")
if __name__ == '__main__':
os.makedirs('temp', exist_ok=True)
os.makedirs('output', exist_ok=True)
uvicorn.run(app, host='0.0.0.0', port=5000)
这样启动服务后,后端就提供了一个http://IP:5000/json/save 的请求,如下图在dify中的配置。
其中,需要注意的是配置要写在body里,同时把上下文传递进去才行。
最后把这个工作流发布成工具:
成功效果:
在数据库查看的结果,发现已经成功入库。
猜你喜欢
- 2025-05-08 ORACLE 体系 - 12(上)(oracle数据库体系)
- 2025-05-08 TiDB 专用语法,优化器注释和表属性
- 2025-05-08 22、ORM框架(orm的框架)
- 2025-05-08 回答:我不小心把公司的数据库给删了,该不该离职?
- 2025-05-08 MySQL三大日志:binlog、redolog、undolog全解析
- 2025-05-08 数据库基础详解:存储过程、视图、游标、SQL语句优化以及索引
- 2025-05-08 MySQL体系架构(mysql数据库架构图)
- 2025-05-08 看这篇就够了!MySQL 索引知识点超全总结
- 2025-05-08 orcl数据库查询重复数据及删除重复数据方法
- 2025-05-08 浅聊MySQL索引分类(mysql索引类型详解)
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- 电脑显示器花屏 (79)
- 403 forbidden (65)
- linux怎么查看系统版本 (54)
- 补码运算 (63)
- 缓存服务器 (61)
- 定时重启 (59)
- plsql developer (73)
- 对话框打开时命令无法执行 (61)
- excel数据透视表 (72)
- oracle认证 (56)
- 网页不能复制 (84)
- photoshop外挂滤镜 (58)
- 网页无法复制粘贴 (55)
- vmware workstation 7 1 3 (78)
- jdk 64位下载 (65)
- phpstudy 2013 (66)
- 卡通形象生成 (55)
- psd模板免费下载 (67)
- shift (58)
- localhost打不开 (58)
- 检测代理服务器设置 (55)
- frequency (66)
- indesign教程 (55)
- 运行命令大全 (61)
- ping exe (64)
本文暂时没有评论,来添加一个吧(●'◡'●)