Skip to content

Supabase MCP 数据库工作流

本指南介绍如何使用 Supabase MCP 构建智能化的数据库管理与工作流系统,让 ByteBuddy 能够自动执行数据库操作。

概述

Supabase MCP 与 Supabase 平台深度集成,使 ByteBuddy 能够:

  • 自动执行数据库迁移
  • 智能优化查询
  • 实时同步数据
  • 管理备份与恢复

配置

1. 安装 Supabase MCP

bash
npm install -g supabase-mcp-server

2. 配置 ByteBuddy

注意:下方 SUPABASE_KEY 使用的是 service role 密钥,该密钥会绕过行级安全(RLS)策略。请仅在受信任的服务端环境中配置,切勿提交到版本库或暴露给客户端。

json
{
  "mcpServers": {
    "supabase": {
      "command": "node",
      "args": ["supabase-mcp-server"],
      "env": {
        "SUPABASE_URL": "https://your-project.supabase.co",
        "SUPABASE_KEY": "your-service-role-key",
        "SUPABASE_PROJECT_ID": "your-project-id"
      }
    }
  }
}

使用场景

场景 1: 自动化数据库迁移

typescript
/**
 * Database migration manager.
 *
 * Runs a guarded migration: refuses to start while the database is locked,
 * builds an ordered plan from the pending migrations, takes a backup, executes
 * the plan one migration at a time, and restores the backup if any step fails.
 *
 * NOTE(review): `checkDatabaseStatus`, `getPendingMigrations`, `createBackup`,
 * `validateMigration`, `rollbackMigration`, `recordMigration` and the planning
 * helpers used by `createMigrationPlan` are not defined in this snippet, and
 * `mcp` is assumed to be an MCP client available in the enclosing scope —
 * confirm where these come from before using the class as-is.
 */
class DatabaseMigrationManager {
  /**
   * Executes all pending migrations in dependency order.
   *
   * @param migrationConfig Currently unused by this method; kept for callers.
   * @returns `{ success: true, migrations, backupId }` when every migration
   *   ran and validation passed.
   * @throws Error when the database is locked, when any single migration
   *   fails, or when validation fails. In the latter two cases the
   *   pre-migration backup is rolled back before the error is rethrown.
   */
  async runMigrations(migrationConfig: any) {
    // Refuse to start while the database reports itself as locked.
    const dbStatus = await this.checkDatabaseStatus();

    if (dbStatus.isLocked) {
      throw new Error("数据库正在维护中,无法执行迁移");
    }

    // Collect pending migrations and turn them into an ordered plan.
    const pendingMigrations = await this.getPendingMigrations();
    const migrationPlan = await this.createMigrationPlan(pendingMigrations);

    // Snapshot taken before anything is changed; used for rollback below.
    const backup = await this.createBackup("pre-migration");

    try {
      const results = [];
      for (const migration of migrationPlan.migrations) {
        const result = await this.executeMigration(migration);
        results.push(result);

        // executeMigration reports failure in its return value instead of
        // throwing. Stop at the first failure: later migrations were ordered
        // after this one, and throwing here is what triggers the rollback.
        if (!result.success) {
          throw new Error(`迁移 ${migration.id} 执行失败: ${result.error}`);
        }
      }

      // Post-run validation over the collected per-migration results.
      await this.validateMigration(results);

      return { success: true, migrations: results, backupId: backup.id };
    } catch (error) {
      // Restore the pre-migration backup, then surface the original error.
      await this.rollbackMigration(backup.id);
      throw error;
    }
  }

  /**
   * Runs one migration's SQL through `supabase.executeSql` and records the
   * outcome via `recordMigration`.
   *
   * @returns A result object; `success` is `false` (with `error` set) when the
   *   SQL call or the success-path recording throws. This method itself only
   *   rejects if recording the failure throws.
   */
  private async executeMigration(migration: any): Promise<{
    id: any;
    success: boolean;
    executionTime: number;
    affectedRows?: any;
    result?: any;
    error?: string;
  }> {
    const startTime = Date.now();

    try {
      // Execute the migration SQL; parameters default to an empty list.
      const result = await mcp.call("supabase.executeSql", {
        sql: migration.sql,
        parameters: migration.parameters || [],
      });

      // Record the successful run in the migration history.
      await this.recordMigration(migration, result, true);

      return {
        id: migration.id,
        success: true,
        executionTime: Date.now() - startTime,
        affectedRows: result.affectedRows,
        result: result.data,
      };
    } catch (error) {
      // Record the failure, passing the caught error as the result payload.
      await this.recordMigration(migration, error, false);

      return {
        id: migration.id,
        success: false,
        executionTime: Date.now() - startTime,
        // Caught values are not guaranteed to be Error instances.
        error: error instanceof Error ? error.message : String(error),
      };
    }
  }

  /**
   * Builds the execution plan: dependency graph, topological order, then a
   * validation pass over the ordered list.
   *
   * @returns The validated migrations plus the time estimate and risk level
   *   computed from them.
   */
  private async createMigrationPlan(migrations: any[]) {
    // Analyse dependencies between migrations.
    const dependencyGraph = this.buildDependencyGraph(migrations);

    // Order migrations so dependencies come first.
    const sortedMigrations = this.topologicalSort(dependencyGraph);

    // Validate the ordered migrations before they are executed.
    const validatedMigrations = await this.validateMigrations(sortedMigrations);

    return {
      migrations: validatedMigrations,
      estimatedTime: this.estimateExecutionTime(validatedMigrations),
      riskLevel: this.assessRiskLevel(validatedMigrations),
    };
  }
}

场景 2: 智能查询优化

typescript
/**
 * Query optimizer.
 *
 * Pulls slow queries from Supabase, inspects each execution plan for known
 * bottlenecks, derives an optimized variant, and applies the results.
 *
 * NOTE(review): `applyOptimizations`, `calculatePerformanceGain`,
 * `generateOptimizationSuggestions`, `estimateImprovement`, `createIndex`,
 * `rewriteJoin` and `addLimit` are not defined in this snippet; `mcp` is
 * assumed to be an MCP client available in the enclosing scope.
 */
class QueryOptimizer {
  /**
   * Optimizes every slow query reported for the given time range.
   *
   * @param timeRange Window passed to `supabase.getSlowQueries` (default "24h").
   * @returns Counts of slow queries found, optimizations produced and
   *   optimizations applied, plus the computed performance gain.
   */
  async optimizeSlowQueries(timeRange: string = "24h") {
    // Threshold is in milliseconds: queries slower than 1 s.
    const slowQueries = await mcp.call("supabase.getSlowQueries", {
      timeRange,
      threshold: 1000,
    });

    // Queries are processed one at a time, in the order they were reported.
    const proposals = [];
    for (const slowQuery of slowQueries) {
      proposals.push(await this.optimizeQuery(slowQuery));
    }

    const applied = await this.applyOptimizations(proposals);

    return {
      originalQueries: slowQueries.length,
      optimizations: proposals.length,
      applied: applied.length,
      performanceGain: this.calculatePerformanceGain(applied),
    };
  }

  /**
   * Analyses a single query: explain plan -> bottlenecks -> suggestions ->
   * optimized query.
   */
  private async optimizeQuery(query: any) {
    const { sql, parameters } = query;
    const plan = await mcp.call("supabase.explainQuery", { sql, parameters });

    const bottlenecks = this.identifyBottlenecks(plan);
    const suggestions = await this.generateOptimizationSuggestions(
      query,
      bottlenecks,
    );
    const optimized = await this.generateOptimizedQuery(query, suggestions);

    return {
      original: query,
      bottlenecks,
      suggestions,
      optimized,
      estimatedImprovement: this.estimateImprovement(query, optimized),
    };
  }

  /**
   * Checks the plan for three conditions, in this order: sequential scan,
   * reported missing indexes, and external sort.
   *
   * @returns One entry per condition that matched; empty when none did.
   */
  private identifyBottlenecks(executionPlan: any) {
    const findings = [];

    const isFullScan = executionPlan.scanType === "Seq Scan";
    if (isFullScan) {
      findings.push({
        type: "full_table_scan",
        table: executionPlan.tableName,
        suggestion: "添加适当的索引",
      });
    }

    const missing = executionPlan.missingIndexes;
    if (missing && missing.length > 0) {
      findings.push({
        type: "missing_index",
        columns: missing,
        suggestion: "创建复合索引",
      });
    }

    const spillsToDisk = executionPlan.sortMethod === "External Sort";
    if (spillsToDisk) {
      findings.push({
        type: "external_sort",
        suggestion: "优化排序字段或增加内存",
      });
    }

    return findings;
  }

  /**
   * Applies each suggestion in order. "add_index" creates the index as a side
   * effect; "rewrite_join" and "add_limit" rewrite the SQL text. Any other
   * suggestion type is skipped but still counted in `optimizations`.
   *
   * @returns The rewritten SQL, a copy of the original parameters, and the
   *   number of suggestions received.
   */
  private async generateOptimizedQuery(query: any, suggestions: any[]) {
    let sql = query.sql;
    // Copy so the caller's parameter array is never shared with the result.
    const parameters = [...(query.parameters || [])];

    for (const step of suggestions) {
      const kind = step.type;

      if (kind === "add_index") {
        await this.createIndex(step);
      } else if (kind === "rewrite_join") {
        sql = this.rewriteJoin(sql, step);
      } else if (kind === "add_limit") {
        sql = this.addLimit(sql, step);
      }
    }

    return {
      sql,
      parameters,
      optimizations: suggestions.length,
    };
  }
}

场景 3: 数据同步和备份

typescript
/**
 * Data sync manager.
 *
 * Creates a sync task, installs triggers, optionally subscribes to realtime
 * change events on the source table, and can create verified backups.
 *
 * NOTE(review): `createSyncTask`, `setupTriggers`, `syncInsert`, `syncUpdate`,
 * `syncDelete` and `verifyBackupIntegrity` are not defined in this snippet;
 * `mcp` is assumed to be an MCP client available in the enclosing scope.
 */
class DataSyncManager {
  /**
   * Sets up synchronization for the given configuration.
   *
   * @param syncConfig Sync settings; `realtime` toggles the change
   *   subscription and `sourceTable` names the table to watch.
   * @returns The sync task produced by `createSyncTask`.
   */
  async setupDataSync(syncConfig: any) {
    // Create the sync task.
    const syncTask = await this.createSyncTask(syncConfig);

    // Install triggers.
    await this.setupTriggers(syncConfig);

    // Realtime sync is opt-in.
    if (syncConfig.realtime) {
      await this.setupRealtimeSync(syncConfig);
    }

    return syncTask;
  }

  /**
   * Subscribes to every event type ("*") on `config.sourceTable` and forwards
   * each "change" payload to `handleChange`.
   *
   * @returns The subscription object returned by `supabase.subscribe`.
   */
  private async setupRealtimeSync(config: any) {
    const subscription = await mcp.call("supabase.subscribe", {
      table: config.sourceTable,
      event: "*",
    });

    subscription.on("change", (payload: any) =>
      // Nothing here awaits the handler's promise (presumably the emitter
      // does not either — TODO confirm), so a failed sync would otherwise
      // become an unhandled promise rejection. Catch and report it so one bad
      // event does not take down the process or stop later events.
      this.handleChange(payload, config).catch((error: unknown) => {
        console.error("处理数据变更事件失败:", error);
      }),
    );

    return subscription;
  }

  /**
   * Dispatches one change payload by `eventType`. INSERT, UPDATE and DELETE
   * are handled; any other event type is ignored.
   */
  private async handleChange(payload: any, config: any) {
    const { eventType, record, oldRecord } = payload;

    switch (eventType) {
      case "INSERT":
        await this.syncInsert(record, config);
        break;
      case "UPDATE":
        await this.syncUpdate(record, oldRecord, config);
        break;
      case "DELETE":
        await this.syncDelete(oldRecord, config);
        break;
    }
  }

  /**
   * Creates a gzip-compressed, encrypted backup and verifies its integrity.
   *
   * @param backupType Value forwarded as the backup `type`.
   * @returns The backup object returned by `supabase.createBackup`.
   * @throws Error when the integrity check reports the backup as invalid.
   */
  private async createBackup(backupType: string) {
    const backup = await mcp.call("supabase.createBackup", {
      type: backupType,
      compression: "gzip",
      encryption: true,
    });

    // Verify the backup before handing it back to the caller.
    const integrityCheck = await this.verifyBackupIntegrity(backup.id);

    if (!integrityCheck.valid) {
      throw new Error(`备份验证失败: ${integrityCheck.error}`);
    }

    return backup;
  }
}

监控和告警

1: 性能监控

typescript
/**
 * Performance monitor.
 *
 * Gathers connection, query, storage and performance metrics, scores overall
 * health, derives threshold alerts, and builds a monitoring report.
 *
 * NOTE(review): `getQueryMetrics`, `getStorageMetrics`,
 * `getPerformanceMetrics`, `calculateHealthScore` and
 * `createMonitoringReport` are not defined in this snippet; `mcp` is assumed
 * to be an MCP client available in the enclosing scope.
 */
class PerformanceMonitor {
  /**
   * Collects all metric groups and summarizes database health.
   *
   * @returns `{ metrics, healthScore, alerts, report }`.
   */
  async monitorDatabaseHealth() {
    // The four metric groups are independent, so fetch them concurrently
    // instead of paying for four sequential round trips.
    const [connections, queries, storage, perf] = await Promise.all([
      this.getConnectionMetrics(),
      this.getQueryMetrics(),
      this.getStorageMetrics(),
      this.getPerformanceMetrics(),
    ]);
    const metrics = { connections, queries, storage, performance: perf };

    // Compute the health score.
    const healthScore = this.calculateHealthScore(metrics);

    // Derive alerts from the metrics.
    const alerts = await this.generateAlerts(metrics);

    // Build the monitoring report.
    const report = await this.createMonitoringReport(metrics, healthScore);

    return { metrics, healthScore, alerts, report };
  }

  /** Fetches connection metrics for the last hour. */
  private async getConnectionMetrics() {
    return await mcp.call("supabase.getConnectionMetrics", {
      timeRange: "1h",
    });
  }

  /**
   * Produces alerts for three conditions: active connections above 80% of the
   * maximum, average query time above 1000 ms, and storage usage above 90% of
   * the limit.
   *
   * A connection or storage limit that is not a positive number is skipped:
   * the usage percentage would be a division by zero ("Infinity%").
   *
   * @returns The alerts that fired, in the order listed above.
   */
  private async generateAlerts(metrics: any) {
    const alerts = [];
    const { connections, performance: perf, storage } = metrics;

    // Connection usage alert.
    if (connections.max > 0 && connections.active > connections.max * 0.8) {
      alerts.push({
        type: "connection_limit",
        severity: "warning",
        message: `连接使用率达到 ${((connections.active / connections.max) * 100).toFixed(1)}%`,
        recommendation: "考虑增加连接池大小或优化查询",
      });
    }

    // Query latency alert.
    if (perf.avgQueryTime > 1000) {
      alerts.push({
        type: "slow_queries",
        severity: "warning",
        message: `平均查询时间: ${perf.avgQueryTime}ms`,
        recommendation: "检查慢查询日志并优化查询",
      });
    }

    // Storage usage alert.
    if (storage.limit > 0 && storage.usage > storage.limit * 0.9) {
      alerts.push({
        type: "storage_full",
        severity: "critical",
        message: `存储空间使用率 ${((storage.usage / storage.limit) * 100).toFixed(1)}%`,
        recommendation: "清理不必要的数据或升级存储计划",
      });
    }

    return alerts;
  }
}

2: 自动化维护

typescript
/**
 * Automated maintenance.
 *
 * Runs table optimization, statistics update, old-data cleanup and index
 * rebuild concurrently, then sends a report summarizing each task's outcome.
 *
 * NOTE(review): `updateStatistics`, `rebuildIndexes`, `getTableMetrics` and
 * `sendMaintenanceReport` are not defined in this snippet; `mcp` is assumed
 * to be an MCP client available in the enclosing scope.
 */
class AutoMaintenance {
  /**
   * Starts all maintenance tasks and waits for every one to settle.
   *
   * @returns A report with completed/failed counts and, per task, its name,
   *   settle status, and either its value or its rejection reason.
   */
  async scheduleMaintenance() {
    // Name and runner live together so a report entry can never be paired
    // with the wrong task when this list is edited.
    const tasks = [
      { name: "table_optimization", run: () => this.optimizeTables() },
      { name: "statistics_update", run: () => this.updateStatistics() },
      { name: "data_cleanup", run: () => this.cleanupOldData() },
      { name: "index_rebuild", run: () => this.rebuildIndexes() },
    ];

    // The async wrapper turns a synchronous throw from a task into a
    // rejection, so it is reported as a failed task instead of aborting the
    // whole run before allSettled is reached.
    const results = await Promise.allSettled(
      tasks.map(async (task) => task.run()),
    );

    const report = {
      completed: results.filter((r) => r.status === "fulfilled").length,
      failed: results.filter((r) => r.status === "rejected").length,
      details: results.map((r, i) => ({
        task: tasks[i].name,
        status: r.status,
        result: r.status === "fulfilled" ? r.value : r.reason,
      })),
    };

    // Send the maintenance report.
    await this.sendMaintenanceReport(report);

    return report;
  }

  /**
   * Optimizes every table returned by `supabase.getTables`, one at a time,
   * measuring table metrics before and after.
   *
   * @returns Per table: `sizeReduction` and `performanceGain`, both computed
   *   as before minus after, so a positive number means an improvement.
   */
  private async optimizeTables() {
    const tables = await mcp.call("supabase.getTables");

    const optimizationResults = [];

    for (const table of tables) {
      const beforeMetrics = await this.getTableMetrics(table.name);

      // Run the optimization.
      await mcp.call("supabase.optimizeTable", {
        tableName: table.name,
      });

      const afterMetrics = await this.getTableMetrics(table.name);

      optimizationResults.push({
        table: table.name,
        sizeReduction: beforeMetrics.size - afterMetrics.size,
        // A gain is scan time saved: before minus after, same direction as
        // sizeReduction.
        performanceGain: beforeMetrics.scanTime - afterMetrics.scanTime,
      });
    }

    return optimizationResults;
  }

  /**
   * Deletes logs older than 90 days and temp data older than 7 days.
   *
   * @returns Both deletion results plus the sum of their `size` fields; a
   *   missing result or `size` counts as 0.
   */
  private async cleanupOldData() {
    // Remove expired logs.
    const deletedLogs = await mcp.call("supabase.deleteOldLogs", {
      olderThan: "90d",
    });

    // Remove temporary data.
    const deletedTempData = await mcp.call("supabase.deleteTempData", {
      olderThan: "7d",
    });

    return {
      deletedLogs,
      deletedTempData,
      totalFreedSpace: (deletedLogs?.size ?? 0) + (deletedTempData?.size ?? 0),
    };
  }
}

CI/CD 集成

1: 数据库版本控制

typescript
/**
 * Database version control.
 *
 * Compares the live database schema with the schema definition stored in Git
 * and turns the differences into a migration script.
 *
 * NOTE(review): `getGitSchema`, `compareSchemas`, `createSchemaPR`,
 * `applySchemaChanges` and the three SQL generators are not defined in this
 * snippet; `mcp` is assumed to be an MCP client available in the enclosing
 * scope.
 */
class DatabaseVersionControl {
  /**
   * Syncs the database schema against a Git branch.
   *
   * When differences exist, a migration script is generated; it is sent
   * through `createSchemaPR` if any difference is flagged `breaking`, and
   * applied directly otherwise.
   *
   * @param repoPath Repository path forwarded to `getGitSchema`.
   * @param targetBranch Branch to read the schema from (default "main").
   * @returns The list of differences found.
   */
  async syncWithGit(repoPath: string, targetBranch: string = "main") {
    const liveSchema = await this.getCurrentSchema();
    const repoSchema = await this.getGitSchema(repoPath, targetBranch);

    const differences = await this.compareSchemas(liveSchema, repoSchema);

    // Nothing to migrate.
    if (!(differences.length > 0)) {
      return differences;
    }

    const migrationScript = await this.generateMigrationScript(differences);

    const hasBreakingChange = differences.some((d) => d.breaking);
    if (hasBreakingChange) {
      await this.createSchemaPR(differences, migrationScript);
    } else {
      await this.applySchemaChanges(migrationScript);
    }

    return differences;
  }

  /** Reads tables, views, functions, indexes and constraints from Supabase. */
  private async getCurrentSchema() {
    const include = ["tables", "views", "functions", "indexes", "constraints"];
    return await mcp.call("supabase.getSchema", { include });
  }

  /**
   * Builds the migration script: a two-line comment header followed by the
   * SQL generated for each "table_added", "column_added" and "index_added"
   * difference, in input order. Other difference types add nothing.
   */
  private async generateMigrationScript(differences: any[]) {
    const generatedAt = new Date().toISOString();
    let script = "-- 自动生成的迁移脚本\n";
    script = script + "-- 生成时间: " + generatedAt + "\n\n";

    for (const diff of differences) {
      const kind = diff.type;

      if (kind === "table_added") {
        script = script + this.generateCreateTableSQL(diff.table);
      } else if (kind === "column_added") {
        script = script + this.generateAddColumnSQL(diff.column);
      } else if (kind === "index_added") {
        script = script + this.generateCreateIndexSQL(diff.index);
      }
      // ... other difference types
    }

    return script;
  }
}

最佳实践

1: 安全配置

typescript
/**
 * Security configurator.
 *
 * Applies the security-related parts of a configuration object in a fixed
 * order: row-level security, user permissions, rate limiting, audit logging.
 *
 * NOTE(review): `setupUserPermissions`, `setupRateLimiting` and
 * `setupAuditLogging` are not defined in this snippet; `mcp` is assumed to be
 * an MCP client available in the enclosing scope.
 */
class SecurityConfigurator {
  /**
   * Runs every security setup step sequentially; a failing step stops the
   * remaining ones.
   *
   * @param config Object providing `tables`, `users`, `rateLimit` and `audit`.
   */
  async setupSecurity(config: any) {
    // Each step reads its slice of the config only when it is about to run.
    const steps = [
      () => this.setupRowLevelSecurity(config.tables),
      () => this.setupUserPermissions(config.users),
      () => this.setupRateLimiting(config.rateLimit),
      () => this.setupAuditLogging(config.audit),
    ];

    for (const runStep of steps) {
      await runStep();
    }
  }

  /**
   * Enables RLS on each table, then creates that table's policies one by one
   * before moving on to the next table.
   */
  private async setupRowLevelSecurity(tables: any[]) {
    for (const { name: tableName, policies } of tables) {
      await mcp.call("supabase.enableRLS", { tableName });

      for (const policy of policies) {
        await mcp.call("supabase.createPolicy", { tableName, policy });
      }
    }
  }
}

通过 Supabase MCP,ByteBuddy 可以提供强大的数据库管理能力,让数据工作流更加智能和自动化。