- Implemented VulnerabilityScanner class to scan images for vulnerabilities using Trivy and NVD API.
- Added methods to parse and store vulnerability data in the database.
- Created WebSocketManager class to handle real-time notifications for scan status updates.
- Integrated WebSocket notifications for scan start, completion, and failure events.
373 lines
13 KiB
Python
373 lines
13 KiB
Python
import re
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Set, Tuple
|
|
|
|
import gitlab
|
|
import yaml
|
|
from sqlalchemy.orm import Session
|
|
|
|
from models import File, Image, Project, ScanJob, FileImageUsage
|
|
|
|
|
|
class DockerImageScanner:
    """Find Docker image references in GitLab repositories and persist them.

    The scanner walks GitLab projects (all visible projects, or only those in
    the configured groups), inspects Dockerfiles, docker-compose files and
    GitLab CI files on a fixed list of branches, and records projects, files,
    images and file/image usages through the SQLAlchemy session handed to
    each method.

    Project-declared types are annotated as string forward references so the
    annotations are not evaluated when the class is defined.
    """

    def __init__(
        self,
        gitlab_token: str,
        gitlab_url: str = "https://gitlab.com",
        gitlab_groups: Optional[List[str]] = None,
    ):
        """Create the GitLab client and the scanner configuration.

        Args:
            gitlab_token: Private token passed to the GitLab client.
            gitlab_url: Base URL of the GitLab instance.
            gitlab_groups: Group ids or paths to restrict the scan to. When
                empty or ``None`` every listable project is scanned.
        """
        self.gl = gitlab.Gitlab(gitlab_url, private_token=gitlab_token)
        self.gitlab_groups = gitlab_groups or []
        # Regexes (one capture group: the image reference) applied to file
        # content, keyed by the file type returned from _get_file_type.
        self.docker_image_patterns = {
            "docker-compose": [
                r"image:\s*[\"']?([^\"'\s]+)[\"']?",
                r"FROM\s+([^\s]+)",
            ],
            "dockerfile": [
                r"FROM\s+([^\s]+)",
            ],
            "gitlab-ci": [
                r"image:\s*[\"']?([^\"'\s]+)[\"']?",
                r"FROM\s+([^\s]+)",
            ],
        }
        self.target_branches = ["main", "master", "develop"]
        # NOTE(review): nothing in this class reads target_files; file types
        # are decided by the substring checks in _get_file_type. Kept because
        # other modules may read the attribute.
        self.target_files = {
            "docker-compose": [
                "docker-compose.yml",
                "docker-compose.yaml",
                "docker-compose.*.yml",
                "docker-compose.*.yaml",
            ],
            "dockerfile": [
                "Dockerfile",
                "Dockerfile.*",
                "*.dockerfile",
            ],
            "gitlab-ci": [
                ".gitlab-ci.yml",
                ".gitlab-ci.yaml",
                ".gitlab-ci.*.yml",
                ".gitlab-ci.*.yaml",
            ],
        }

    def scan_all_projects(self, db: "Session") -> None:
        """Scan every project in scope, continuing past per-project failures.

        Projects come from the configured groups when ``gitlab_groups`` is
        non-empty, otherwise from a listing of all projects.
        """
        if self.gitlab_groups:
            projects = self._collect_group_projects()
        else:
            # Scan all projects (original behavior)
            projects = self.gl.projects.list(all=True, simple=True)

        print(f"Scanning {len(projects)} projects...")

        for project_data in projects:
            try:
                self._scan_project(db, project_data)
            except Exception as e:
                # getattr with a default so reporting the error cannot itself
                # raise when the object carries no name attribute.
                project_name = getattr(project_data, "name", "<unknown>")
                print(f"Error scanning project {project_name}: {e}")
                continue

    def _collect_group_projects(self) -> list:
        """Return the projects of all configured groups, without duplicates.

        A group that cannot be resolved or listed is reported and skipped.
        """
        projects = []
        seen_ids = set()

        for group_identifier in self.gitlab_groups:
            try:
                group = self._resolve_group(group_identifier)
                if group is None:
                    print(f"Group not found: {group_identifier}")
                    continue

                # Get all projects in the group (including subgroups)
                group_projects = group.projects.list(all=True, include_subgroups=True)

                # Get full project details for each project to ensure we have
                # path_with_namespace
                full_projects = []
                for project_ref in group_projects:
                    try:
                        full_projects.append(self.gl.projects.get(project_ref.id))
                    except Exception as e:
                        print(f"Error getting full project details for {project_ref.id}: {e}")
                        # Fallback to the reference if we can't get full details
                        full_projects.append(project_ref)

                # Configured groups may overlap (a group and one of its
                # subgroups); queue each project only once.
                for candidate in full_projects:
                    if candidate.id not in seen_ids:
                        seen_ids.add(candidate.id)
                        projects.append(candidate)

                print(f"Found {len(full_projects)} projects in group {group_identifier}")
            except Exception as e:
                print(f"Error accessing group {group_identifier}: {e}")
                continue

        return projects

    def _resolve_group(self, group_identifier):
        """Resolve a configured group id or path to a group object.

        Numeric identifiers are fetched by id. Anything else is looked up via
        the group search; an exact ``full_path`` match wins, then an exact
        ``path`` match, and only then the first search hit.

        Returns:
            The group object, or ``None`` when the search yields nothing.
        """
        try:
            group_id = int(group_identifier)
        except ValueError:
            # Not an integer, try as path
            pass
        else:
            return self.gl.groups.get(group_id)

        candidates = self.gl.groups.list(search=group_identifier)
        if not candidates:
            return None

        wanted = str(group_identifier)
        for attribute in ("full_path", "path"):
            for candidate in candidates:
                if getattr(candidate, attribute, None) == wanted:
                    return candidate
        return candidates[0]

    def _scan_project(self, db: "Session", project_data) -> None:
        """Scan every target branch of one project and stamp ``last_scanned``.

        A failure on one branch is reported and does not stop the others.
        """
        project = self._get_or_create_project(db, project_data)

        gl_project = self.gl.projects.get(project_data.id)

        for branch in self.target_branches:
            try:
                self._scan_project_branch(db, project, gl_project, branch)
            except Exception as e:
                print(f"Error scanning branch {branch} in project {project.name}: {e}")
                continue

        project.last_scanned = datetime.utcnow()
        db.commit()

    def _get_or_create_project(self, db: "Session", project_data) -> "Project":
        """Return the ``Project`` row for a GitLab project, creating it if needed.

        An existing row has its ``path`` refreshed when it differs from the
        path reported by GitLab.
        """
        project = db.query(Project).filter(Project.gitlab_id == project_data.id).first()

        # Use path_with_namespace for full path, fallback to path if not
        # available. The fallback is read only when needed so an object
        # without a ``path`` attribute does not raise unnecessarily.
        full_path = getattr(project_data, "path_with_namespace", None)
        if full_path is None:
            full_path = project_data.path

        if not project:
            project = Project(
                gitlab_id=project_data.id,
                name=project_data.name,
                path=full_path,
                web_url=project_data.web_url,
            )
            db.add(project)
            db.commit()
            db.refresh(project)
        elif project.path != full_path:
            # Update existing project path if it has changed
            project.path = full_path
            db.commit()

        return project

    def _scan_project_branch(self, db: "Session", project: "Project", gl_project, branch: str) -> None:
        """Scan every recognised file in one branch of a project.

        If the repository tree cannot be listed for ``branch`` the branch is
        skipped without reporting. A failure on one file is reported and does
        not stop the others.
        """
        try:
            files = gl_project.repository_tree(ref=branch, recursive=True, all=True)
        except Exception:
            # Deliberate best-effort: every target branch is tried on every
            # project, so a failed listing is not treated as an error here.
            return

        for file_info in files:
            if file_info["type"] != "blob":
                continue

            file_type = self._get_file_type(file_info["path"])
            if not file_type:
                continue

            try:
                self._scan_file(db, project, gl_project, file_info, branch, file_type)
            except Exception as e:
                print(f"Error scanning file {file_info['path']}: {e}")
                continue

    def _get_file_type(self, file_path: str) -> Optional[str]:
        """Classify a repository path by case-insensitive substring.

        Markers are tried in order (docker-compose, dockerfile, .gitlab-ci),
        so a path containing several markers gets the first matching type.

        Returns:
            ``"docker-compose"``, ``"dockerfile"``, ``"gitlab-ci"`` or ``None``.
        """
        file_path_lower = file_path.lower()

        markers = (
            ("docker-compose", "docker-compose"),
            ("dockerfile", "dockerfile"),
            (".gitlab-ci", "gitlab-ci"),
        )
        for marker, file_type in markers:
            if marker in file_path_lower:
                return file_type

        return None

    def _scan_file(
        self, db: "Session", project: "Project", gl_project, file_info: Dict, branch: str, file_type: str
    ) -> None:
        """Record the images referenced by one file.

        The ``File`` row is created before the content is fetched. If the
        content cannot be fetched or decoded as UTF-8 the problem is reported
        and the file is skipped.
        """
        file_obj = self._get_or_create_file(db, project, file_info["path"], branch, file_type)

        try:
            file_content = gl_project.files.get(file_info["path"], ref=branch)
            content = file_content.decode().decode("utf-8")
        except Exception as e:
            print(f"Error reading file {file_info['path']} on branch {branch}: {e}")
            return

        images = self._extract_images_from_content(content, file_type)

        for image_name in images:
            image = self._get_or_create_image(db, image_name)
            self._create_or_update_file_image_usage(db, file_obj, image)

        file_obj.last_scanned = datetime.utcnow()
        db.commit()

    def _get_or_create_file(
        self, db: "Session", project: "Project", file_path: str, branch: str, file_type: str
    ) -> "File":
        """Return the ``File`` row for (project, path, branch), creating it if needed."""
        file_obj = (
            db.query(File)
            .filter(
                File.project_id == project.id,
                File.file_path == file_path,
                File.branch == branch,
            )
            .first()
        )

        if not file_obj:
            file_obj = File(
                project_id=project.id,
                file_path=file_path,
                branch=branch,
                file_type=file_type,
            )
            db.add(file_obj)
            db.commit()
            db.refresh(file_obj)

        return file_obj

    def _extract_images_from_content(self, content: str, file_type: str) -> Set[str]:
        """Return the image references found in ``content`` for ``file_type``.

        An unrecognised ``file_type`` yields an empty set.
        """
        extractors = {
            "docker-compose": self._extract_from_docker_compose,
            "dockerfile": self._extract_from_dockerfile,
            "gitlab-ci": self._extract_from_gitlab_ci,
        }

        extractor = extractors.get(file_type)
        if extractor is None:
            return set()
        return set(extractor(content))

    def _extract_from_docker_compose(self, content: str) -> Set[str]:
        """Return image references from a docker-compose file."""
        return self._extract_from_yaml_text(content, "docker-compose")

    def _extract_from_gitlab_ci(self, content: str) -> Set[str]:
        """Return image references from a GitLab CI file."""
        return self._extract_from_yaml_text(content, "gitlab-ci")

    def _extract_from_yaml_text(self, content: str, file_type: str) -> Set[str]:
        """Combine structured YAML extraction with the regex patterns.

        The YAML pass is best-effort: any parsing or walking failure is
        reported and the regex pass still runs.
        """
        images: Set[str] = set()

        try:
            data = yaml.safe_load(content)
            if isinstance(data, dict):
                self._extract_images_from_yaml(data, images)
        except Exception as e:
            print(f"Could not parse {file_type} YAML, using pattern matching only: {e}")

        images.update(self._match_image_patterns(content, file_type))
        return images

    def _match_image_patterns(self, content: str, file_type: str) -> Set[str]:
        """Apply the configured regexes for ``file_type`` line by line.

        Matching per line keeps ``image:\\s*`` from running across a newline
        and capturing the next key, and comment lines are skipped so that
        commented-out references are not recorded.
        """
        found: Set[str] = set()
        patterns = self.docker_image_patterns[file_type]

        # NOTE(review): the case-insensitive FROM pattern can still match
        # ordinary prose such as "from x" inside script lines.
        for line in content.splitlines():
            if line.lstrip().startswith("#"):
                continue
            for pattern in patterns:
                found.update(re.findall(pattern, line, re.IGNORECASE | re.MULTILINE))

        return found

    def _extract_images_from_yaml(self, data: Dict, images: Set[str]) -> None:
        """Collect every ``image`` value from nested dicts/lists into ``images``.

        Handles the plain string form and the mapping form with a string
        ``name`` entry. The walk is iterative and remembers visited
        containers, so self-referential structures terminate.
        """
        pending = [data]
        visited = set()

        while pending:
            node = pending.pop()
            if not isinstance(node, (dict, list)) or id(node) in visited:
                continue
            visited.add(id(node))

            if isinstance(node, list):
                pending.extend(item for item in node if isinstance(item, (dict, list)))
                continue

            for key, value in node.items():
                if key == "image":
                    if isinstance(value, str):
                        # Skip blank values; they are not image references.
                        if value.strip():
                            images.add(value)
                        continue
                    if isinstance(value, dict):
                        name = value.get("name")
                        if isinstance(name, str) and name.strip():
                            images.add(name)
                if isinstance(value, (dict, list)):
                    pending.append(value)

    def _extract_from_dockerfile(self, content: str) -> Set[str]:
        """Return the base images named by ``FROM`` instructions.

        Only lines whose first token is ``FROM`` (any case) are considered,
        option flags such as ``--platform=...`` are dropped, and a ``FROM``
        that names an earlier ``AS`` alias is not reported as an image.
        """
        images: Set[str] = set()
        stage_aliases: Set[str] = set()
        patterns = self.docker_image_patterns["dockerfile"]

        for raw_line in content.splitlines():
            tokens = raw_line.split()
            if not tokens or tokens[0].upper() != "FROM":
                continue

            operands = [token for token in tokens[1:] if not token.startswith("--")]
            if not operands:
                continue

            instruction = "FROM " + " ".join(operands)
            for pattern in patterns:
                for ref in re.findall(pattern, instruction, re.IGNORECASE | re.MULTILINE):
                    if isinstance(ref, str) and ref.lower() in stage_aliases:
                        continue
                    images.add(ref)

            # Register the alias after matching so it only affects later lines.
            if len(operands) >= 3 and operands[1].upper() == "AS":
                stage_aliases.add(operands[2].lower())

        return images

    def _get_or_create_image(self, db: "Session", full_image_name: str) -> "Image":
        """Return the ``Image`` row for a reference, creating it if needed.

        ``last_seen`` is refreshed on every call.
        """
        image = (
            db.query(Image)
            .filter(Image.full_image_name == full_image_name)
            .first()
        )

        if not image:
            # Parsed only here: the parts are needed solely to build a new row.
            image_parts = self._parse_image_name(full_image_name)
            image = Image(
                image_name=image_parts["name"],
                tag=image_parts["tag"],
                registry=image_parts["registry"],
                full_image_name=full_image_name,
            )
            db.add(image)
            db.commit()
            db.refresh(image)

        image.last_seen = datetime.utcnow()
        db.commit()

        return image

    def _create_or_update_file_image_usage(
        self, db: "Session", file_obj: "File", image: "Image"
    ) -> "FileImageUsage":
        """Return the usage row linking a file to an image, creating it if needed.

        ``last_seen`` is refreshed and ``is_active`` set to ``True`` on every call.
        """
        usage = (
            db.query(FileImageUsage)
            .filter(
                FileImageUsage.file_id == file_obj.id,
                FileImageUsage.image_id == image.id,
            )
            .first()
        )

        if not usage:
            usage = FileImageUsage(
                file_id=file_obj.id,
                image_id=image.id,
            )
            db.add(usage)
            db.commit()
            db.refresh(usage)

        usage.last_seen = datetime.utcnow()
        usage.is_active = True
        db.commit()

        return usage

    def _parse_image_name(self, full_image_name: str) -> Dict[str, Optional[str]]:
        """Split an image reference into registry, name and tag.

        The first path component is treated as a registry when it contains a
        ``.`` or ``:`` or equals ``localhost``. A trailing ``@digest`` is
        dropped before parsing so it cannot be mistaken for a tag.

        Returns:
            A dict with keys ``"name"``, ``"tag"`` and ``"registry"``; ``tag``
            and ``registry`` are ``None`` when absent.
        """
        reference = full_image_name.split("@", 1)[0]

        registry = None
        image_name = reference
        first, separator, rest = reference.partition("/")
        if separator and ("." in first or ":" in first or first == "localhost"):
            registry = first
            image_name = rest

        tag = None
        # Only a colon in the last path segment introduces a tag.
        if ":" in image_name.rsplit("/", 1)[-1]:
            image_name, tag = image_name.rsplit(":", 1)

        return {
            "name": image_name,
            "tag": tag,
            "registry": registry,
        }

    def create_scan_job(self, db: "Session", job_type: str, project_id: Optional[int] = None) -> "ScanJob":
        """Insert a ``ScanJob`` row with status ``"pending"`` and return it."""
        job = ScanJob(
            job_type=job_type,
            status="pending",
            project_id=project_id,
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        return job

    def update_scan_job(self, db: "Session", job_id: int, status: str, error_message: Optional[str] = None) -> None:
        """Update status and error message of a scan job, if it exists.

        ``completed_at`` is stamped only when ``status`` is ``"completed"``.
        """
        job = db.query(ScanJob).filter(ScanJob.id == job_id).first()
        if job:
            job.status = status
            job.error_message = error_message
            if status == "completed":
                job.completed_at = datetime.utcnow()
            db.commit()