# -*- coding: utf-8 -*-
import os, json, sqlite3
from datetime import datetime, date
from flask import Flask, request, redirect, url_for, render_template_string, send_file, flash, send_from_directory
from openpyxl import Workbook, load_workbook
from io import BytesIO
from werkzeug.utils import secure_filename
from jinja2 import BaseLoader, TemplateNotFound
# ==============================================
# 全局配置
# ==============================================
app = Flask(__name__)
app.secret_key = "supersecretkey"
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# ==============================================
# 首页(选择入口)
# ==============================================
@app.route("/")
def home():
html = """
项目选择
请选择要进入的系统:
当前服务端口:3333
"""
return html
# ==============================================
# ① 值班安排系统
# ==============================================
DUTY_PASSWORD = "Fj320030,"
DUTY_STATE_FILE = os.path.join(BASE_DIR, "duty_state.json")
DUTY_GROUPS = {
"A": ["卢猛", "叶鹏飞"],
"B": ["刁端锋", "邢淮阳"],
"C": ["胥群", "郁士军"],
"D": ["吴山军", "刘凯"]
}
DUTY_ORDER = ["A", "B", "C", "D"]
def duty_load_state():
if os.path.exists(DUTY_STATE_FILE):
try:
return json.load(open(DUTY_STATE_FILE, "r", encoding="utf-8"))
except:
pass
w = date.today().isocalendar()[1]
s = {"week": w, "group": "A"}
json.dump(s, open(DUTY_STATE_FILE, "w", encoding="utf-8"))
return s
def duty_save_state(s):
json.dump(s, open(DUTY_STATE_FILE, "w", encoding="utf-8"))
duty_state = duty_load_state()
def get_current_group():
w = date.today().isocalendar()[1]
g = duty_state["group"]
if w != duty_state["week"]:
g = DUTY_ORDER[(DUTY_ORDER.index(g) + 1) % 4]
duty_state.update({"week": w, "group": g})
duty_save_state(duty_state)
return g
@app.route("/duty/")
def duty_index():
g = get_current_group()
html = f"""
值班表
ck密码:3200;当前周四组别:{g}组({','.join(DUTY_GROUPS[g])})
周一:{','.join(DUTY_GROUPS['A'])}
周二:{','.join(DUTY_GROUPS['B'])}
周三:{','.join(DUTY_GROUPS['C'])}
周四:轮流(当前为:{g}组)
周五:{','.join(DUTY_GROUPS['D'])}
返回首页
"""
return html
@app.route("/duty/change", methods=["POST"])
def duty_change():
g = request.form.get("g")
p = request.form.get("p", "")
if p != DUTY_PASSWORD:
return "密码错误!
返回"
if g not in DUTY_ORDER:
return "组别无效!
返回"
w = date.today().isocalendar()[1]
duty_state.update({"week": w, "group": g})
duty_save_state(duty_state)
return f"已修改为 {g} 组({','.join(DUTY_GROUPS[g])})
返回"
# ==============================================
# ② 金额记录器
# ==============================================
DB_PATH = os.path.join(BASE_DIR, "data.db")
APP_TITLE = "金额记录器"
BASE_HTML = """
{{ title }}
{% with messages = get_flashed_messages() %}
{% if messages %}
{% for m in messages %}
{{ m }}
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
"""
app.jinja_loader = type("DictLoader", (BaseLoader,), {
"get_source": lambda self, env, t: (BASE_HTML, None, lambda: True) if t=="base.html" else (_ for _ in ()).throw(TemplateNotFound(t))
})()
def db_conn():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db():
c = db_conn()
cur = c.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
amount REAL NOT NULL,
project TEXT NOT NULL,
created_at INTEGER NOT NULL
);""")
c.commit()
c.close()
init_db()
@app.route("/money/")
def money_index():
c = db_conn()
total = c.execute("SELECT COALESCE(SUM(amount),0) FROM entries").fetchone()[0]
c.close()
html = f"""
{{% extends "base.html" %}}
{{% block content %}}
总金额:{{{{ '{total:.2f}' }}}}
{{% endblock %}}
"""
return render_template_string(html, title=APP_TITLE, this_year=datetime.now().year)
@app.route("/money/add", methods=["POST"])
def money_add():
try:
amt = float(request.form["amount"])
except:
flash("金额必须是数字"); return redirect(url_for("money_index"))
project = request.form.get("project","").strip()
if not project: flash("项目不能为空"); return redirect(url_for("money_index"))
c = db_conn()
c.execute("INSERT INTO entries(date,amount,project,created_at) VALUES(?,?,?,?)",
(datetime.now().strftime("%Y-%m-%d"), amt, project, int(datetime.now().timestamp())))
c.commit(); c.close()
flash("保存成功"); return redirect(url_for("money_index"))
@app.route("/money/list")
def money_list():
c = db_conn(); rows = c.execute("SELECT * FROM entries ORDER BY created_at ASC").fetchall(); c.close()
html = """
{% extends "base.html" %}
{% block content %}
共 {{ rows|length }} 条记录
| 日期 | 金额 | 项目 |
{% for r in rows %}
| {{ r['date'] }} | {{ '%.2f'|format(r['amount']) }} | {{ r['project'] }} |
{% endfor %}
{% endblock %}
"""
return render_template_string(html, title="金额记录器 · 列表", rows=rows, this_year=datetime.now().year)
@app.route("/money/download")
def money_download():
c = db_conn()
rows = c.execute("SELECT date,amount,project FROM entries ORDER BY created_at ASC").fetchall()
c.close()
wb = Workbook(); ws = wb.active; ws.title="数据"; ws.append(["日期","金额","项目"])
for r in rows: ws.append([r["date"], r["amount"], r["project"]])
bio = BytesIO(); wb.save(bio); bio.seek(0)
fname = f"金额记录_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
return send_file(bio, as_attachment=True, download_name=fname)
@app.route("/money/upload", methods=["GET"])
def money_upload_page():
html = """
{% extends "base.html" %}
{% block content %}
{% endblock %}
"""
return render_template_string(html, title="金额记录器 · 上传", this_year=datetime.now().year)
@app.route("/money/upload", methods=["POST"])
def money_upload():
f = request.files.get("file")
if not f: flash("未选择文件"); return redirect(url_for("money_upload_page"))
try:
wb = load_workbook(filename=BytesIO(f.read())); ws = wb.active
rows = list(ws.iter_rows(values_only=True))
if len(rows)<2 or rows[0][:3] != ("日期","金额","项目"): flash("文件格式不正确"); return redirect(url_for("money_upload_page"))
c = db_conn(); c.execute("DELETE FROM entries")
for row in rows[1:]:
if not row or len(row)<3: continue
d,a,p = row[:3]
try:a=float(a)
except:continue
c.execute("INSERT INTO entries(date,amount,project,created_at) VALUES(?,?,?,?)",(str(d),a,str(p),int(datetime.now().timestamp())))
c.commit(); c.close(); flash("上传成功")
except Exception as e: flash("上传失败:"+str(e))
return redirect(url_for("money_index"))
# ==============================================
# ③ 上传下载系统
# ==============================================
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
UPLOAD_PASS = "Fj320030,"
@app.route("/upload/")
def upload_index():
files = [f for f in os.listdir(UPLOAD_FOLDER) if os.path.isfile(os.path.join(UPLOAD_FOLDER,f))]
html = """
文件上传
上传下载系统
文件列表
{% for f in files %}
- {{ f }}
{% endfor %}
返回首页
"""
return render_template_string(html, files=files)
@app.route("/upload/upload", methods=["POST"])
def upload_file():
if request.form.get("password") != UPLOAD_PASS:
flash("密码错误"); return redirect(url_for("upload_index"))
file = request.files.get("file")
if not file: flash("未选择文件"); return redirect(url_for("upload_index"))
file.save(os.path.join(UPLOAD_FOLDER, file.filename))
flash("上传成功"); return redirect(url_for("upload_index"))
@app.route("/upload/delete/", methods=["POST"])
def upload_delete(filename):
if request.form.get("password") != UPLOAD_PASS:
flash("密码错误"); return redirect(url_for("upload_index"))
path = os.path.join(UPLOAD_FOLDER, filename)
if os.path.exists(path): os.remove(path); flash("删除成功")
else: flash("文件不存在")
return redirect(url_for("upload_index"))
@app.route("/upload/download/")
def upload_download(filename):
return send_from_directory(UPLOAD_FOLDER, filename)
# ==============================================
# 主入口
# ==============================================
if __name__ == "__main__":
print("✅ 合并版启动成功:http://0.0.0.0:3333")
app.run(host="0.0.0.0", port=3333, debug=False)