#!/usr/bin/env python3 import yaml import os import sys import subprocess from pathlib import Path import tempfile import shutil import time def sync_repository(repo_config): """同步单个仓库""" name = repo_config.get('name', 'unnamed-repo') source_url = repo_config.get('source_url', '') target_url = repo_config.get('target_url', '') branch = repo_config.get('branch', 'master') if not source_url or not target_url: print(f"❌ [{name}] 错误: 缺少 source_url 或 target_url") return False # 替换环境变量 print(f"[DEBUG] ===== 变量替换开始 =====") print(f"[DEBUG] 替换前的 source_url: {source_url}") print(f"[DEBUG] 替换前的 target_url: {target_url}") print(f"[DEBUG] 当前环境变量:") # 检查环境变量是否已设置 env_vars = {} for var in ['UPSTREAM_USERNAME', 'UPSTREAM_TOKEN', 'TARGET_USERNAME', 'TARGET_TOKEN']: value = os.environ.get(var) env_vars[var] = value if value: print(f"[DEBUG] {var}: 已设置 (长度: {len(value)})") # 隐藏敏感信息(只显示前后几个字符) if 'TOKEN' in var and len(value) > 8: shown = value[:4] + '...' + value[-4:] else: shown = value print(f"[DEBUG] {var} 值: {shown}") else: print(f"[DEBUG] {var}: ⚠️ 未设置或为空!会导致替换失败") # 方法1:使用 os.path.expandvars() source_url_expanded = os.path.expandvars(source_url) target_url_expanded = os.path.expandvars(target_url) # 方法2:手动替换(备用方案) if '${' in source_url_expanded: print(f"[DEBUG] os.path.expandvars() 未完全替换,尝试手动替换...") for var, value in env_vars.items(): if value: source_url_expanded = source_url_expanded.replace(f"${{{var}}}", value) source_url_expanded = source_url_expanded.replace(f"${var}", value) if '${' in target_url_expanded: print(f"[DEBUG] os.path.expandvars() 未完全替换,尝试手动替换...") for var, value in env_vars.items(): if value: target_url_expanded = target_url_expanded.replace(f"${{{var}}}", value) target_url_expanded = target_url_expanded.replace(f"${var}", value) source_url = source_url_expanded target_url = target_url_expanded print(f"[DEBUG] 最终 source_url: {source_url}") print(f"[DEBUG] 最终 target_url: {target_url}") print(f"[DEBUG] ===== 变量替换结束 =====") # 检查必要的环境变量是否设置 if '${' in source_url: print(f"❌ [{name}] 错误: source_url 中的环境变量未完全解析: {source_url}") print(f"❌ 这可能是因为:") print(f" 1. 环境变量未传递给 Python 脚本") print(f" 2. 变量名拼写错误") print(f" 3. secrets 未在 Gitea 中正确配置") return False if '${' in target_url: print(f"❌ [{name}] 错误: target_url 中的环境变量未完全解析: {target_url}") return False print(f"\n{'='*60}") print(f"开始同步仓库: {name}") print(f"源地址: {source_url.split('@')[-1] if '@' in source_url else source_url}") print(f"目标地址: {target_url.split('@')[-1] if '@' in target_url else target_url}") print(f"分支: {branch}") print(f"{'='*60}") # 创建临时工作目录 work_dir = tempfile.mkdtemp(prefix=f"sync_{name}_") try: # 克隆目标仓库 print(f"\n[1/6] 克隆目标仓库...") result = subprocess.run( ['git', 'clone', target_url, work_dir], capture_output=True, text=True, timeout=3600 ) if result.returncode != 0: print(f"❌ 克隆目标仓库失败:") print(result.stderr) return False os.chdir(work_dir) # 添加上游远程 print(f"[2/6] 添加上游远程...") result = subprocess.run( ['git', 'remote', 'add', 'upstream', source_url], capture_output=True, text=True ) if result.returncode != 0: print(f"❌ 添加上游远程失败:") print(result.stderr) return False # 获取上游更改 print(f"[3/6] 获取上游更改...") max_retries = 3 for attempt in range(1, max_retries + 1): try: result = subprocess.run( ['git', 'fetch', 'upstream'], capture_output=True, text=True, timeout=3600 ) if result.returncode == 0: break print(f"❌ 获取上游更改失败 (尝试 {attempt}/{max_retries}):") print(result.stderr) except subprocess.TimeoutExpired: print(f"❌ 获取上游更改超时 (尝试 {attempt}/{max_retries}),200秒限制") if attempt < max_retries: print(f"⏳ 等待5秒后重试...") time.sleep(5) else: # 所有尝试都失败 print(f"\n网络诊断信息:") print(f"- 错误类型: DNS解析或网络连接失败") print(f"- 可能原因:") print(f" 1. Gitea Actions Runner的DNS配置问题") print(f" 2. Runner容器无法访问外部网络") print(f" 3. 上游仓库服务器防火墙限制") print(f"- 解决方案:") print(f" 1. 在workflow中添加hosts配置") print(f" 2. 检查Runner的网络设置") print(f" 3. 使用IP地址替代域名") return False # 检查目标分支是否存在 print(f"[4/6] 检查分支...") result = subprocess.run( ['git', 'branch', '-a'], capture_output=True, text=True ) remote_branch = f'upstream/{branch}' if remote_branch not in result.stdout: print(f"❌ 上游分支 {branch} 不存在!") print(f"可用的分支:") print(result.stdout) return False # 同步操作:重置到上游(确保完全同步) print(f"[5/6] 同步到上游分支 (使用 reset --hard)...") # 首先更新本地分支 subprocess.run(['git', 'fetch', 'origin'], capture_output=True) # 重置到上游状态 result = subprocess.run( ['git', 'reset', '--hard', f'upstream/{branch}'], capture_output=True, text=True ) if result.returncode != 0: print(f"❌ 同步失败:") print(result.stderr) return False # 推送到目标 print(f"[6/6] 推送到目标仓库...") result = subprocess.run( ['git', 'push', '--force', '--tags', 'origin', f'refs/heads/{branch}'], capture_output=True, text=True, timeout=3600 ) if result.returncode != 0: print(f"❌ 推送到目标仓库失败:") print(result.stderr) return False print(f"✅ [{name}] 同步成功!") return True except subprocess.TimeoutExpired: print(f"❌ [{name}] 操作超时!") return False except Exception as e: print(f"❌ [{name}] 发生错误: {e}") return False finally: # 返回原始目录并清理 os.chdir('/') if Path(work_dir).exists(): shutil.rmtree(work_dir, ignore_errors=True) def load_config(): """加载配置文件""" config_path = os.environ.get('CONFIG_FILE', 'repos.yaml') if not os.path.exists(config_path): print(f"错误: 配置文件 {config_path} 不存在!") sys.exit(1) try: with open(config_path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) if not isinstance(config, dict) or 'repositories' not in config: print(f"错误: 配置文件格式不正确,需要包含 'repositories' 键") sys.exit(1) repos = config['repositories'] if not isinstance(repos, list) or len(repos) == 0: print(f"警告: 配置文件中未找到仓库配置") return [] # 过滤掉注释掉的或空配置 valid_repos = [ repo for repo in repos if isinstance(repo, dict) and repo.get('name') and not repo.get('name', '').strip().startswith('#') and (repo.get('source_url') or '').strip() ] print(f"✓ 找到 {len(valid_repos)} 个仓库配置") return valid_repos except yaml.YAMLError as e: print(f"错误: 解析YAML配置文件失败: {e}") sys.exit(1) except Exception as e: print(f"错误: 读取配置文件失败: {e}") sys.exit(1) def main(): print("="*80) print("多仓库同步工具") print("="*80) print(f"\n开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") # 检查必要的环境变量 required_vars = ['UPSTREAM_USERNAME', 'UPSTREAM_TOKEN', 'TARGET_USERNAME', 'TARGET_TOKEN'] missing_vars = [var for var in required_vars if not os.environ.get(var)] if missing_vars: print(f"\n❌ 错误: 缺少必要的环境变量: {', '.join(missing_vars)}") print("请在 Gitea/Actions Secrets 中设置这些变量") sys.exit(1) print("✓ 所有必要环境变量已设置") # 加载配置 repos = load_config() if not repos: print("\n⚠️ 没有需要同步的仓库,退出") sys.exit(0) # 同步所有仓库 results = [] for i, repo in enumerate(repos, 1): print(f"\n[{i}/{len(repos)}] 正在同步...") success = sync_repository(repo) results.append((repo['name'], success)) # 在仓库之间等待一下,避免过于频繁的操作 if i < len(repos): time.sleep(5) # 总结报告 print("\n" + "="*80) print("同步完成报告") print("="*80) for name, success in results: status = "✅ 成功" if success else "❌ 失败" print(f"{status} {name}") successful = sum(1 for _, s in results if s) failed = len(results) - successful print(f"\n总计: {len(results)} 个仓库") print(f"成功: {successful} 个") print(f"失败: {failed} 个") if failed > 0: print("\n❌ 有仓库同步失败,请查看详细日志") sys.exit(1) else: print("\n✅ 所有仓库同步成功!") print(f"\n结束时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") if __name__ == '__main__': main()