from datetime import datetime from typing import List, Optional from sqlalchemy.orm import Session from models import File, Image, IgnoreRule, IgnoreType, Project class IgnoreManager: def __init__(self, db: Session): self.db = db def add_ignore_rule( self, ignore_type: IgnoreType, target: str, reason: Optional[str] = None, created_by: Optional[str] = None, project_id: Optional[int] = None, ) -> IgnoreRule: existing = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == ignore_type.value, IgnoreRule.target == target, IgnoreRule.project_id == project_id, IgnoreRule.is_active == True, ).first() if existing: return existing rule = IgnoreRule( ignore_type=ignore_type.value, target=target, reason=reason, created_by=created_by, project_id=project_id, ) self.db.add(rule) self.db.commit() self.db.refresh(rule) return rule def remove_ignore_rule(self, rule_id: int) -> bool: rule = self.db.query(IgnoreRule).filter( IgnoreRule.id == rule_id, IgnoreRule.is_active == True, ).first() if not rule: return False rule.is_active = False rule.updated_at = datetime.utcnow() self.db.commit() return True def get_ignore_rules( self, ignore_type: Optional[IgnoreType] = None, project_id: Optional[int] = None, ) -> List[IgnoreRule]: query = self.db.query(IgnoreRule).filter(IgnoreRule.is_active == True) if ignore_type: query = query.filter(IgnoreRule.ignore_type == ignore_type.value) if project_id: query = query.filter(IgnoreRule.project_id == project_id) return query.all() def is_project_ignored(self, project_id: int) -> bool: return self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.PROJECT.value, IgnoreRule.project_id == project_id, IgnoreRule.is_active == True, ).first() is not None def is_file_ignored(self, project_id: int, file_path: str) -> bool: return self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.FILE.value, IgnoreRule.project_id == project_id, IgnoreRule.target == file_path, IgnoreRule.is_active == True, ).first() is not None def is_image_ignored(self, image_name: str, project_id: Optional[int] = None) -> bool: query = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.IMAGE.value, IgnoreRule.target == image_name, IgnoreRule.is_active == True, ) if project_id: query = query.filter( (IgnoreRule.project_id == project_id) | (IgnoreRule.project_id.is_(None)) ) else: query = query.filter(IgnoreRule.project_id.is_(None)) return query.first() is not None def should_scan_project(self, project_id: int) -> bool: return not self.is_project_ignored(project_id) def should_scan_file(self, project_id: int, file_path: str) -> bool: if self.is_project_ignored(project_id): return False return not self.is_file_ignored(project_id, file_path) def should_scan_image(self, image_name: str, project_id: Optional[int] = None) -> bool: if project_id and self.is_project_ignored(project_id): return False return not self.is_image_ignored(image_name, project_id) def get_ignored_projects(self) -> List[Project]: ignored_rules = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.PROJECT.value, IgnoreRule.is_active == True, ).all() project_ids = [rule.project_id for rule in ignored_rules if rule.project_id] return self.db.query(Project).filter(Project.id.in_(project_ids)).all() def get_ignored_files(self, project_id: Optional[int] = None) -> List[IgnoreRule]: query = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.FILE.value, IgnoreRule.is_active == True, ) if project_id: query = query.filter(IgnoreRule.project_id == project_id) return query.all() def get_ignored_images(self, project_id: Optional[int] = None) -> List[IgnoreRule]: query = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.IMAGE.value, IgnoreRule.is_active == True, ) if project_id: query = query.filter( (IgnoreRule.project_id == project_id) | (IgnoreRule.project_id.is_(None)) ) return query.all() def bulk_ignore_images( self, image_names: List[str], reason: Optional[str] = None, created_by: Optional[str] = None, project_id: Optional[int] = None, ) -> List[IgnoreRule]: rules = [] for image_name in image_names: rule = self.add_ignore_rule( ignore_type=IgnoreType.IMAGE, target=image_name, reason=reason, created_by=created_by, project_id=project_id, ) rules.append(rule) return rules def bulk_ignore_files( self, file_paths: List[str], project_id: int, reason: Optional[str] = None, created_by: Optional[str] = None, ) -> List[IgnoreRule]: rules = [] for file_path in file_paths: rule = self.add_ignore_rule( ignore_type=IgnoreType.FILE, target=file_path, reason=reason, created_by=created_by, project_id=project_id, ) rules.append(rule) return rules def update_ignore_rule( self, rule_id: int, reason: Optional[str] = None, created_by: Optional[str] = None, ) -> Optional[IgnoreRule]: rule = self.db.query(IgnoreRule).filter( IgnoreRule.id == rule_id, IgnoreRule.is_active == True, ).first() if not rule: return None if reason is not None: rule.reason = reason if created_by is not None: rule.created_by = created_by rule.updated_at = datetime.utcnow() self.db.commit() self.db.refresh(rule) return rule def get_ignore_rule(self, rule_id: int) -> Optional[IgnoreRule]: return self.db.query(IgnoreRule).filter( IgnoreRule.id == rule_id, IgnoreRule.is_active == True, ).first() def cleanup_unused_rules(self) -> int: image_rules = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.IMAGE.value, IgnoreRule.is_active == True, ).all() removed_count = 0 for rule in image_rules: existing_image = self.db.query(Image).filter( Image.full_image_name == rule.target, Image.is_active == True, ).first() if not existing_image: rule.is_active = False rule.updated_at = datetime.utcnow() removed_count += 1 file_rules = self.db.query(IgnoreRule).filter( IgnoreRule.ignore_type == IgnoreType.FILE.value, IgnoreRule.is_active == True, ).all() for rule in file_rules: existing_file = self.db.query(File).filter( File.file_path == rule.target, File.project_id == rule.project_id, File.is_active == True, ).first() if not existing_file: rule.is_active = False rule.updated_at = datetime.utcnow() removed_count += 1 self.db.commit() return removed_count