Files
gdit-back/scanner.py
JSC 2c64c2c34d Add vulnerability scanner and WebSocket manager for scan notifications
- Implemented VulnerabilityScanner class to scan images for vulnerabilities using Trivy and NVD API.
- Added methods to parse and store vulnerability data in the database.
- Created WebSocketManager class to handle real-time notifications for scan status updates.
- Integrated WebSocket notifications for scan start, completion, and failure events.
2025-07-10 22:57:00 +02:00

373 lines
13 KiB
Python

import re
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple
import gitlab
import yaml
from sqlalchemy.orm import Session
from models import File, Image, Project, ScanJob, FileImageUsage
class DockerImageScanner:
    """Find Docker image references in GitLab repositories and record them.

    The scanner walks GitLab projects (all visible ones, or those of the
    configured groups), lists the repository tree of each target branch,
    picks out docker-compose files, Dockerfiles and GitLab CI files, extracts
    image references from their content and stores ``Project``, ``File``,
    ``Image`` and ``FileImageUsage`` rows through the supplied SQLAlchemy
    session.

    Errors are reported with ``print`` and scanning continues with the next
    group / project / branch / file (best-effort scanning).
    """

    # ``image:`` key at the start of a line (optionally as a list item).
    # Horizontal whitespace only after the colon, so the mapping form
    #   image:
    #     name: foo
    # is not captured as "name:"; commented-out lines do not match either.
    _IMAGE_KEY_PATTERN = r"^[ \t]*(?:-[ \t]+)?image:[ \t]*[\"']?([^\"'\s#]+)[\"']?"

    # Dockerfile ``FROM`` instruction at the start of a line.  Leading
    # ``--flag=value`` options (e.g. ``--platform=...``) are skipped so the
    # captured group is the image reference itself.
    _FROM_PATTERN = r"^[ \t]*FROM[ \t]+(?:--\S+[ \t]+)*(\S+)"

    # Matches the optional ``AS <stage>`` suffix that follows a FROM image.
    _STAGE_ALIAS_RE = re.compile(r"[ \t]+AS[ \t]+(\S+)", re.IGNORECASE)

    # Default regexes per file type; each has exactly one capturing group
    # (the image reference).  They are applied with IGNORECASE | MULTILINE.
    DEFAULT_IMAGE_PATTERNS = {
        "docker-compose": [_IMAGE_KEY_PATTERN, _FROM_PATTERN],
        "dockerfile": [_FROM_PATTERN],
        "gitlab-ci": [_IMAGE_KEY_PATTERN, _FROM_PATTERN],
    }

    def __init__(self, gitlab_token: str, gitlab_url: str = "https://gitlab.com", gitlab_groups: Optional[List[str]] = None):
        """Create a scanner bound to one GitLab instance.

        Args:
            gitlab_token: Private token used to authenticate against GitLab.
            gitlab_url: Base URL of the GitLab instance.
            gitlab_groups: Group ids (numeric strings) or group paths to
                restrict scanning to. Empty/None scans every listed project.
        """
        self.gl = gitlab.Gitlab(gitlab_url, private_token=gitlab_token)
        self.gitlab_groups = gitlab_groups or []
        # Per-instance copy so callers may tweak patterns without touching
        # the class-level defaults.
        self.docker_image_patterns = {
            kind: list(patterns)
            for kind, patterns in self.DEFAULT_IMAGE_PATTERNS.items()
        }
        self.target_branches = ["main", "master", "develop"]
        # NOTE(review): these glob patterns are not consulted anywhere in this
        # class; _get_file_type() uses substring checks instead.
        self.target_files = {
            "docker-compose": [
                "docker-compose.yml",
                "docker-compose.yaml",
                "docker-compose.*.yml",
                "docker-compose.*.yaml",
            ],
            "dockerfile": [
                "Dockerfile",
                "Dockerfile.*",
                "*.dockerfile",
            ],
            "gitlab-ci": [
                ".gitlab-ci.yml",
                ".gitlab-ci.yaml",
                ".gitlab-ci.*.yml",
                ".gitlab-ci.*.yaml",
            ],
        }

    def scan_all_projects(self, db: Session) -> None:
        """Scan every project in scope and persist the results via ``db``.

        When ``gitlab_groups`` is non-empty only the projects of those groups
        (including subgroups) are scanned; otherwise all projects returned by
        the GitLab project listing are scanned. A failure in one project is
        printed and does not stop the remaining projects.
        """
        if self.gitlab_groups:
            projects = self._collect_group_projects()
        else:
            # Scan all projects (original behavior)
            projects = self.gl.projects.list(all=True, simple=True)

        print(f"Scanning {len(projects)} projects...")
        for project_data in projects:
            try:
                self._scan_project(db, project_data)
            except Exception as e:
                # A failed flush/commit leaves the session unusable until it
                # is rolled back; reset it so the next project can proceed.
                db.rollback()
                print(f"Error scanning project {project_data.name}: {e}")
                continue

    def _collect_group_projects(self) -> list:
        """Return the projects of all configured groups, without duplicates."""
        projects = []
        seen_ids = set()
        for group_identifier in self.gitlab_groups:
            try:
                group = self._resolve_group(group_identifier)
                if group is None:
                    print(f"Group not found: {group_identifier}")
                    continue
                # Get all projects in the group (including subgroups)
                group_projects = group.projects.list(all=True, include_subgroups=True)
                full_projects = []
                for project_ref in group_projects:
                    # A project can be reachable through several configured
                    # groups (e.g. a group and one of its subgroups).
                    if project_ref.id in seen_ids:
                        continue
                    seen_ids.add(project_ref.id)
                    try:
                        full_projects.append(self.gl.projects.get(project_ref.id))
                    except Exception as e:
                        print(f"Error getting full project details for {project_ref.id}: {e}")
                        # Fallback to the reference if we can't get full details
                        full_projects.append(project_ref)
                projects.extend(full_projects)
                print(f"Found {len(full_projects)} projects in group {group_identifier}")
            except Exception as e:
                print(f"Error accessing group {group_identifier}: {e}")
                continue
        return projects

    def _resolve_group(self, group_identifier):
        """Look up a group by numeric id, or by search when not numeric.

        Returns the group object, or None when a search yields no result.
        For searches an exact ``full_path`` match is preferred; otherwise the
        first search hit is used.
        """
        try:
            group_id = int(group_identifier)
        except ValueError:
            candidates = self.gl.groups.list(search=group_identifier)
            if not candidates:
                return None
            for candidate in candidates:
                if getattr(candidate, "full_path", None) == group_identifier:
                    return candidate
            return candidates[0]
        return self.gl.groups.get(group_id)

    def _scan_project(self, db: Session, project_data) -> None:
        """Scan all target branches of one project and stamp ``last_scanned``."""
        project = self._get_or_create_project(db, project_data)
        # Reuse the object we were given when it already exposes the
        # repository API; only fetch when it is a bare reference.
        if hasattr(project_data, "repository_tree") and hasattr(project_data, "files"):
            gl_project = project_data
        else:
            gl_project = self.gl.projects.get(project_data.id)

        for branch in self.target_branches:
            try:
                self._scan_project_branch(db, project, gl_project, branch)
            except Exception as e:
                # Roll back first: reading project.name below may need the
                # session, which is unusable after a failed flush/commit.
                db.rollback()
                print(f"Error scanning branch {branch} in project {project.name}: {e}")
                continue

        project.last_scanned = datetime.utcnow()
        db.commit()

    def _get_or_create_project(self, db: Session, project_data) -> Project:
        """Return the ``Project`` row for a GitLab project, creating it if needed.

        An existing row has its ``path`` refreshed when it changed.
        """
        # Use path_with_namespace for full path, fallback to path if not available
        full_path = getattr(project_data, "path_with_namespace", None)
        if full_path is None:
            full_path = project_data.path

        project = db.query(Project).filter(Project.gitlab_id == project_data.id).first()
        if not project:
            project = Project(
                gitlab_id=project_data.id,
                name=project_data.name,
                path=full_path,
                web_url=project_data.web_url,
            )
            db.add(project)
            db.commit()
            db.refresh(project)
        elif project.path != full_path:
            # Update existing project path if it has changed
            project.path = full_path
            db.commit()
        return project

    def _scan_project_branch(self, db: Session, project: Project, gl_project, branch: str) -> None:
        """Scan every recognised file on one branch of a project."""
        try:
            files = gl_project.repository_tree(ref=branch, recursive=True, all=True)
        except Exception:
            # Deliberately silent: listing fails when the tree cannot be read
            # for this ref, and every target branch is tried on every project.
            return

        for file_info in files:
            if file_info["type"] != "blob":
                continue
            file_type = self._get_file_type(file_info["path"])
            if not file_type:
                continue
            try:
                self._scan_file(db, project, gl_project, file_info, branch, file_type)
            except Exception as e:
                # Reset the session so one bad file does not poison the rest.
                db.rollback()
                print(f"Error scanning file {file_info['path']}: {e}")
                continue

    def _get_file_type(self, file_path: str) -> Optional[str]:
        """Classify a repository path by case-insensitive substring match.

        Returns "docker-compose", "dockerfile", "gitlab-ci" or None. The
        checks run in that order, and they look at the whole path, not only
        the file name.
        """
        lowered = file_path.lower()
        for marker, file_type in (
            ("docker-compose", "docker-compose"),
            ("dockerfile", "dockerfile"),
            (".gitlab-ci", "gitlab-ci"),
        ):
            if marker in lowered:
                return file_type
        return None

    def _scan_file(
        self, db: Session, project: Project, gl_project, file_info: Dict, branch: str, file_type: str
    ) -> None:
        """Fetch one file, extract its images and record the usages."""
        file_obj = self._get_or_create_file(db, project, file_info["path"], branch, file_type)
        try:
            file_content = gl_project.files.get(file_info["path"], ref=branch)
            content = file_content.decode().decode("utf-8")
        except Exception as e:
            # Best-effort: skip unreadable / non-UTF-8 files, but say so.
            print(f"Error fetching file {file_info['path']} on branch {branch}: {e}")
            return

        for image_name in self._extract_images_from_content(content, file_type):
            image = self._get_or_create_image(db, image_name)
            self._create_or_update_file_image_usage(db, file_obj, image)

        file_obj.last_scanned = datetime.utcnow()
        db.commit()

    def _get_or_create_file(
        self, db: Session, project: Project, file_path: str, branch: str, file_type: str
    ) -> File:
        """Return the ``File`` row for (project, path, branch), creating it if needed."""
        file_obj = (
            db.query(File)
            .filter(
                File.project_id == project.id,
                File.file_path == file_path,
                File.branch == branch,
            )
            .first()
        )
        if not file_obj:
            file_obj = File(
                project_id=project.id,
                file_path=file_path,
                branch=branch,
                file_type=file_type,
            )
            db.add(file_obj)
            db.commit()
            db.refresh(file_obj)
        return file_obj

    def _extract_images_from_content(self, content: str, file_type: str) -> Set[str]:
        """Dispatch to the extractor for ``file_type``; unknown types yield an empty set."""
        images = set()
        if file_type == "docker-compose":
            images.update(self._extract_from_docker_compose(content))
        elif file_type == "dockerfile":
            images.update(self._extract_from_dockerfile(content))
        elif file_type == "gitlab-ci":
            images.update(self._extract_from_gitlab_ci(content))
        return images

    def _extract_from_yaml_text(self, content: str, pattern_key: str) -> Set[str]:
        """Extract images from YAML text via parsing plus the regexes of ``pattern_key``."""
        images = set()
        try:
            data = yaml.safe_load(content)
            if isinstance(data, dict):
                self._extract_images_from_yaml(data, images)
        except Exception:
            # Deliberate best-effort: a document that cannot be loaded is
            # still covered by the regex pass below.
            pass
        for pattern in self.docker_image_patterns[pattern_key]:
            images.update(re.findall(pattern, content, re.IGNORECASE | re.MULTILINE))
        return images

    def _extract_from_docker_compose(self, content: str) -> Set[str]:
        """Return the image references found in a docker-compose file."""
        return self._extract_from_yaml_text(content, "docker-compose")

    def _extract_images_from_yaml(self, data: object, images: Set[str]) -> None:
        """Recursively collect ``image`` values from parsed YAML into ``images``.

        Handles both ``image: <ref>`` and the mapping form
        ``image: {name: <ref>, ...}``. Blank strings are ignored.
        """
        if isinstance(data, dict):
            for key, value in data.items():
                if key == "image":
                    if isinstance(value, str):
                        if value.strip():
                            images.add(value)
                    elif isinstance(value, dict):
                        name = value.get("name")
                        if isinstance(name, str) and name.strip():
                            images.add(name)
                if isinstance(value, (dict, list)):
                    self._extract_images_from_yaml(value, images)
        elif isinstance(data, list):
            for item in data:
                if isinstance(item, (dict, list)):
                    self._extract_images_from_yaml(item, images)

    def _extract_from_dockerfile(self, content: str) -> Set[str]:
        """Return the base images referenced by ``FROM`` lines of a Dockerfile.

        References to earlier build stages (``FROM build AS test`` after
        ``... AS build``) and the reserved ``scratch`` base are not images
        and are left out.
        """
        images = set()
        stage_aliases = set()
        for pattern in self.docker_image_patterns["dockerfile"]:
            for match in re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE):
                reference = match.group(1)
                lowered = reference.lower()
                if lowered != "scratch" and lowered not in stage_aliases:
                    images.add(reference)
                # Register the alias only after handling the reference, so
                # "FROM node AS node" still records the "node" image.
                alias = self._STAGE_ALIAS_RE.match(content, match.end())
                if alias:
                    stage_aliases.add(alias.group(1).lower())
        return images

    def _extract_from_gitlab_ci(self, content: str) -> Set[str]:
        """Return the image references found in a GitLab CI file."""
        return self._extract_from_yaml_text(content, "gitlab-ci")

    def _get_or_create_image(self, db: Session, full_image_name: str) -> Image:
        """Return the ``Image`` row for a reference, creating it if needed.

        ``last_seen`` is refreshed on every call.
        """
        image_parts = self._parse_image_name(full_image_name)
        image = (
            db.query(Image)
            .filter(Image.full_image_name == full_image_name)
            .first()
        )
        if not image:
            image = Image(
                image_name=image_parts["name"],
                tag=image_parts["tag"],
                registry=image_parts["registry"],
                full_image_name=full_image_name,
            )
            db.add(image)
            db.commit()
            db.refresh(image)
        image.last_seen = datetime.utcnow()
        db.commit()
        return image

    def _create_or_update_file_image_usage(self, db: Session, file_obj: File, image: Image) -> FileImageUsage:
        """Return the usage row linking ``file_obj`` and ``image``, creating it if needed.

        The row is marked active and its ``last_seen`` is refreshed.
        """
        usage = (
            db.query(FileImageUsage)
            .filter(
                FileImageUsage.file_id == file_obj.id,
                FileImageUsage.image_id == image.id,
            )
            .first()
        )
        if not usage:
            usage = FileImageUsage(
                file_id=file_obj.id,
                image_id=image.id,
            )
            db.add(usage)
            db.commit()
            db.refresh(usage)
        usage.last_seen = datetime.utcnow()
        usage.is_active = True
        db.commit()
        return usage

    def _parse_image_name(self, full_image_name: str) -> Dict[str, Optional[str]]:
        """Split an image reference into registry, name and tag.

        The first path component is treated as a registry when it contains a
        ``.`` or ``:`` or is ``localhost``. A tag is only recognised after
        the last ``/``, so a registry port is never mistaken for one. For a
        digest reference (``name@sha256:...``) the digest is removed from the
        name; when there is no explicit tag the digest is returned in the
        ``tag`` slot, because the result has no separate digest field.

        Returns:
            Dict with keys ``name``, ``tag`` and ``registry``; ``tag`` and
            ``registry`` are None when absent.
        """
        remainder, _, digest = full_image_name.partition("@")

        registry = None
        first, slash, rest = remainder.partition("/")
        if slash and ("." in first or ":" in first or first == "localhost"):
            registry = first
            remainder = rest

        tag = None
        colon_at = remainder.rfind(":")
        if colon_at > remainder.rfind("/"):
            remainder, tag = remainder[:colon_at], remainder[colon_at + 1:]

        if tag is None and digest:
            tag = digest

        return {
            "name": remainder,
            "tag": tag,
            "registry": registry,
        }

    def create_scan_job(self, db: Session, job_type: str, project_id: Optional[int] = None) -> ScanJob:
        """Insert a new ``ScanJob`` with status "pending" and return it."""
        job = ScanJob(
            job_type=job_type,
            status="pending",
            project_id=project_id,
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        return job

    def update_scan_job(self, db: Session, job_id: int, status: str, error_message: Optional[str] = None) -> None:
        """Set status and error message of a scan job; no-op for unknown ids.

        ``completed_at`` is stamped only when ``status`` is "completed".
        """
        job = db.query(ScanJob).filter(ScanJob.id == job_id).first()
        if job:
            job.status = status
            job.error_message = error_message
            if status == "completed":
                job.completed_at = datetime.utcnow()
            db.commit()