Files
ansible/docs/runbooks/arr-cleanup/verify.py
Tuan-Dat Tran 5b44c46e10 docs(arr-cleanup): improve runbook and fix api key paths
Rewrites findings.md with how-to section, cleaner summary tables,
and more detailed per-pass results. Fixes relative path for
sonarr/radarr API key files after runbook moved deeper in repo.
2026-04-27 21:39:28 +02:00

247 lines
8.1 KiB
Python

#!/usr/bin/env python3
"""
Cross-reference /media/downloads/sonarr and /media/downloads/radarr against
the Sonarr/Radarr APIs, then verify reported file paths actually exist on disk.
Requirements:
- kubectl port-forwards active:
kubectl -n arr-stack port-forward svc/sonarr 8989:8989
kubectl -n arr-stack port-forward svc/radarr 7878:7878
- SSH access to aya01
- API keys in ../../../../sonarr.api.env and ../../../../radarr.api.env
Output:
/tmp/arr_verified.json — full structured results for use by cleanup.py
"""
import json
import os
import re
import shlex
import subprocess
import sys
import urllib.request
# Base URLs for the arr v3 APIs, reachable through the kubectl
# port-forwards described in the module docstring.
SONARR_URL = "http://localhost:8989/api/v3"
RADARR_URL = "http://localhost:7878/api/v3"
# SSH alias of the media host whose filesystem is verified.
SSH_HOST = "aya01"
# Directory containing this script; API key files are resolved relative to it.
script_dir = os.path.dirname(os.path.abspath(__file__))
def load_key(filename):
    """Read an API key file and return its contents, whitespace-stripped.

    The runbook lives four directories below the repo root, so *filename*
    is looked up at ../../../../<filename> relative to this script.
    """
    path = os.path.join(script_dir, '../../../..', filename)
    # Use a context manager so the handle is closed deterministically
    # (the original relied on garbage collection to close it).
    with open(path) as f:
        return f.read().strip()
# API keys live at the repo root, four levels above this runbook directory.
SONARR_KEY = load_key('sonarr.api.env')
RADARR_KEY = load_key('radarr.api.env')
def api_get(url):
    """GET *url* (30 s timeout) and return the parsed JSON body."""
    with urllib.request.urlopen(url, timeout=30) as resp:
        body = resp.read()
    return json.loads(body)
def norm(s):
    """Collapse *s* to its lowercase ASCII alphanumeric characters only."""
    keep = set('abcdefghijklmnopqrstuvwxyz0123456789')
    return ''.join(ch for ch in s.lower() if ch in keep)
def extract_title(name, is_movie):
    """Strip release tags from a download name to recover a bare title.

    Drops the container extension and any [bracketed] group tags, then cuts
    everything from the year (movies) or season/episode marker (series)
    onward, and finally normalises dot/dash/underscore runs to spaces.
    """
    stripped = re.sub(r'\.(mkv|mp4|avi|m4v)$', '', name, flags=re.IGNORECASE)
    stripped = re.sub(r'\[.*?\]', '', stripped)
    tail = (r'[\.\s_\-]?(19|20)\d{2}.*$' if is_movie
            else r'[\.\s_\-]?[Ss]\d{1,2}([Ee]\d{1,2})?.*$')
    stripped = re.sub(tail, '', stripped)
    return re.sub(r'[\.\-_]+', ' ', stripped).strip()
def build_index(records, key_fn):
    """Map every truthy key produced by ``key_fn(record)`` to its record.

    On key collisions the record seen later in *records* wins.
    """
    return {key: rec for rec in records for key in key_fn(rec) if key}
def find_match(dl_name, idx, is_movie):
    """Find the library record matching a download directory name.

    Tries an exact normalised-title lookup first, then falls back to a
    prefix match in either direction; keys of 5 characters or fewer are
    skipped to avoid spurious prefix hits. Returns the record or None.
    """
    wanted = norm(extract_title(dl_name, is_movie))
    if wanted in idx:
        return idx[wanted]
    for key, record in idx.items():
        if not key or len(key) <= 5:
            continue
        if wanted.startswith(key) or key.startswith(wanted):
            return record
    return None
def ssh_check_paths(paths):
    """Return ``(existing, missing)`` path sets, checked in one SSH call.

    Builds a single bash script that probes every path with ``[ -e ... ]``
    and tags each result, so only one round trip to SSH_HOST is needed.

    paths: iterable of absolute paths on the remote host.
    Returns a tuple of two sets of paths.
    Raises RuntimeError if the ssh invocation itself fails.
    """
    if not paths:
        return set(), set()
    # shlex.quote, not json.dumps: a double-quoted bash string would still
    # expand $, backticks and backslashes inside the path.
    cmds = '\n'.join(
        f'[ -e {shlex.quote(p)} ] && echo "EXISTS:{p}" || echo "MISSING:{p}"'
        for p in paths
    )
    r = subprocess.run(['ssh', SSH_HOST, 'bash', '-s'],
                       input=cmds, capture_output=True, text=True)
    if r.returncode != 0:
        # Fail loudly rather than silently classifying every path as missing.
        raise RuntimeError(f"ssh to {SSH_HOST} failed: {r.stderr.strip()}")
    existing, missing = set(), set()
    for line in r.stdout.splitlines():
        if line.startswith('EXISTS:'):
            existing.add(line[len('EXISTS:'):])
        elif line.startswith('MISSING:'):
            missing.add(line[len('MISSING:'):])
    return existing, missing
def main():
    """Cross-reference downloads with the arr APIs and verify paths on disk.

    Fetches the full movie/series lists, matches each download directory to
    a library record, batch-checks the reported paths over SSH, writes the
    structured results to /tmp/arr_verified.json (consumed by cleanup.py)
    and prints a human-readable summary.
    """
    print("Fetching Radarr movies...")
    radarr_movies = api_get(f"{RADARR_URL}/movie?apikey={RADARR_KEY}")
    print(f" {len(radarr_movies)} movies")
    print("Fetching Sonarr series...")
    sonarr_series = api_get(f"{SONARR_URL}/series?apikey={SONARR_KEY}")
    print(f" {len(sonarr_series)} series")

    # Radarr index: normalised title, with and without the release year.
    def radarr_keys(m):
        return [norm(m['title']), norm(f"{m['title']}{m.get('year','')}")]
    radarr_idx = build_index(radarr_movies, radarr_keys)

    # Enrich radarr records with host-side disk paths. Radarr reports
    # /movies/... inside its pod; on the host that is /media/movies/....
    for m in radarr_movies:
        mf = m.get('movieFile')
        m['_file_path'] = (
            mf['path'].replace('/movies/', '/media/movies/', 1) if mf and mf.get('path') else None
        )
        m['_dir_path'] = m.get('path', '').replace('/movies/', '/media/movies/', 1)

    # Sonarr index: normalised title only.
    def sonarr_keys(s):
        return [norm(s['title'])]
    sonarr_idx = build_index(sonarr_series, sonarr_keys)
    for s in sonarr_series:
        # Sonarr reports /tv/...; on the host that is /media/series/....
        s['_dir_path'] = s.get('path', '').replace('/tv/', '/media/series/', 1)

    # Download listings: one SSH call covers both apps.
    print(f"\nFetching download listings from {SSH_HOST}...")
    r = subprocess.run(
        ['ssh', SSH_HOST, 'ls /media/downloads/sonarr/ && echo "===RADARR===" && ls /media/downloads/radarr/'],
        capture_output=True, text=True
    )
    parts = r.stdout.split('===RADARR===\n')
    if len(parts) < 2:
        # Previously an IndexError: if ssh or either ls fails the marker
        # never appears; exit with a clear message instead.
        sys.exit(f"Could not list downloads on {SSH_HOST}: {r.stderr.strip()}")
    sonarr_dls = [l.strip() for l in parts[0].splitlines() if l.strip()]
    radarr_dls = [l.strip() for l in parts[1].splitlines() if l.strip()]
    print(f" Sonarr downloads: {len(sonarr_dls)}")
    print(f" Radarr downloads: {len(radarr_dls)}")

    # Match each download to a library record and collect paths to verify.
    radarr_matched, radarr_orphans = [], []
    for dl in radarr_dls:
        rec = find_match(dl, radarr_idx, is_movie=True)
        if rec is None:
            radarr_orphans.append(dl)
        else:
            # Prefer the imported file path; fall back to the movie dir.
            check_path = rec['_file_path'] or rec['_dir_path']
            radarr_matched.append({
                'dl': dl,
                'title': rec['title'],
                'year': rec.get('year'),
                'hasFile': rec.get('hasFile', False),
                'monitored': rec.get('monitored'),
                'check_path': check_path,
            })
    sonarr_matched, sonarr_orphans = [], []
    for dl in sonarr_dls:
        rec = find_match(dl, sonarr_idx, is_movie=False)
        if rec is None:
            sonarr_orphans.append(dl)
        else:
            stats = rec.get('statistics', {})
            sonarr_matched.append({
                'dl': dl,
                'title': rec['title'],
                'episodeFileCount': stats.get('episodeFileCount', 0),
                'totalEpisodeCount': stats.get('totalEpisodeCount', 0),
                'percentOfEpisodes': stats.get('percentOfEpisodes', 0),
                'monitored': rec.get('monitored'),
                # BUG FIX: the API's series status (e.g. "ended") was stored
                # under 'status' and then silently clobbered by the
                # classification pass below; keep it under its own key.
                'series_status': rec.get('status'),
                'check_path': rec['_dir_path'],
            })

    # Batch disk verification — one SSH round trip for all unique paths.
    all_paths = list(set(
        [m['check_path'] for m in radarr_matched if m['check_path']] +
        [m['check_path'] for m in sonarr_matched if m['check_path']]
    ))
    print(f"\nVerifying {len(all_paths)} paths on disk...")
    existing, missing = ssh_check_paths(all_paths)
    print(f" {len(existing)} exist, {len(missing)} missing")

    # Classification:
    #   not_imported — the arr app has no file on record for this match
    #   safe         — the arr-reported path exists on disk
    #   path_missing — a file is claimed but the path is gone (review!)
    def classify_radarr(m):
        if not m['hasFile'] or not m['check_path']:
            return 'not_imported'
        if m['check_path'] in existing:
            return 'safe'
        return 'path_missing'

    def classify_sonarr(m):
        if m['episodeFileCount'] == 0 or not m['check_path']:
            return 'not_imported'
        if m['check_path'] in existing:
            return 'safe'
        return 'path_missing'

    for m in radarr_matched:
        m['status'] = classify_radarr(m)
    for m in sonarr_matched:
        m['status'] = classify_sonarr(m)

    # Full structured results for cleanup.py.
    result = {
        'radarr_matched': radarr_matched,
        'radarr_orphans': radarr_orphans,
        'sonarr_matched': sonarr_matched,
        'sonarr_orphans': sonarr_orphans,
        'existing_paths': list(existing),
        'missing_paths': list(missing),
    }
    out_path = '/tmp/arr_verified.json'
    with open(out_path, 'w') as f:
        json.dump(result, f, indent=2)
    print(f"\nResults written to {out_path}")

    # Human-readable summary.
    r_safe = [m for m in radarr_matched if m['status'] == 'safe']
    r_miss = [m for m in radarr_matched if m['status'] == 'path_missing']
    r_noimp = [m for m in radarr_matched if m['status'] == 'not_imported']
    s_safe = [m for m in sonarr_matched if m['status'] == 'safe']
    s_miss = [m for m in sonarr_matched if m['status'] == 'path_missing']
    s_noimp = [m for m in sonarr_matched if m['status'] == 'not_imported']
    print("\n" + "="*60)
    print("SUMMARY")
    print("="*60)
    print(f"Radarr: {len(r_safe)} safe | {len(r_miss)} path missing | {len(r_noimp)} not imported | {len(radarr_orphans)} orphans")
    print(f"Sonarr: {len(s_safe)} safe | {len(s_miss)} path missing | {len(s_noimp)} not imported | {len(sonarr_orphans)} orphans")
    if r_miss:
        print("\nRadarr path_missing (review manually):")
        for m in r_miss:
            # ' -> ' separator added: title and path previously ran together.
            print(f" {m['title']} -> {m['check_path']}")
            print(f" DL: {m['dl']}")
    if s_miss:
        print("\nSonarr path_missing (review manually):")
        for m in s_miss:
            print(f" {m['title']} -> {m['check_path']}")
            print(f" DL: {m['dl']}")
# Allow importing this module (e.g. from cleanup.py) without side effects.
if __name__ == '__main__':
    main()