Files
gdit-back/ignore_manager.py
JSC 2c64c2c34d Add vulnerability scanner and WebSocket manager for scan notifications
- Implemented VulnerabilityScanner class to scan images for vulnerabilities using Trivy and NVD API.
- Added methods to parse and store vulnerability data in the database.
- Created WebSocketManager class to handle real-time notifications for scan status updates.
- Integrated WebSocket notifications for scan start, completion, and failure events.
2025-07-10 22:57:00 +02:00

266 lines
8.1 KiB
Python

from datetime import datetime
from typing import List, Optional
from sqlalchemy.orm import Session
from models import File, Image, IgnoreRule, IgnoreType, Project
class IgnoreManager:
    """Manage the ignore rules that exclude projects, files and images from scans.

    Rules are soft-deleted: removing a rule clears its ``is_active`` flag
    instead of deleting the row, and every lookup in this class only
    considers rules whose ``is_active`` flag is set.
    """

    def __init__(self, db: Session):
        """Bind the manager to the session used for every query and commit."""
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _active_rules(self, ignore_type: Optional[IgnoreType] = None):
        """Return a query over active rules, optionally limited to one type."""
        # ``== True`` is intentional: it builds a SQL expression, so the
        # usual truthiness idioms do not apply to the column.
        query = self.db.query(IgnoreRule).filter(
            IgnoreRule.is_active == True  # noqa: E712
        )
        if ignore_type is not None:
            query = query.filter(IgnoreRule.ignore_type == ignore_type.value)
        return query

    def _commit(self) -> None:
        """Commit the session, rolling it back before re-raising on failure."""
        try:
            self.db.commit()
        except Exception:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back; do it here so callers can keep using ``self.db``.
            self.db.rollback()
            raise

    @staticmethod
    def _deactivate(rule: IgnoreRule) -> None:
        """Soft-delete *rule* and stamp its modification time (not committed)."""
        rule.is_active = False
        rule.updated_at = datetime.utcnow()

    # ------------------------------------------------------------------
    # Rule CRUD
    # ------------------------------------------------------------------

    def add_ignore_rule(
        self,
        ignore_type: IgnoreType,
        target: str,
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
        project_id: Optional[int] = None,
    ) -> IgnoreRule:
        """Create an ignore rule, or return the matching active one.

        An active rule with the same type, target and project is treated as
        a duplicate and returned unchanged (its ``reason`` / ``created_by``
        are not updated).

        Args:
            ignore_type: Kind of rule to create.
            target: Value the rule matches against (e.g. a file path or an
                image name).
            reason: Optional free-text justification.
            created_by: Optional identifier of the rule's author.
            project_id: Project the rule is scoped to, or ``None`` for a
                rule without a project.

        Returns:
            The existing or newly persisted rule.
        """
        # SQLAlchemy renders ``project_id == None`` as ``IS NULL``, so the
        # duplicate check also works for rules without a project.
        existing = (
            self._active_rules(ignore_type)
            .filter(
                IgnoreRule.target == target,
                IgnoreRule.project_id == project_id,
            )
            .first()
        )
        if existing is not None:
            return existing

        rule = IgnoreRule(
            ignore_type=ignore_type.value,
            target=target,
            reason=reason,
            created_by=created_by,
            project_id=project_id,
        )
        self.db.add(rule)
        self._commit()
        # Reload attributes that were expired by the commit.
        self.db.refresh(rule)
        return rule

    def remove_ignore_rule(self, rule_id: int) -> bool:
        """Soft-delete the active rule with id *rule_id*.

        Returns:
            ``True`` if a rule was deactivated, ``False`` if no active rule
            with that id exists.
        """
        rule = self.get_ignore_rule(rule_id)
        if rule is None:
            return False
        self._deactivate(rule)
        self._commit()
        return True

    def update_ignore_rule(
        self,
        rule_id: int,
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
    ) -> Optional[IgnoreRule]:
        """Update the ``reason`` and/or ``created_by`` of an active rule.

        Arguments left as ``None`` are not modified; ``updated_at`` is
        refreshed whenever the rule is found.

        Returns:
            The refreshed rule, or ``None`` if no active rule has that id.
        """
        rule = self.get_ignore_rule(rule_id)
        if rule is None:
            return None
        if reason is not None:
            rule.reason = reason
        if created_by is not None:
            rule.created_by = created_by
        rule.updated_at = datetime.utcnow()
        self._commit()
        self.db.refresh(rule)
        return rule

    def get_ignore_rule(self, rule_id: int) -> Optional[IgnoreRule]:
        """Return the active rule with id *rule_id*, or ``None``."""
        return self._active_rules().filter(IgnoreRule.id == rule_id).first()

    def get_ignore_rules(
        self,
        ignore_type: Optional[IgnoreType] = None,
        project_id: Optional[int] = None,
    ) -> List[IgnoreRule]:
        """List active rules, optionally filtered by type and/or project.

        A filter is applied only when its argument is not ``None``, so a
        falsy but valid id such as ``0`` still filters.
        """
        query = self._active_rules(ignore_type)
        if project_id is not None:
            query = query.filter(IgnoreRule.project_id == project_id)
        return query.all()

    # ------------------------------------------------------------------
    # Ignore checks
    # ------------------------------------------------------------------

    def is_project_ignored(self, project_id: int) -> bool:
        """Return whether an active PROJECT rule exists for *project_id*."""
        match = (
            self._active_rules(IgnoreType.PROJECT)
            .filter(IgnoreRule.project_id == project_id)
            .first()
        )
        return match is not None

    def is_file_ignored(self, project_id: int, file_path: str) -> bool:
        """Return whether an active FILE rule matches *file_path* exactly
        within *project_id*."""
        match = (
            self._active_rules(IgnoreType.FILE)
            .filter(
                IgnoreRule.project_id == project_id,
                IgnoreRule.target == file_path,
            )
            .first()
        )
        return match is not None

    def is_image_ignored(
        self, image_name: str, project_id: Optional[int] = None
    ) -> bool:
        """Return whether an active IMAGE rule matches *image_name*.

        Rules without a project always apply. Rules scoped to a project
        apply only when *project_id* is given and equals that project.
        """
        without_project = IgnoreRule.project_id.is_(None)
        if project_id is not None:
            scope = (IgnoreRule.project_id == project_id) | without_project
        else:
            scope = without_project
        match = (
            self._active_rules(IgnoreType.IMAGE)
            .filter(IgnoreRule.target == image_name, scope)
            .first()
        )
        return match is not None

    def should_scan_project(self, project_id: int) -> bool:
        """Return ``True`` unless the project is ignored."""
        return not self.is_project_ignored(project_id)

    def should_scan_file(self, project_id: int, file_path: str) -> bool:
        """Return ``True`` unless the file or its whole project is ignored."""
        if self.is_project_ignored(project_id):
            return False
        return not self.is_file_ignored(project_id, file_path)

    def should_scan_image(
        self, image_name: str, project_id: Optional[int] = None
    ) -> bool:
        """Return ``True`` unless the image, or the given project, is ignored."""
        if project_id is not None and self.is_project_ignored(project_id):
            return False
        return not self.is_image_ignored(image_name, project_id)

    # ------------------------------------------------------------------
    # Listings
    # ------------------------------------------------------------------

    def get_ignored_projects(self) -> List[Project]:
        """Return the projects referenced by active PROJECT rules."""
        project_ids = {
            rule.project_id
            for rule in self._active_rules(IgnoreType.PROJECT).all()
            if rule.project_id is not None
        }
        if not project_ids:
            # Nothing is ignored: skip the pointless ``IN ()`` round trip.
            return []
        return (
            self.db.query(Project)
            .filter(Project.id.in_(sorted(project_ids)))
            .all()
        )

    def get_ignored_files(
        self, project_id: Optional[int] = None
    ) -> List[IgnoreRule]:
        """List active FILE rules, optionally restricted to one project."""
        return self.get_ignore_rules(IgnoreType.FILE, project_id)

    def get_ignored_images(
        self, project_id: Optional[int] = None
    ) -> List[IgnoreRule]:
        """List active IMAGE rules.

        With *project_id* given, the result contains that project's rules
        plus rules without a project; otherwise every active IMAGE rule is
        returned.
        """
        query = self._active_rules(IgnoreType.IMAGE)
        if project_id is not None:
            query = query.filter(
                (IgnoreRule.project_id == project_id)
                | (IgnoreRule.project_id.is_(None))
            )
        return query.all()

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------

    def bulk_ignore_images(
        self,
        image_names: List[str],
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
        project_id: Optional[int] = None,
    ) -> List[IgnoreRule]:
        """Create (or reuse) one IMAGE rule per entry of *image_names*.

        Every rule goes through ``add_ignore_rule`` and is therefore
        committed individually: a failure part-way through leaves the
        earlier rules persisted.

        Returns:
            The rules in the same order as *image_names*.
        """
        return [
            self.add_ignore_rule(
                ignore_type=IgnoreType.IMAGE,
                target=image_name,
                reason=reason,
                created_by=created_by,
                project_id=project_id,
            )
            for image_name in image_names
        ]

    def bulk_ignore_files(
        self,
        file_paths: List[str],
        project_id: int,
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
    ) -> List[IgnoreRule]:
        """Create (or reuse) one FILE rule per entry of *file_paths*.

        Every rule goes through ``add_ignore_rule`` and is therefore
        committed individually: a failure part-way through leaves the
        earlier rules persisted.

        Returns:
            The rules in the same order as *file_paths*.
        """
        return [
            self.add_ignore_rule(
                ignore_type=IgnoreType.FILE,
                target=file_path,
                reason=reason,
                created_by=created_by,
                project_id=project_id,
            )
            for file_path in file_paths
        ]

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def cleanup_unused_rules(self) -> int:
        """Deactivate IMAGE and FILE rules whose target no longer exists.

        An IMAGE rule is kept while any active ``Image`` has a
        ``full_image_name`` equal to the rule's target (the rule's project
        is not part of that check). A FILE rule is kept while an active
        ``File`` with the same path exists in the rule's project. All
        deactivations are committed together at the end.

        Returns:
            The number of rules that were deactivated.
        """
        removed_count = 0

        # One existence query per rule; rule lists are materialised first so
        # the deactivations below cannot affect what is being iterated.
        for rule in self._active_rules(IgnoreType.IMAGE).all():
            image = (
                self.db.query(Image)
                .filter(
                    Image.full_image_name == rule.target,
                    Image.is_active == True,  # noqa: E712
                )
                .first()
            )
            if image is None:
                self._deactivate(rule)
                removed_count += 1

        for rule in self._active_rules(IgnoreType.FILE).all():
            tracked_file = (
                self.db.query(File)
                .filter(
                    File.file_path == rule.target,
                    File.project_id == rule.project_id,
                    File.is_active == True,  # noqa: E712
                )
                .first()
            )
            if tracked_file is None:
                self._deactivate(rule)
                removed_count += 1

        self._commit()
        return removed_count