xianyu-backend-java/simple_stats_server.py
zhinianboke ea44e32e32 提交
2025-08-30 14:40:07 +08:00

233 lines
6.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简单的用户统计服务器
只统计有多少人在使用闲鱼自动回复系统
"""
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Dict, Any
import sqlite3
from datetime import datetime
import uvicorn
from pathlib import Path
app = FastAPI(title="闲鱼自动回复系统用户统计", version="1.0.0")
# 数据库文件路径
DB_PATH = Path(__file__).parent / "user_stats.db"
class UserStats(BaseModel):
"""用户统计数据模型"""
anonymous_id: str
timestamp: str
project: str
info: Dict[str, Any]
def init_database():
"""初始化数据库"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 创建用户统计表
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
anonymous_id TEXT UNIQUE NOT NULL,
first_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
last_seen DATETIME DEFAULT CURRENT_TIMESTAMP,
os TEXT,
version TEXT,
total_reports INTEGER DEFAULT 1
)
''')
# 创建索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_anonymous_id ON user_stats(anonymous_id)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_last_seen ON user_stats(last_seen)')
conn.commit()
conn.close()
def save_user_stats(data: UserStats):
"""保存用户统计数据"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
info = data.info
os_info = info.get('os', 'unknown')
version = info.get('version', '2.2.0')
# 检查用户是否已存在
cursor.execute('SELECT id, total_reports FROM user_stats WHERE anonymous_id = ?', (data.anonymous_id,))
existing = cursor.fetchone()
if existing:
# 更新现有用户的最后访问时间和报告次数
cursor.execute('''
UPDATE user_stats
SET last_seen = CURRENT_TIMESTAMP,
total_reports = total_reports + 1,
os = ?,
version = ?
WHERE anonymous_id = ?
''', (os_info, version, data.anonymous_id))
else:
# 插入新用户
cursor.execute('''
INSERT INTO user_stats (anonymous_id, os, version)
VALUES (?, ?, ?)
''', (data.anonymous_id, os_info, version))
conn.commit()
return True
except Exception as e:
print(f"保存用户统计失败: {e}")
return False
finally:
conn.close()
@app.post('/statistics')
async def receive_user_stats(data: UserStats):
"""接收用户统计数据"""
try:
success = save_user_stats(data)
if success:
print(f"收到用户统计: {data.anonymous_id}")
return {"status": "success", "message": "用户统计已收到"}
else:
return {"status": "error", "message": "保存统计数据失败"}
except Exception as e:
print(f"处理用户统计失败: {e}")
return {"status": "error", "message": "处理统计数据失败"}
@app.get('/stats')
async def get_user_stats():
"""获取用户统计摘要"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
# 总用户数
cursor.execute('SELECT COUNT(*) FROM user_stats')
total_users = cursor.fetchone()[0]
# 今日活跃用户
cursor.execute('''
SELECT COUNT(*)
FROM user_stats
WHERE DATE(last_seen) = DATE('now')
''')
daily_active = cursor.fetchone()[0]
# 本周活跃用户
cursor.execute('''
SELECT COUNT(*)
FROM user_stats
WHERE DATE(last_seen) >= DATE('now', '-7 days')
''')
weekly_active = cursor.fetchone()[0]
# 操作系统分布
cursor.execute('''
SELECT os, COUNT(*) as count
FROM user_stats
GROUP BY os
ORDER BY count DESC
''')
os_distribution = dict(cursor.fetchall())
# 版本分布
cursor.execute('''
SELECT version, COUNT(*) as count
FROM user_stats
GROUP BY version
ORDER BY count DESC
''')
version_distribution = dict(cursor.fetchall())
return {
"total_users": total_users,
"daily_active_users": daily_active,
"weekly_active_users": weekly_active,
"os_distribution": os_distribution,
"version_distribution": version_distribution,
"last_updated": datetime.now().isoformat()
}
except Exception as e:
return {"error": f"获取统计失败: {e}"}
finally:
conn.close()
@app.get('/stats/recent')
async def get_recent_users():
"""获取最近活跃的用户(匿名)"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
cursor.execute('''
SELECT anonymous_id, first_seen, last_seen, os, version, total_reports
FROM user_stats
ORDER BY last_seen DESC
LIMIT 20
''')
records = cursor.fetchall()
return {
"recent_users": [
{
"anonymous_id": record[0][:8] + "****", # 部分隐藏ID
"first_seen": record[1],
"last_seen": record[2],
"os": record[3],
"version": record[4],
"total_reports": record[5]
}
for record in records
]
}
except Exception as e:
return {"error": f"获取最近用户失败: {e}"}
finally:
conn.close()
@app.get('/')
async def root():
"""根路径"""
return {
"message": "闲鱼自动回复系统用户统计服务器",
"description": "只统计有多少人在使用这个系统",
"endpoints": {
"POST /statistics": "接收用户统计数据",
"GET /stats": "获取用户统计摘要",
"GET /stats/recent": "获取最近活跃用户"
}
}
if __name__ == "__main__":
# 初始化数据库
init_database()
print("用户统计数据库初始化完成")
# 启动服务器
print("启动用户统计服务器...")
print("访问 http://localhost:8081/stats 查看统计信息")
uvicorn.run(app, host="0.0.0.0", port=8081)