import re
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

import gitlab
import yaml
from sqlalchemy.orm import Session

from models import File, Image, Project, ScanJob, FileImageUsage


class DockerImageScanner:
    """Scan GitLab projects for Docker image references and persist them.

    The scanner walks the repository tree of each project on a fixed set of
    branches, classifies files as docker-compose files, Dockerfiles or
    GitLab CI files, extracts image references from their content and
    records projects, files, images and file/image usages through the
    SQLAlchemy session handed to each method.
    """

    # Flags applied to every pattern in ``docker_image_patterns``.
    _REGEX_FLAGS = re.IGNORECASE | re.MULTILINE

    # Matches the optional ``AS <stage>`` suffix that follows the image
    # reference on a Dockerfile ``FROM`` line.
    _STAGE_ALIAS_RE = re.compile(r"[ \t]+AS[ \t]+(\S+)", re.IGNORECASE)

    # (file type, marker) pairs checked in order against the lower-cased
    # file name; the first marker contained in the name wins.
    _FILE_TYPE_MARKERS = (
        ("docker-compose", "docker-compose"),
        ("dockerfile", "dockerfile"),
        ("gitlab-ci", ".gitlab-ci"),
    )

    def __init__(
        self,
        gitlab_token: str,
        gitlab_url: str = "https://gitlab.com",
        gitlab_groups: Optional[List[str]] = None,
    ):
        """Create a scanner bound to one GitLab instance.

        Args:
            gitlab_token: Private token used to authenticate against GitLab.
            gitlab_url: Base URL of the GitLab instance.
            gitlab_groups: Optional group IDs or paths/names. When given,
                only projects of these groups (including subgroups) are
                scanned; otherwise every listed project is scanned.
        """
        self.gl = gitlab.Gitlab(gitlab_url, private_token=gitlab_token)
        self.gitlab_groups = gitlab_groups or []

        # ``image: <ref>`` as a YAML key at the start of a line (optionally
        # a list item). Horizontal whitespace only, so the mapping form
        # ``image:`` + newline + ``name: ...`` is not captured as "name:".
        image_key_pattern = (
            r"^[ \t]*(?:-[ \t]+)?image:[ \t]*[\"']?([^\"'\s#]+)[\"']?"
        )
        # ``FROM <ref>`` at the start of a line; leading ``--flag=value``
        # options such as ``--platform=...`` are skipped, not captured.
        from_pattern = r"^[ \t]*FROM[ \t]+(?:--\S+[ \t]+)*(?!--)([^\s]+)"

        # Every pattern must expose the image reference as capture group 1.
        self.docker_image_patterns = {
            "docker-compose": [image_key_pattern, from_pattern],
            "dockerfile": [from_pattern],
            "gitlab-ci": [image_key_pattern, from_pattern],
        }
        self.target_branches = ["main", "master", "develop"]
        # Declarative list of file-name globs per file type. NOTE: within
        # this class it is not consulted by ``_get_file_type``, which
        # classifies by markers contained in the file name.
        self.target_files = {
            "docker-compose": [
                "docker-compose.yml",
                "docker-compose.yaml",
                "docker-compose.*.yml",
                "docker-compose.*.yaml",
            ],
            "dockerfile": [
                "Dockerfile",
                "Dockerfile.*",
                "*.dockerfile",
            ],
            "gitlab-ci": [
                ".gitlab-ci.yml",
                ".gitlab-ci.yaml",
                ".gitlab-ci.*.yml",
                ".gitlab-ci.*.yaml",
            ],
        }

    def scan_all_projects(self, db: Session) -> None:
        """Scan every project in scope and record the images it uses.

        When ``gitlab_groups`` is non-empty only the projects of those
        groups are scanned, otherwise all projects returned by the GitLab
        project listing. A failure in one project is printed and does not
        stop the remaining projects.

        Args:
            db: SQLAlchemy session used for all reads and writes.
        """
        if self.gitlab_groups:
            projects = self._collect_group_projects()
        else:
            # Scan all projects (original behavior)
            projects = self.gl.projects.list(all=True, simple=True)

        print(f"Scanning {len(projects)} projects...")

        for project_data in projects:
            try:
                self._scan_project(db, project_data)
            except Exception as e:
                # A failed flush/commit leaves the session unusable until it
                # is rolled back; reset it so the next project can proceed.
                db.rollback()
                print(f"Error scanning project {project_data.name}: {e}")
                continue

    def _collect_group_projects(self) -> list:
        """Return the projects of all configured groups, without duplicates.

        Groups that cannot be resolved or accessed are reported and skipped.
        """
        projects = []
        seen_ids = set()
        for group_identifier in self.gitlab_groups:
            try:
                group = self._resolve_group(group_identifier)
                if group is None:
                    print(f"Group not found: {group_identifier}")
                    continue

                # Get all projects in the group (including subgroups)
                group_projects = group.projects.list(all=True, include_subgroups=True)

                # Get full project details for each project to ensure we
                # have path_with_namespace
                full_projects = []
                for project_ref in group_projects:
                    try:
                        full_projects.append(self.gl.projects.get(project_ref.id))
                    except Exception as e:
                        print(f"Error getting full project details for {project_ref.id}: {e}")
                        # Fallback to the reference if we can't get full details
                        full_projects.append(project_ref)

                # A project can be reachable through several configured
                # groups (e.g. a parent and one of its subgroups); keep it
                # once so it is not scanned twice.
                for candidate in full_projects:
                    if candidate.id not in seen_ids:
                        seen_ids.add(candidate.id)
                        projects.append(candidate)
                print(f"Found {len(full_projects)} projects in group {group_identifier}")
            except Exception as e:
                print(f"Error accessing group {group_identifier}: {e}")
                continue
        return projects

    def _resolve_group(self, group_identifier):
        """Resolve a configured group identifier to a GitLab group.

        Integer-like identifiers are looked up by ID. Anything else is first
        tried as an exact group path and, if that lookup fails, searched
        for; among the search results an exact match on full path, path or
        name is preferred over the first hit.

        Returns:
            The group object, or ``None`` when the search finds nothing.
        """
        try:
            group_id = int(group_identifier)
        except ValueError:
            group_id = None
        if group_id is not None:
            return self.gl.groups.get(group_id)

        try:
            return self.gl.groups.get(group_identifier)
        except gitlab.exceptions.GitlabGetError:
            # Not a resolvable path; fall back to the search below.
            pass

        candidates = self.gl.groups.list(search=group_identifier)
        if not candidates:
            return None

        # Search is a fuzzy match, so the first hit may be a different
        # group whose name merely contains the identifier.
        wanted = group_identifier.strip("/").lower()
        for attribute in ("full_path", "path", "name"):
            for candidate in candidates:
                value = getattr(candidate, attribute, None)
                if isinstance(value, str) and value.lower() == wanted:
                    return candidate
        return candidates[0]

    def _scan_project(self, db: Session, project_data) -> None:
        """Scan all target branches of one project and stamp ``last_scanned``.

        Args:
            db: SQLAlchemy session.
            project_data: GitLab project object providing at least ``id``,
                ``name``, ``web_url`` and a path attribute.
        """
        project = self._get_or_create_project(db, project_data)
        gl_project = self.gl.projects.get(project_data.id)
        # Read the name up front: after a rollback the ORM object is expired
        # and attribute access would hit the database again.
        project_name = project.name

        for branch in self.target_branches:
            try:
                self._scan_project_branch(db, project, gl_project, branch)
            except Exception as e:
                db.rollback()
                print(f"Error scanning branch {branch} in project {project_name}: {e}")
                continue

        project.last_scanned = datetime.utcnow()
        db.commit()

    def _get_or_create_project(self, db: Session, project_data) -> Project:
        """Return the ``Project`` row for a GitLab project, creating it if absent.

        An existing row has its ``path`` updated when the GitLab path changed.
        """
        # Use path_with_namespace for full path, fallback to path if not
        # available. The fallback is only read when actually needed.
        full_path = getattr(project_data, "path_with_namespace", None)
        if full_path is None:
            full_path = project_data.path

        project = db.query(Project).filter(Project.gitlab_id == project_data.id).first()
        if not project:
            project = Project(
                gitlab_id=project_data.id,
                name=project_data.name,
                path=full_path,
                web_url=project_data.web_url,
            )
            db.add(project)
            db.commit()
            db.refresh(project)
        elif project.path != full_path:
            # Update existing project path if it has changed
            project.path = full_path
            db.commit()

        return project

    def _scan_project_branch(self, db: Session, project: Project, gl_project, branch: str) -> None:
        """Scan every recognised file on one branch of a project.

        If the repository tree cannot be listed for ``branch`` the branch is
        skipped silently. A failure in one file is printed and does not stop
        the remaining files.
        """
        try:
            files = gl_project.repository_tree(ref=branch, recursive=True, all=True)
        except Exception:
            # Best effort: the tree listing failed (presumably the branch
            # does not exist in this project), so there is nothing to scan.
            return

        for file_info in files:
            if file_info["type"] != "blob":
                continue

            file_type = self._get_file_type(file_info["path"])
            if not file_type:
                continue

            try:
                self._scan_file(db, project, gl_project, file_info, branch, file_type)
            except Exception as e:
                db.rollback()
                print(f"Error scanning file {file_info['path']}: {e}")
                continue

    def _get_file_type(self, file_path: str) -> Optional[str]:
        """Classify a repository path by its file name.

        Only the final path component is inspected (case-insensitively), so
        a directory name such as ``dockerfiles/`` does not decide the type
        of the files inside it.

        Returns:
            ``"docker-compose"``, ``"dockerfile"``, ``"gitlab-ci"`` or
            ``None`` when the file name contains none of the markers.
        """
        file_name = file_path.rsplit("/", 1)[-1].lower()
        for file_type, marker in self._FILE_TYPE_MARKERS:
            if marker in file_name:
                return file_type
        return None

    def _scan_file(
        self,
        db: Session,
        project: Project,
        gl_project,
        file_info: Dict,
        branch: str,
        file_type: str,
    ) -> None:
        """Fetch one file, extract its images and record their usage.

        The ``File`` row is created (if needed) before the content is
        fetched. If the content cannot be fetched or decoded as UTF-8 the
        error is printed and the file is left without a new scan timestamp.
        """
        file_obj = self._get_or_create_file(db, project, file_info["path"], branch, file_type)

        try:
            file_content = gl_project.files.get(file_info["path"], ref=branch)
            content = file_content.decode().decode("utf-8")
        except Exception as e:
            print(f"Error reading file {file_info['path']}: {e}")
            return

        for image_name in self._extract_images_from_content(content, file_type):
            image = self._get_or_create_image(db, image_name)
            self._create_or_update_file_image_usage(db, file_obj, image)

        file_obj.last_scanned = datetime.utcnow()
        db.commit()

    def _get_or_create_file(
        self,
        db: Session,
        project: Project,
        file_path: str,
        branch: str,
        file_type: str,
    ) -> File:
        """Return the ``File`` row for (project, path, branch), creating it if absent."""
        file_obj = (
            db.query(File)
            .filter(
                File.project_id == project.id,
                File.file_path == file_path,
                File.branch == branch,
            )
            .first()
        )

        if not file_obj:
            file_obj = File(
                project_id=project.id,
                file_path=file_path,
                branch=branch,
                file_type=file_type,
            )
            db.add(file_obj)
            db.commit()
            db.refresh(file_obj)

        return file_obj

    def _extract_images_from_content(self, content: str, file_type: str) -> Set[str]:
        """Dispatch to the extractor for ``file_type``.

        Returns:
            The set of image references found; empty for unknown file types.
        """
        if file_type == "docker-compose":
            return self._extract_from_docker_compose(content)
        if file_type == "dockerfile":
            return self._extract_from_dockerfile(content)
        if file_type == "gitlab-ci":
            return self._extract_from_gitlab_ci(content)
        return set()

    def _extract_from_docker_compose(self, content: str) -> Set[str]:
        """Extract image references from docker-compose file content."""
        return self._extract_from_yaml_content(content, "docker-compose")

    def _extract_from_gitlab_ci(self, content: str) -> Set[str]:
        """Extract image references from GitLab CI file content."""
        return self._extract_from_yaml_content(content, "gitlab-ci")

    def _extract_from_yaml_content(self, content: str, pattern_key: str) -> Set[str]:
        """Extract images from YAML content via parsing plus regex patterns.

        The content is parsed as (possibly multi-document) YAML and walked
        for ``image`` keys; independently, the regex patterns registered
        under ``pattern_key`` are applied to the raw text so that files
        which do not parse still yield results.
        """
        images: Set[str] = set()

        try:
            for document in yaml.safe_load_all(content):
                if isinstance(document, dict):
                    self._extract_images_from_yaml(document, images)
        except Exception:
            # Deliberate best effort: the file may not be valid YAML (or may
            # not be walkable); the regex pass below still runs on the text.
            pass

        for pattern in self.docker_image_patterns[pattern_key]:
            images.update(re.findall(pattern, content, self._REGEX_FLAGS))

        return images

    def _extract_images_from_yaml(self, data: Dict, images: Set[str]) -> None:
        """Recursively collect ``image`` values from parsed YAML into ``images``.

        Handles both the string form (``image: ref``) and the mapping form
        (``image: {name: ref, ...}``). Blank values are ignored.

        Args:
            data: A parsed YAML node (mapping or sequence).
            images: Set that found image references are added to in place.
        """
        if isinstance(data, dict):
            for key, value in data.items():
                if key == "image":
                    if isinstance(value, str):
                        if value.strip():
                            images.add(value.strip())
                        continue
                    if isinstance(value, dict):
                        name = value.get("name")
                        if isinstance(name, str) and name.strip():
                            images.add(name.strip())
                if isinstance(value, (dict, list)):
                    self._extract_images_from_yaml(value, images)
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, (dict, list)):
                    self._extract_images_from_yaml(item, images)

    def _extract_from_dockerfile(self, content: str) -> Set[str]:
        """Extract base image references from Dockerfile content.

        ``FROM`` lines are processed in order. A reference that equals the
        alias of an earlier build stage (``FROM x AS alias``) points at that
        stage rather than at an image and is therefore not reported.
        """
        images: Set[str] = set()

        for pattern in self.docker_image_patterns["dockerfile"]:
            stage_names: Set[str] = set()
            for match in re.finditer(pattern, content, self._REGEX_FLAGS):
                reference = match.group(1)
                if reference.lower() not in stage_names:
                    images.add(reference)
                # The alias, if any, directly follows the image reference.
                alias = self._STAGE_ALIAS_RE.match(content, match.end())
                if alias:
                    stage_names.add(alias.group(1).lower())

        return images

    def _get_or_create_image(self, db: Session, full_image_name: str) -> Image:
        """Return the ``Image`` row for a reference, creating it if absent.

        ``last_seen`` is refreshed on every call.
        """
        image = (
            db.query(Image)
            .filter(Image.full_image_name == full_image_name)
            .first()
        )

        if not image:
            image_parts = self._parse_image_name(full_image_name)
            image = Image(
                image_name=image_parts["name"],
                tag=image_parts["tag"],
                registry=image_parts["registry"],
                full_image_name=full_image_name,
            )
            db.add(image)
            db.commit()
            db.refresh(image)

        image.last_seen = datetime.utcnow()
        db.commit()

        return image

    def _create_or_update_file_image_usage(self, db: Session, file_obj: File, image: Image) -> FileImageUsage:
        """Return the usage row linking a file and an image, creating it if absent.

        The row is marked active and its ``last_seen`` is refreshed on every
        call.
        """
        usage = (
            db.query(FileImageUsage)
            .filter(
                FileImageUsage.file_id == file_obj.id,
                FileImageUsage.image_id == image.id,
            )
            .first()
        )

        if not usage:
            usage = FileImageUsage(
                file_id=file_obj.id,
                image_id=image.id,
            )
            db.add(usage)
            db.commit()
            db.refresh(usage)

        usage.last_seen = datetime.utcnow()
        usage.is_active = True
        db.commit()

        return usage

    def _parse_image_name(self, full_image_name: str) -> Dict[str, Optional[str]]:
        """Split an image reference into registry, name and tag.

        The first path component is treated as a registry when it is
        followed by ``/`` and contains a ``.`` or ``:`` or equals
        ``localhost``. A trailing ``@digest`` is dropped before parsing, so
        it never ends up in the name or tag.

        Returns:
            A dict with keys ``"name"``, ``"tag"`` and ``"registry"``; tag
            and registry are ``None`` when absent from the reference.
        """
        # Drop an ``@sha256:...`` digest; its colon is not a tag separator.
        reference = full_image_name.partition("@")[0]

        registry = None
        first, separator, rest = reference.partition("/")
        if separator and ("." in first or ":" in first or first == "localhost"):
            registry = first
            reference = rest

        tag = None
        # Only a colon in the last path component separates the tag.
        if ":" in reference.rsplit("/", 1)[-1]:
            reference, tag = reference.rsplit(":", 1)

        return {
            "name": reference,
            "tag": tag,
            "registry": registry,
        }

    def create_scan_job(self, db: Session, job_type: str, project_id: Optional[int] = None) -> ScanJob:
        """Create and persist a ``ScanJob`` in status ``"pending"``.

        Args:
            db: SQLAlchemy session.
            job_type: Value stored in the job's ``job_type`` field.
            project_id: Optional project the job refers to.

        Returns:
            The refreshed, persisted job.
        """
        job = ScanJob(
            job_type=job_type,
            status="pending",
            project_id=project_id,
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        return job

    def update_scan_job(self, db: Session, job_id: int, status: str, error_message: Optional[str] = None) -> None:
        """Update status and error message of a scan job, if it exists.

        ``completed_at`` is set only when ``status`` is ``"completed"``.
        Nothing happens when no job with ``job_id`` is found.
        """
        job = db.query(ScanJob).filter(ScanJob.id == job_id).first()
        if not job:
            return

        job.status = status
        job.error_message = error_message
        if status == "completed":
            job.completed_at = datetime.utcnow()
        db.commit()