Files
code-sync-project/sync_repos.py
F嘉阳-coder 059855c6e8 添加网络诊断和重试机制
- 在 sync_repos.py 中添加Git fetch重试机制(最多3次)
- 优化错误处理和超时管理
- 在 workflow中添加网络连通性诊断步骤
- 提供更详细的错误诊断信息

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-20 22:18:52 +08:00

216 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import yaml
import os
import sys
import subprocess
from pathlib import Path
import tempfile
import shutil
import time
def _mask_credentials(url):
    """Return *url* with everything up to the last '@' removed.

    Remote URLs may embed ``user:token@`` credentials; only the part after
    the last '@' is printed so secrets do not end up in the job log.
    """
    return url.split('@')[-1] if '@' in url else url


def _scrub(text, urls):
    """Replace verbatim occurrences of credential-bearing *urls* in *text*."""
    text = text or ''
    for url in urls:
        masked = _mask_credentials(url)
        if masked != url:
            text = text.replace(url, masked)
    return text


def _git(args, cwd=None, timeout=None):
    """Run ``git <args>`` in *cwd*, capturing stdout/stderr as text.

    Raises subprocess.TimeoutExpired when *timeout* (seconds) is exceeded.
    """
    return subprocess.run(
        ['git', *args],
        cwd=cwd, capture_output=True, text=True, timeout=timeout
    )


def _fetch_upstream(work_dir, urls, max_retries=3, timeout=200, retry_delay=5):
    """Run ``git fetch upstream`` with retries; return True on success.

    Both a non-zero exit status and a timeout count as a failed attempt.
    After the last failed attempt the network troubleshooting hints are
    printed and False is returned.
    """
    for attempt in range(1, max_retries + 1):
        try:
            result = _git(['fetch', 'upstream'], cwd=work_dir, timeout=timeout)
        except subprocess.TimeoutExpired:
            print(f"❌ 获取上游更改超时 (尝试 {attempt}/{max_retries}){timeout}秒限制")
        else:
            if result.returncode == 0:
                return True
            print(f"❌ 获取上游更改失败 (尝试 {attempt}/{max_retries}):")
            print(_scrub(result.stderr, urls))

        if attempt < max_retries:
            print(f"⏳ 等待{retry_delay}秒后重试...")
            time.sleep(retry_delay)

    # Every attempt failed.
    print(f"\n网络诊断信息:")
    print(f"- 错误类型: DNS解析或网络连接失败")
    print(f"- 可能原因:")
    print(f" 1. Gitea Actions Runner的DNS配置问题")
    print(f" 2. Runner容器无法访问外部网络")
    print(f" 3. 上游仓库服务器防火墙限制")
    print(f"- 解决方案:")
    print(f" 1. 在workflow中添加hosts配置")
    print(f" 2. 检查Runner的网络设置")
    print(f" 3. 使用IP地址替代域名")
    return False


def sync_repository(repo_config):
    """Force-sync one branch of the target repository to the upstream branch.

    Steps: bare-clone the target, add the source as remote ``upstream``,
    fetch it (with retries), point ``refs/heads/<branch>`` at
    ``refs/remotes/upstream/<branch>`` and force-push that branch plus tags
    to ``origin``.

    Args:
        repo_config: mapping read with ``.get``; keys used are ``name``
            (default ``'unnamed-repo'``), ``source_url``, ``target_url`` and
            ``branch`` (default ``'master'``). Environment variable
            references in the URLs are expanded with ``os.path.expandvars``.

    Returns:
        True when the push succeeded, False on any failure (missing URL,
        git error, timeout or unexpected exception). Nothing is raised for
        ``Exception`` subclasses; they are reported and turned into False.
    """
    name = repo_config.get('name', 'unnamed-repo')
    source_url = repo_config.get('source_url', '')
    target_url = repo_config.get('target_url', '')
    branch = repo_config.get('branch', 'master')

    if not source_url or not target_url:
        print(f"❌ [{name}] 错误: 缺少 source_url 或 target_url")
        return False

    # Expand environment variable references (typically the credentials).
    source_url = os.path.expandvars(source_url)
    target_url = os.path.expandvars(target_url)
    urls = (source_url, target_url)
    shown_source = _mask_credentials(source_url)
    shown_target = _mask_credentials(target_url)

    # A leftover '${' means a referenced variable was not set. Only the masked
    # URL is printed: the other variables may already have been expanded into
    # real credentials.
    if '${' in source_url:
        print(f"⚠️ [{name}] 警告: source_url 中的环境变量未完全解析: {shown_source}")
    if '${' in target_url:
        print(f"⚠️ [{name}] 警告: target_url 中的环境变量未完全解析: {shown_target}")

    print(f"\n{'='*60}")
    print(f"开始同步仓库: {name}")
    print(f"源地址: {shown_source}")
    print(f"目标地址: {shown_target}")
    print(f"分支: {branch}")
    print(f"{'='*60}")

    # The name goes into a directory prefix, so path separators and other
    # special characters are replaced to keep mkdtemp from failing.
    safe_name = ''.join(
        ch if ch.isalnum() or ch in '._-' else '_' for ch in str(name)
    )
    upstream_ref = f'refs/remotes/upstream/{branch}'
    work_dir = None
    try:
        work_dir = tempfile.mkdtemp(prefix=f"sync_{safe_name}_")

        print(f"\n[1/6] 克隆目标仓库...")
        result = _git(['clone', '--bare', target_url, work_dir], timeout=300)
        if result.returncode != 0:
            print(f"❌ 克隆目标仓库失败:")
            print(_scrub(result.stderr, urls))
            return False

        # All further commands run with cwd=work_dir instead of os.chdir, so
        # the working directory of the calling process is never changed.
        print(f"[2/6] 添加上游远程...")
        result = _git(['remote', 'add', 'upstream', source_url], cwd=work_dir)
        if result.returncode != 0:
            print(f"❌ 添加上游远程失败:")
            print(_scrub(result.stderr, urls))
            return False

        print(f"[3/6] 获取上游更改...")
        if not _fetch_upstream(work_dir, urls):
            return False

        # Exact ref lookup; a substring search in the branch listing would
        # also accept e.g. 'upstream/<branch>-old'.
        print(f"[4/6] 检查分支...")
        result = _git(
            ['show-ref', '--verify', '--quiet', upstream_ref], cwd=work_dir
        )
        if result.returncode != 0:
            print(f"❌ 上游分支 {branch} 不存在!")
            print(f"可用的分支:")
            print(_git(['branch', '-a'], cwd=work_dir).stdout)
            return False

        # The clone is bare, so there is no work tree for 'reset --hard';
        # moving the branch ref directly gives the same end state for the push.
        print(f"[5/6] 同步到上游分支 (使用 update-ref)...")
        result = _git(
            ['update-ref', f'refs/heads/{branch}', upstream_ref], cwd=work_dir
        )
        if result.returncode != 0:
            print(f"❌ 同步失败:")
            print(_scrub(result.stderr, urls))
            return False

        print(f"[6/6] 推送到目标仓库...")
        result = _git(
            ['push', '--force', '--tags', 'origin', f'refs/heads/{branch}'],
            cwd=work_dir, timeout=600
        )
        if result.returncode != 0:
            print(f"❌ 推送到目标仓库失败:")
            print(_scrub(result.stderr, urls))
            return False

        print(f"✅ [{name}] 同步成功!")
        return True
    except subprocess.TimeoutExpired:
        print(f"❌ [{name}] 操作超时!")
        return False
    except Exception as e:
        print(f"❌ [{name}] 发生错误: {e}")
        return False
    finally:
        # Always remove the temporary clone, whatever the outcome.
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
def load_config():
    """Load the list of repositories to sync from the YAML config file.

    The file path comes from the ``CONFIG_FILE`` environment variable and
    defaults to ``repos.yaml``. The document must be a mapping with a
    ``repositories`` key.

    Returns:
        The entries of ``repositories`` that are dicts with a non-empty
        ``name`` not starting with '#' and a non-blank ``source_url``.
        An empty list when ``repositories`` is not a non-empty list.

    Exits the process with status 1 when the file is missing, cannot be
    read or parsed, or lacks the ``repositories`` key.
    """
    cfg_file = os.environ.get('CONFIG_FILE', 'repos.yaml')
    if not os.path.exists(cfg_file):
        print(f"错误: 配置文件 {cfg_file} 不存在!")
        sys.exit(1)

    try:
        with open(cfg_file, 'r', encoding='utf-8') as handle:
            document = yaml.safe_load(handle)

        if not (isinstance(document, dict) and 'repositories' in document):
            print("错误: 配置文件格式不正确,需要包含 'repositories'")
            sys.exit(1)

        entries = document['repositories']
        if not (isinstance(entries, list) and entries):
            print("警告: 配置文件中未找到仓库配置")
            return []

        # Keep only usable entries: skip non-dicts, unnamed or '#'-prefixed
        # names, and entries whose source_url is missing or blank.
        selected = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            entry_name = entry.get('name')
            if not entry_name:
                continue
            if entry_name.strip().startswith('#'):
                continue
            if not (entry.get('source_url') or '').strip():
                continue
            selected.append(entry)

        print(f"✓ 找到 {len(selected)} 个仓库配置")
        return selected
    except yaml.YAMLError as err:
        print(f"错误: 解析YAML配置文件失败: {err}")
        sys.exit(1)
    except Exception as err:
        print(f"错误: 读取配置文件失败: {err}")
        sys.exit(1)
def main():
    """Sync every configured repository and print a summary report.

    Requires the environment variables UPSTREAM_USERNAME, UPSTREAM_TOKEN,
    TARGET_USERNAME and TARGET_TOKEN to be set and non-empty.

    Exits with status 1 when a required variable is missing or at least one
    repository failed to sync, and with status 0 when the configuration
    contains no repositories. Returns normally when every sync succeeded.
    """
    print("="*80)
    print("多仓库同步工具")
    print("="*80)
    print(f"\n开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Fail early if any required variable is unset or empty.
    required_vars = ['UPSTREAM_USERNAME', 'UPSTREAM_TOKEN', 'TARGET_USERNAME', 'TARGET_TOKEN']
    missing_vars = [var for var in required_vars if not os.environ.get(var)]
    if missing_vars:
        print(f"\n❌ 错误: 缺少必要的环境变量: {', '.join(missing_vars)}")
        print("请在 Gitea/Actions Secrets 中设置这些变量")
        sys.exit(1)
    print("✓ 所有必要环境变量已设置")

    repos = load_config()
    if not repos:
        print("\n⚠️ 没有需要同步的仓库,退出")
        sys.exit(0)

    # Sync the repositories one by one, recording (name, success) pairs.
    results = []
    for i, repo in enumerate(repos, 1):
        print(f"\n[{i}/{len(repos)}] 正在同步...")
        success = sync_repository(repo)
        results.append((repo['name'], success))
        # Pause between repositories to avoid hammering the servers.
        if i < len(repos):
            time.sleep(5)

    # Summary report.
    print("\n" + "="*80)
    print("同步完成报告")
    print("="*80)
    for name, success in results:
        status = "✅ 成功" if success else "❌ 失败"
        print(f"{status} {name}")

    successful = sum(1 for _, s in results if s)
    failed = len(results) - successful
    print(f"\n总计: {len(results)} 个仓库")
    print(f"成功: {successful}")
    print(f"失败: {failed}")

    if failed > 0:
        print("\n❌ 有仓库同步失败,请查看详细日志")
    else:
        print("\n✅ 所有仓库同步成功!")

    # Print the end time before exiting so it also appears on failed runs.
    print(f"\n结束时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    if failed > 0:
        sys.exit(1)
# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()