# -*- coding: utf-8 -*- import os, json, sqlite3 from datetime import datetime, date from flask import Flask, request, redirect, url_for, render_template_string, send_file, flash, send_from_directory from openpyxl import Workbook, load_workbook from io import BytesIO from werkzeug.utils import secure_filename from jinja2 import BaseLoader, TemplateNotFound # ============================================== # 全局配置 # ============================================== app = Flask(__name__) app.secret_key = "supersecretkey" BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # ============================================== # 首页(选择入口) # ============================================== @app.route("/") def home(): html = """ 项目选择

请选择要进入的系统:


当前服务端口:3333
""" return html # ============================================== # ① 值班安排系统 # ============================================== DUTY_PASSWORD = "Fj320030," DUTY_STATE_FILE = os.path.join(BASE_DIR, "duty_state.json") DUTY_GROUPS = { "A": ["卢猛", "叶鹏飞"], "B": ["刁端锋", "邢淮阳"], "C": ["胥群", "郁士军"], "D": ["吴山军", "刘凯"] } DUTY_ORDER = ["A", "B", "C", "D"] def duty_load_state(): if os.path.exists(DUTY_STATE_FILE): try: return json.load(open(DUTY_STATE_FILE, "r", encoding="utf-8")) except: pass w = date.today().isocalendar()[1] s = {"week": w, "group": "A"} json.dump(s, open(DUTY_STATE_FILE, "w", encoding="utf-8")) return s def duty_save_state(s): json.dump(s, open(DUTY_STATE_FILE, "w", encoding="utf-8")) duty_state = duty_load_state() def get_current_group(): w = date.today().isocalendar()[1] g = duty_state["group"] if w != duty_state["week"]: g = DUTY_ORDER[(DUTY_ORDER.index(g) + 1) % 4] duty_state.update({"week": w, "group": g}) duty_save_state(duty_state) return g @app.route("/duty/") def duty_index(): g = get_current_group() html = f""" 值班表

ck密码:3200;当前周四组别:{g}组({','.join(DUTY_GROUPS[g])})

修正周四组别: 密码:

周一:{','.join(DUTY_GROUPS['A'])}
周二:{','.join(DUTY_GROUPS['B'])}
周三:{','.join(DUTY_GROUPS['C'])}
周四:轮流(当前为:{g}组)
周五:{','.join(DUTY_GROUPS['D'])}

返回首页 """ return html @app.route("/duty/change", methods=["POST"]) def duty_change(): g = request.form.get("g") p = request.form.get("p", "") if p != DUTY_PASSWORD: return "密码错误!
返回" if g not in DUTY_ORDER: return "组别无效!
返回" w = date.today().isocalendar()[1] duty_state.update({"week": w, "group": g}) duty_save_state(duty_state) return f"已修改为 {g} 组({','.join(DUTY_GROUPS[g])})
返回" # ============================================== # ② 金额记录器 # ============================================== DB_PATH = os.path.join(BASE_DIR, "data.db") APP_TITLE = "金额记录器" BASE_HTML = """ {{ title }}

{{ title }}

首页 列表 下载 上传 返回首页
{% with messages = get_flashed_messages() %} {% if messages %} {% for m in messages %}
{{ m }}
{% endfor %} {% endif %} {% endwith %}
{% block content %}{% endblock %}
""" app.jinja_loader = type("DictLoader", (BaseLoader,), { "get_source": lambda self, env, t: (BASE_HTML, None, lambda: True) if t=="base.html" else (_ for _ in ()).throw(TemplateNotFound(t)) })() def db_conn(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): c = db_conn() cur = c.cursor() cur.execute("""CREATE TABLE IF NOT EXISTS entries ( id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT NOT NULL, amount REAL NOT NULL, project TEXT NOT NULL, created_at INTEGER NOT NULL );""") c.commit() c.close() init_db() @app.route("/money/") def money_index(): c = db_conn() total = c.execute("SELECT COALESCE(SUM(amount),0) FROM entries").fetchone()[0] c.close() html = f""" {{% extends "base.html" %}} {{% block content %}}
总金额:{{{{ '{total:.2f}' }}}}
{{% endblock %}} """ return render_template_string(html, title=APP_TITLE, this_year=datetime.now().year) @app.route("/money/add", methods=["POST"]) def money_add(): try: amt = float(request.form["amount"]) except: flash("金额必须是数字"); return redirect(url_for("money_index")) project = request.form.get("project","").strip() if not project: flash("项目不能为空"); return redirect(url_for("money_index")) c = db_conn() c.execute("INSERT INTO entries(date,amount,project,created_at) VALUES(?,?,?,?)", (datetime.now().strftime("%Y-%m-%d"), amt, project, int(datetime.now().timestamp()))) c.commit(); c.close() flash("保存成功"); return redirect(url_for("money_index")) @app.route("/money/list") def money_list(): c = db_conn(); rows = c.execute("SELECT * FROM entries ORDER BY created_at ASC").fetchall(); c.close() html = """ {% extends "base.html" %} {% block content %}
共 {{ rows|length }} 条记录
{% for r in rows %} {% endfor %}
日期金额项目
{{ r['date'] }}{{ '%.2f'|format(r['amount']) }}{{ r['project'] }}
{% endblock %} """ return render_template_string(html, title="金额记录器 · 列表", rows=rows, this_year=datetime.now().year) @app.route("/money/download") def money_download(): c = db_conn() rows = c.execute("SELECT date,amount,project FROM entries ORDER BY created_at ASC").fetchall() c.close() wb = Workbook(); ws = wb.active; ws.title="数据"; ws.append(["日期","金额","项目"]) for r in rows: ws.append([r["date"], r["amount"], r["project"]]) bio = BytesIO(); wb.save(bio); bio.seek(0) fname = f"金额记录_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" return send_file(bio, as_attachment=True, download_name=fname) @app.route("/money/upload", methods=["GET"]) def money_upload_page(): html = """ {% extends "base.html" %} {% block content %}
{% endblock %} """ return render_template_string(html, title="金额记录器 · 上传", this_year=datetime.now().year) @app.route("/money/upload", methods=["POST"]) def money_upload(): f = request.files.get("file") if not f: flash("未选择文件"); return redirect(url_for("money_upload_page")) try: wb = load_workbook(filename=BytesIO(f.read())); ws = wb.active rows = list(ws.iter_rows(values_only=True)) if len(rows)<2 or rows[0][:3] != ("日期","金额","项目"): flash("文件格式不正确"); return redirect(url_for("money_upload_page")) c = db_conn(); c.execute("DELETE FROM entries") for row in rows[1:]: if not row or len(row)<3: continue d,a,p = row[:3] try:a=float(a) except:continue c.execute("INSERT INTO entries(date,amount,project,created_at) VALUES(?,?,?,?)",(str(d),a,str(p),int(datetime.now().timestamp()))) c.commit(); c.close(); flash("上传成功") except Exception as e: flash("上传失败:"+str(e)) return redirect(url_for("money_index")) # ============================================== # ③ 上传下载系统 # ============================================== UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads") os.makedirs(UPLOAD_FOLDER, exist_ok=True) UPLOAD_PASS = "Fj320030," @app.route("/upload/") def upload_index(): files = [f for f in os.listdir(UPLOAD_FOLDER) if os.path.isfile(os.path.join(UPLOAD_FOLDER,f))] html = """ 文件上传

上传下载系统

密码:



文件列表

返回首页 """ return render_template_string(html, files=files) @app.route("/upload/upload", methods=["POST"]) def upload_file(): if request.form.get("password") != UPLOAD_PASS: flash("密码错误"); return redirect(url_for("upload_index")) file = request.files.get("file") if not file: flash("未选择文件"); return redirect(url_for("upload_index")) file.save(os.path.join(UPLOAD_FOLDER, file.filename)) flash("上传成功"); return redirect(url_for("upload_index")) @app.route("/upload/delete/", methods=["POST"]) def upload_delete(filename): if request.form.get("password") != UPLOAD_PASS: flash("密码错误"); return redirect(url_for("upload_index")) path = os.path.join(UPLOAD_FOLDER, filename) if os.path.exists(path): os.remove(path); flash("删除成功") else: flash("文件不存在") return redirect(url_for("upload_index")) @app.route("/upload/download/") def upload_download(filename): return send_from_directory(UPLOAD_FOLDER, filename) # ============================================== # 主入口 # ============================================== if __name__ == "__main__": print("✅ 合并版启动成功:http://0.0.0.0:3333") app.run(host="0.0.0.0", port=3333, debug=False)