返回博客

通过API获取代理使用统计数据:流量记录自动化

详细指南,介绍如何使用代理提供商的API获取使用统计数据:代码示例、自动化方法和与业务流程的集成。

📅2026年2月16日
```html

如果您正在处理大量代理——用于套利、解析或多账户——手动控制流量消耗变成了一场噩梦。代理提供商的API允许自动化获取统计信息:流量余额、活动会话、按IP地址的消耗和使用历史。在本指南中,我们将探讨如何通过Python、Node.js和cURL的代码示例集成API以进行统计管理。

为什么需要代理统计的自动化

当您管理10-50个代理用于不同项目时,通过提供商的个人账户手动检查余额和流量消耗变得低效。API解决了几个关键问题:

API统计的主要任务:

  • 监控流量余额 — 达到限制时自动通知
  • 按项目控制消耗 — 在客户或任务之间分配费用
  • 效率分析 — 哪些代理消耗更多流量以及原因
  • 防止停机 — 在流量耗尽前补充余额
  • 与CRM/计费集成 — 在业务流程中自动记录代理费用

对于通过多个账户投放广告的套利者来说,了解哪些代理处于活动状态以及剩余流量至关重要——由于达到限制而突然被封锁可能会导致数千美元的利润损失。对于管理客户Instagram或TikTok账户的SMM代理商来说,自动化可以准确开具代理使用费用。

市场解析器(Wildberries、Ozon、Avito)可以在主要代理达到限制时自动切换到备用代理。API使得构建24/7无人工干预的高可用系统成为可能。

代理提供商API的基本知识

大多数现代代理提供商提供RESTful API来管理服务。API按标准模式工作:您向特定端点发送带有身份验证参数的HTTP请求,服务器返回包含数据的JSON。

API组件 描述 示例
基础URL 提供商的API基本地址 https://api.provider.com/v1
身份验证 API密钥或Bearer Token Authorization: Bearer YOUR_API_KEY
端点 不同操作的具体路径 /statistics, /balance
响应格式 数据结构 包含status、data、error字段的JSON
速率限制 每分钟请求限制 60-300请求/分钟

代理提供商的API返回的典型JSON响应结构如下:

{
  "status": "success",
  "data": {
    "balance": 1250.50,
    "traffic_used_gb": 45.2,
    "traffic_remaining_gb": 54.8,
    "active_proxies": 15,
    "last_updated": "2024-01-15T10:30:00Z"
  },
  "error": null
}

在使用API时,重要的是要考虑速率限制(请求数量限制)。大多数提供商允许每分钟60-300个请求。对于统计监控来说,这已经足够——通常请求每5-15分钟发出一次。

身份验证和获取API密钥

使用API的第一步是获取身份验证密钥。不同提供商的过程不同,但总体逻辑是相同的:

获取API密钥的分步指南:

  1. 登录代理提供商的个人账户
  2. 找到“API”或“设置”→“API访问”部分
  3. 点击“创建API密钥”或“生成令牌”按钮
  4. 复制密钥并保存在安全的地方(它只显示一次)
  5. 如果提供商要求绑定IP地址,请设置IP白名单
  6. 通过cURL或Postman测试请求以检查密钥

代理提供商API中有两种主要的身份验证方法:

方法 工作原理 示例头部
Bearer Token 令牌在Authorization头中传递 Authorization: Bearer abc123...
API密钥作为参数 密钥作为GET/POST参数传递 ?api_key=abc123...

重要:永远不要将API密钥存储在会进入Git仓库的代码中。使用环境变量(.env文件)或秘密管理工具(AWS Secrets Manager、HashiCorp Vault)。如果密钥泄露——立即在个人账户中撤回并生成新的。

获取统计信息的主要端点

虽然每个提供商的API结构不同,但大多数提供标准的端点集以获取统计信息。我们来看一些典型的方法:

端点 方法 返回内容
/balance GET 当前余额和流量余额
/statistics GET 指定期间的总体使用统计
/statistics/proxy/{id} GET 特定代理的统计信息
/sessions/active GET 代理的活动会话列表
/traffic/history GET 按天统计的流量使用历史
/proxies/list GET 所有代理及其状态的列表

请求参数通常包括时间范围和过滤器:

GET /statistics?from=2024-01-01&to=2024-01-15&proxy_type=residential
GET /traffic/history?period=7d&group_by=day
GET /sessions/active?status=online&country=US

对于使用住宅代理的工作,获取会话统计信息的端点尤其重要,因为它们通常使用IP轮换。API允许跟踪在一段时间内使用了多少个唯一IP,以及每个会话通过的流量量。

获取统计信息的Python代码示例

Python是用于自动化代理工作的最流行语言,得益于requests库。我们来看一些通过API获取统计信息的实际示例。

基本的余额和流量请求

import requests
import os
from datetime import datetime

# API凭证来自环境变量
API_KEY = os.getenv('PROXY_API_KEY')
BASE_URL = 'https://api.proxyprovider.com/v1'

def get_balance_statistics():
    """获取余额和总体统计信息"""
    headers = {
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json'
    }
    
    try:
        response = requests.get(
            f'{BASE_URL}/balance',
            headers=headers,
            timeout=10
        )
        response.raise_for_status()
        
        data = response.json()
        
        print(f"=== 代理统计 {datetime.now().strftime('%Y-%m-%d %H:%M')} ===")
        print(f"余额: ${data['balance']:.2f}")
        print(f"已使用流量: {data['traffic_used_gb']:.2f} GB")
        print(f"剩余流量: {data['traffic_remaining_gb']:.2f} GB")
        print(f"活动代理: {data['active_proxies']}")
        
        return data
        
    except requests.exceptions.RequestException as e:
        print(f"请求错误: {e}")
        return None

# 调用函数
stats = get_balance_statistics()

获取指定期间的详细统计信息

from datetime import datetime, timedelta
import requests

def get_traffic_history(days=7):
    """获取N天的流量使用历史"""
    headers = {'Authorization': f'Bearer {API_KEY}'}
    
    # 计算时间范围
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)
    
    params = {
        'from': start_date.strftime('%Y-%m-%d'),
        'to': end_date.strftime('%Y-%m-%d'),
        'group_by': 'day'
    }
    
    response = requests.get(
        f'{BASE_URL}/traffic/history',
        headers=headers,
        params=params,
        timeout=10
    )
    
    if response.status_code == 200:
        data = response.json()
        
        print(f"\n=== 最近{days}天的流量使用 ===")
        for day in data['daily_stats']:
            print(f"{day['date']}: {day['traffic_gb']:.2f} GB "
                  f"({day['requests']} 请求)")
        
        total = sum(day['traffic_gb'] for day in data['daily_stats'])
        print(f"\n该期间总计: {total:.2f} GB")
        
        return data
    else:
        print(f"错误 {response.status_code}: {response.text}")
        return None

# 获取一周的统计信息
history = get_traffic_history(7)

监控特定代理的统计信息

def get_proxy_statistics(proxy_id):
    """获取特定代理的统计信息"""
    headers = {'Authorization': f'Bearer {API_KEY}'}
    
    response = requests.get(
        f'{BASE_URL}/statistics/proxy/{proxy_id}',
        headers=headers,
        timeout=10
    )
    
    if response.status_code == 200:
        data = response.json()
        
        print(f"\n=== 代理 {proxy_id} 的统计信息 ===")
        print(f"IP地址: {data['ip_address']}")
        print(f"国家: {data['country']}")
        print(f"类型: {data['proxy_type']}")
        print(f"状态: {data['status']}")
        print(f"已使用流量: {data['traffic_used_mb']:.2f} MB")
        print(f"请求总数: {data['total_requests']}")
        print(f"最后活动: {data['last_activity']}")
        
        return data
    else:
        print(f"代理 {proxy_id} 未找到")
        return None

def get_all_proxies_stats():
    """获取所有代理的统计信息"""
    headers = {'Authorization': f'Bearer {API_KEY}'}
    
    # 首先获取所有代理的列表
    response = requests.get(
        f'{BASE_URL}/proxies/list',
        headers=headers,
        timeout=10
    )
    
    if response.status_code == 200:
        proxies = response.json()['proxies']
        
        print(f"\n=== {len(proxies)} 个代理的统计信息 ===\n")
        
        total_traffic = 0
        for proxy in proxies:
            stats = get_proxy_statistics(proxy['id'])
            if stats:
                total_traffic += stats['traffic_used_mb']
        
        print(f"\n=== 总流量消耗: {total_traffic/1024:.2f} GB ===")
        
        return proxies
    else:
        print("获取代理列表时出错")
        return None

# 获取所有代理的统计信息
all_stats = get_all_proxies_stats()

这些示例展示了如何基本使用API。对于生产系统,添加错误处理、网络故障时的重试逻辑和结果缓存,以避免超过提供商的速率限制。

监控流量的Node.js代码示例

Node.js非常适合创建后台监控微服务,这些服务在后台运行并在关键事件发生时发送通知。我们使用axios库进行HTTP请求。

用于API代理的基本类

const axios = require('axios');
require('dotenv').config();

class ProxyAPIClient {
  constructor() {
    this.baseURL = 'https://api.proxyprovider.com/v1';
    this.apiKey = process.env.PROXY_API_KEY;
    
    this.client = axios.create({
      baseURL: this.baseURL,
      timeout: 10000,
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json'
      }
    });
  }

  async getBalance() {
    try {
      const response = await this.client.get('/balance');
      return response.data;
    } catch (error) {
      console.error('获取余额时出错:', error.message);
      throw error;
    }
  }

  async getStatistics(params = {}) {
    try {
      const response = await this.client.get('/statistics', { params });
      return response.data;
    } catch (error) {
      console.error('获取统计信息时出错:', error.message);
      throw error;
    }
  }

  async getTrafficHistory(days = 7) {
    const endDate = new Date();
    const startDate = new Date();
    startDate.setDate(startDate.getDate() - days);

    const params = {
      from: startDate.toISOString().split('T')[0],
      to: endDate.toISOString().split('T')[0],
      group_by: 'day'
    };

    try {
      const response = await this.client.get('/traffic/history', { params });
      return response.data;
    } catch (error) {
      console.error('获取流量历史时出错:', error.message);
      throw error;
    }
  }

  async getProxyStats(proxyId) {
    try {
      const response = await this.client.get(`/statistics/proxy/${proxyId}`);
      return response.data;
    } catch (error) {
      console.error(`获取代理 ${proxyId} 的统计信息时出错:`, error.message);
      throw error;
    }
  }
}

module.exports = ProxyAPIClient;

带通知的自动监控

const ProxyAPIClient = require('./ProxyAPIClient');
const nodemailer = require('nodemailer');

class ProxyMonitor {
  constructor(checkIntervalMinutes = 15) {
    this.api = new ProxyAPIClient();
    this.checkInterval = checkIntervalMinutes * 60 * 1000;
    this.thresholds = {
      trafficWarning: 10, // GB - 警告
      trafficCritical: 5,  // GB - 临界水平
      balanceWarning: 50   // USD - 余额警告
    };
  }

  async checkAndNotify() {
    try {
      const balance = await this.api.getBalance();
      
      console.log(`[${new Date().toISOString()}] 检查统计信息...`);
      console.log(`余额: $${balance.balance.toFixed(2)}`);
      console.log(`剩余流量: ${balance.traffic_remaining_gb.toFixed(2)} GB`);

      // 检查临界阈值
      const alerts = [];

      if (balance.traffic_remaining_gb <= this.thresholds.trafficCritical) {
        alerts.push({
          level: 'critical',
          message: `流量极少: 剩余 ${balance.traffic_remaining_gb.toFixed(2)} GB!`
        });
      } else if (balance.traffic_remaining_gb <= this.thresholds.trafficWarning) {
        alerts.push({
          level: 'warning',
          message: `流量不足: 剩余 ${balance.traffic_remaining_gb.toFixed(2)} GB`
        });
      }

      if (balance.balance <= this.thresholds.balanceWarning) {
        alerts.push({
          level: 'warning',
          message: `余额低: $${balance.balance.toFixed(2)}`
        });
      }

      // 发送通知
      if (alerts.length > 0) {
        await this.sendAlerts(alerts, balance);
      }

      return { balance, alerts };
    } catch (error) {
      console.error('监控时出错:', error);
      await this.sendErrorNotification(error);
    }
  }

  async sendAlerts(alerts, balance) {
    console.log('\n⚠️  注意! 检测到问题:');
    alerts.forEach(alert => {
      console.log(`[${alert.level.toUpperCase()}] ${alert.message}`);
    });

    // 发送电子邮件(配置SMTP)
    const transporter = nodemailer.createTransport({
      host: process.env.SMTP_HOST,
      port: process.env.SMTP_PORT,
      auth: {
        user: process.env.SMTP_USER,
        pass: process.env.SMTP_PASS
      }
    });

    const emailBody = `
      代理存在问题:
      
      ${alerts.map(a => `- ${a.message}`).join('\n')}
      
      当前统计信息:
      - 余额: $${balance.balance.toFixed(2)}
      - 剩余流量: ${balance.traffic_remaining_gb.toFixed(2)} GB
      - 已使用: ${balance.traffic_used_gb.toFixed(2)} GB
      - 活动代理: ${balance.active_proxies}
      
      建议补充余额或减少负载。
    `;

    await transporter.sendMail({
      from: process.env.ALERT_FROM_EMAIL,
      to: process.env.ALERT_TO_EMAIL,
      subject: '⚠️ 代理警告',
      text: emailBody
    });
  }

  start() {
    console.log(`启动代理监控 (每 ${this.checkInterval / 60000} 分钟检查一次)`);
    
    // 立即进行第一次检查
    this.checkAndNotify();
    
    // 定期检查
    setInterval(() => {
      this.checkAndNotify();
    }, this.checkInterval);
  }
}

// 启动监控
const monitor = new ProxyMonitor(15); // 每15分钟检查一次
monitor.start();

这段代码创建了一个后台服务,每15分钟检查一次统计信息,并在达到临界阈值时发送电子邮件通知。对于移动代理,通常比住宅代理更昂贵,这种监控对于控制支出尤其重要。

通过cURL的请求示例

cURL是一个通用工具,用于从命令行测试API。对于快速检查端点和调试集成非常有用。

获取余额

# 基本余额请求
curl -X GET "https://api.proxyprovider.com/v1/balance" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json"

# 带有漂亮JSON格式化(通过jq)
curl -X GET "https://api.proxyprovider.com/v1/balance" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  | jq '.'

# 提取特定字段
curl -s "https://api.proxyprovider.com/v1/balance" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  | jq '.traffic_remaining_gb'

获取指定期间的统计信息

# 最近7天的统计信息
curl -X GET "https://api.proxyprovider.com/v1/traffic/history?from=2024-01-08&to=2024-01-15&group_by=day" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  | jq '.daily_stats'

# 特定类型代理的统计信息
curl -X GET "https://api.proxyprovider.com/v1/statistics?proxy_type=residential&country=US" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  | jq '.'

获取活动代理列表

# 所有活动代理的列表
curl -X GET "https://api.proxyprovider.com/v1/proxies/list?status=active" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  | jq '.proxies[] | {id, ip_address, country, proxy_type}'

# 计算活动代理的数量
curl -s "https://api.proxyprovider.com/v1/proxies/list?status=active" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  | jq '.proxies | length'

自动检查的Bash脚本

#!/bin/bash

# 配置
API_KEY="YOUR_API_KEY"
BASE_URL="https://api.proxyprovider.com/v1"
TRAFFIC_THRESHOLD=10  # GB - 警告阈值

# 获取统计信息
response=$(curl -s "${BASE_URL}/balance" \
  -H "Authorization: Bearer ${API_KEY}")

# 解析JSON
traffic_remaining=$(echo "$response" | jq -r '.traffic_remaining_gb')
balance=$(echo "$response" | jq -r '.balance')
active_proxies=$(echo "$response" | jq -r '.active_proxies')

# 输出统计信息
echo "=== 代理统计 $(date '+%Y-%m-%d %H:%M:%S') ==="
echo "剩余流量: ${traffic_remaining} GB"
echo "余额: \$${balance}"
echo "活动代理: ${active_proxies}"

# 检查阈值
if (( $(echo "$traffic_remaining < $TRAFFIC_THRESHOLD" | bc -l) )); then
  echo "⚠️  注意: 流量不足! 剩余 ${traffic_remaining} GB"
  
  # 发送通知(例如,通过Telegram)
  # curl -s -X POST "https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage" \
  #   -d chat_id="${TELEGRAM_CHAT_ID}" \
  #   -d text="⚠️ 代理流量不足: ${traffic_remaining} GB"
fi

将此脚本保存为check_proxy_stats.sh,赋予执行权限(chmod +x check_proxy_stats.sh)并添加到cron中以每小时自动运行:

0 * * * * /path/to/check_proxy_stats.sh >> /var/log/proxy_monitor.log 2>&1

监控和通知的自动化

手动检查统计信息效率低下——需要智能通知的自动化。让我们考虑一个完整的监控系统与集成。

与Telegram集成以发送通知

import requests
import os
from datetime import datetime

class TelegramNotifier:
    def __init__(self):
        self.bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
        self.chat_id = os.getenv('TELEGRAM_CHAT_ID')
        self.base_url = f'https://api.telegram.org/bot{self.bot_token}'
    
    def send_message(self, text, parse_mode='Markdown'):
        """发送消息到Telegram"""
        url = f'{self.base_url}/sendMessage'
        payload = {
            'chat_id': self.chat_id,
            'text': text,
            'parse_mode': parse_mode
        }
        
        try:
            response = requests.post(url, json=payload, timeout=10)
            return response.status_code == 200
        except Exception as e:
            print(f"发送到Telegram时出错: {e}")
            return False

def monitor_proxy_with_telegram():
    """带有Telegram通知的监控"""
    api = ProxyAPIClient()
    notifier = TelegramNotifier()
    
    try:
        balance = api.get_balance()
        
        # 生成消息
        message = f"""
📊 *代理统计* ({datetime.now().strftime('%H:%M %d.%m.%Y')})

💰 余额: ${balance['balance']:.2f}
📈 已使用: {balance['traffic_used_gb']:.2f} GB
📉 剩余: {balance['traffic_remaining_gb']:.2f} GB
🔌 活动代理: {balance['active_proxies']}
        """
        
        # 检查临界阈值
        if balance['traffic_remaining_gb'] < 5:
            message += "\n⚠️ *流量极少!*"
            notifier.send_message(message)
        elif balance['traffic_remaining_gb'] < 10:
            message += "\n⚡ 建议补充流量"
            notifier.send_message(message)
        
        return balance
        
    except Exception as e:
        error_msg = f"❌ *代理监控错误*\n\n{str(e)}"
        notifier.send_message(error_msg)
        raise

# 启动监控
monitor_proxy_with_telegram()

与Slack集成

import requests
import json

class SlackNotifier:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url
    
    def send_alert(self, title, message, level='warning'):
        """发送通知到Slack"""
        color = {
            'info': '#36a64f',
            'warning': '#ff9900',
            'critical': '#ff0000'
        }.get(level, '#cccccc')
        
        payload = {
            'attachments': [{
                'color': color,
                'title': title,
                'text': message,
                'footer': '代理监控',
                'ts': int(datetime.now().timestamp())
            }]
        }
        
        try:
            response = requests.post(
                self.webhook_url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'},
                timeout=10
            )
            return response.status_code == 200
        except Exception as e:
            print(f"发送到Slack时出错: {e}")
            return False

def check_proxy_health():
    """检查代理健康状态并发送通知"""
    api = ProxyAPIClient()
    slack = SlackNotifier(os.getenv('SLACK_WEBHOOK_URL'))
    
    balance = api.get_balance()
    proxies = api.get_all_proxies()
    
    # 状态分析
    offline_proxies = [p for p in proxies if p['status'] != 'active']
    
    if len(offline_proxies) > 0:
        message = f"检测到 {len(offline_proxies)} 个不活动的代理:\n"
        for proxy in offline_proxies[:5]:  # 前5个
            message += f"- {proxy['ip_address']} ({proxy['country']})\n"
        
        slack.send_alert(
            '不活动的代理',
            message,
            level='warning'
        )
    
    if balance['traffic_remaining_gb'] < 5:
        slack.send_alert(
            '流量极少!',
            f"剩余 {balance['traffic_remaining_gb']:.2f} GB. "
            f"余额: ${balance['balance']:.2f}",
            level='critical'
        )

check_proxy_health()

设置自动充值

一些提供商允许通过API不仅获取统计信息,还可以在达到阈值时自动充值:

def auto_topup_if_needed(threshold_gb=10, topup_amount_usd=100):
    """在余额低时自动充值"""
    api = ProxyAPIClient()
    
    balance = api.get_balance()
    
    if balance['traffic_remaining_gb'] < threshold_gb:
        print(f"流量低于阈值 ({threshold_gb} GB),启动充值...")
        
        # 通过API请求充值
        topup_response = requests.post(
            f'{api.base_url}/billing/topup',
            headers={'Authorization': f'Bearer {api.api_key}'},
            json={
                'amount': topup_amount_usd,
                'payment_method': 'card',
                'auto_topup': True
            },
            timeout=10
        )
        
        if topup_response.status_code == 200:
            print(f"✅ 余额充值 ${topup_amount_usd}")
            
            # 发送通知
            notifier = TelegramNotifier()
            notifier.send_message(
                f"💳 自动充值\n\n"
                f"金额: ${topup_amount_usd}\n"
                f"之前的流量: {balance['traffic_remaining_gb']:.2f} GB\n"
                f"原因: 达到阈值 {threshold_gb} GB"
            )
        else:
            print(f"❌ 充值错误: {topup_response.text}")
    else:
        print(f"流量充足: {balance['traffic_remaining_gb']:.2f} GB")

# 启动检查(可以添加到cron中)
auto_topup_if_needed()

创建自己的统计仪表板

为了可视化API中的数据,可以创建一个带有流量消耗、按国家和代理类型分配的图表的Web仪表板。我们来看一个简单的Flask + Chart.js示例。

Flask后端

from flask import Flask, jsonify, render_template
from datetime import datetime, timedelta
import requests

app = Flask(__name__)

class ProxyDashboard:
    def __init__(self):
        self.api_key = os.getenv('PROXY_API_KEY')
        self.base_url = 'https://api.proxyprovider.com/v1'
    
    def get_dashboard_data(self):
        """获取仪表板的所有数据"""
        headers = {'Authorization': f'Bearer {self.api_key}'}
        
        # 余额和总体统计
        balance = requests.get(
            f'{self.base_url}/balance',
            headers=headers
        ).json()
        
        # 最近30天的历史
        end_date = datetime.now()
        start_date = end_date - timedelta(days=30)
        
        history = requests.get(
            f'{self.base_url}/traffic/history',
            headers=headers,
            params={
                'from': start_date.strftime('%Y-%m-%d'),
                'to': end_date.strftime('%Y-%m-%d'),
                'group_by': 'day'
            }
        ).json()
        
        # 代理列表
        proxies = requests.get(
            f'{self.base_url}/proxies/list',
            headers=headers
        ).json()
        
        # 数据聚合
        traffic_by_country = {}
        traffic_by_type = {}
        
        for proxy in proxies.get('proxies', []):
            country = proxy.get('country', 'Unknown')
            proxy_type = proxy.get('proxy_type', 'Unknown')
            traffic = proxy.get('traffic_used_mb', 0) / 1024  # 转换为GB
            
            traffic_by_country[country] = traffic_by_country.get(country, 0) + traffic
            traffic_by_type[proxy_type] = traffic_by_type.get(proxy_type, 0) + traffic
        
        return {
            'balance': balance,
            'history': history.get('daily_stats', []),
            'traffic_by_country': traffic_by_country,
            'traffic_by_type': traffic_by_type,
            'total_proxies': len(proxies.get('proxies', []))
        }

dashboard = ProxyDashboard()

@app.route('/')
def index():
    """仪表板主页"""
    return render_template('dashboard.html')

@app.route('/api/stats')
def get_stats():
    """获取统计信息的API端点"""
    try:
        data = dashboard.get_dashboard_data()
        return jsonify(data)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, port=5000)

带图表的前端(dashboard.html)

<!DOCTYPE html>
<html>
<head>
  <title>代理统计仪表板</title>
  <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
  <style>
    body { font-family: Arial, sans-serif; padding: 20px; background: #f5f5f5; }
    .stats-card { background: white; padding: 20px; margin: 10px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
    .stat-value { font-size: 2rem; font-weight: bold; color: #2563eb; }
    .chart-container { height: 300px; margin: 20px 0; }
  </style>
</head>
<body>
  <h1>📊 代理统计仪表板</h1>
  
  <div style="display: grid; grid-template-columns: repeat(4, 1fr); gap: 10px;">
    <div class="stats-card">
      <div>余额</div>
      <div class="stat-value" id="balance">-</div>
    </div>
    <div class="stats-card">
      <div>剩余流量</div>
      <div class="stat-value" id="traffic-remaining">-</div>
    </div>
    <div class="stats-card">
      <div>已使用</div>
      <div class="stat-value" id="traffic-used">-</div>
    </div>
    <div class="stats-card">
      <div>活动代理</div>
      <div class="stat-value" id="active-proxies">-</div>
    </div>
  </div>

  <div class="chart-container">
    <canvas id="trafficChart"></canvas>
  </div>

  <script>
    async function fetchStats() {
      const response = await fetch('/api/stats');
      const data = await response.json();
      document.getElementById('balance').innerText = data.balance.balance.toFixed(2);
      document.getElementById('traffic-remaining').innerText = data.balance.traffic_remaining_gb.toFixed(2);
      document.getElementById('traffic-used').innerText = data.balance.traffic_used_gb.toFixed(2);
      document.getElementById('active-proxies').innerText = data.balance.active_proxies;

      const labels = data.history.map(item => item.date);
      const trafficData = data.history.map(item => item.traffic_gb);

      const ctx = document.getElementById('trafficChart').getContext('2d');
      new Chart(ctx, {
        type: 'line',
        data: {
          labels: labels,
          datasets: [{
            label: '每日流量使用 (GB)',
            data: trafficData,
            borderColor: 'rgba(38, 99, 235, 1)',
            backgroundColor: 'rgba(38, 99, 235, 0.2)',
            fill: true
          }]
        },
        options: {
          responsive: true,
          scales: {
            y: {
              beginAtZero: true
            }
          }
        }
      });
    }

    fetchStats();
  </script>
</body>
</html>
```