# Change notes:
# - Implemented VulnerabilityScanner class to scan images for vulnerabilities using Trivy and the NVD API.
# - Added methods to parse and store vulnerability data in the database.
# - Created WebSocketManager class to handle real-time notifications for scan status updates.
# - Integrated WebSocket notifications for scan start, completion, and failure events.
#
# File info: 266 lines, 8.1 KiB, Python
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy.orm import Session

from models import File, Image, IgnoreRule, IgnoreType, Project


class IgnoreManager:
    """Create, query and retire ``IgnoreRule`` rows through a SQLAlchemy session.

    Rules are soft-deleted: retiring a rule sets ``is_active`` to ``False``
    instead of deleting the row, and every lookup in this class only considers
    rows whose ``is_active`` flag is true.
    """

    def __init__(self, db: Session):
        """Keep the session used for every query and commit.

        Args:
            db: Session the manager reads from and commits to.
        """
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12; this yields the
        same kind of value (UTC, ``tzinfo`` stripped) so ``updated_at`` keeps
        receiving naive timestamps.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _active_rules(self, *criteria):
        """Return a query over active rules narrowed by extra ``criteria``.

        Args:
            *criteria: Additional SQLAlchemy filter expressions, ANDed with
                the ``is_active`` condition.
        """
        # ``== True`` is deliberate: the comparison is translated to SQL.
        return self.db.query(IgnoreRule).filter(
            IgnoreRule.is_active == True,  # noqa: E712
            *criteria
        )

    def _retire(self, rule: IgnoreRule) -> None:
        """Mark ``rule`` inactive and stamp ``updated_at`` (does not commit)."""
        rule.is_active = False
        rule.updated_at = self._utcnow()

    # ------------------------------------------------------------------
    # Rule CRUD
    # ------------------------------------------------------------------

    def add_ignore_rule(
        self,
        ignore_type: IgnoreType,
        target: str,
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
        project_id: Optional[int] = None,
    ) -> IgnoreRule:
        """Return the active rule for a target, creating it when absent.

        An active rule with the same type, target and project id is returned
        unchanged (``reason`` and ``created_by`` are not applied to it).
        Otherwise a new rule is added, committed and refreshed.

        Args:
            ignore_type: Kind of rule; its ``.value`` is what gets stored.
            target: Value stored in the rule's ``target`` column.
            reason: Optional explanation stored on a newly created rule.
            created_by: Optional author stored on a newly created rule.
            project_id: Project the rule belongs to, or ``None``.

        Returns:
            The existing or newly created rule.
        """
        type_value = ignore_type.value

        duplicate = self._active_rules(
            IgnoreRule.ignore_type == type_value,
            IgnoreRule.target == target,
            IgnoreRule.project_id == project_id,
        ).first()
        if duplicate is not None:
            return duplicate

        rule = IgnoreRule(
            ignore_type=type_value,
            target=target,
            reason=reason,
            created_by=created_by,
            project_id=project_id,
        )
        self.db.add(rule)
        self.db.commit()
        self.db.refresh(rule)
        return rule

    def get_ignore_rule(self, rule_id: int) -> Optional[IgnoreRule]:
        """Return the active rule with id ``rule_id``, or ``None``."""
        return self._active_rules(IgnoreRule.id == rule_id).first()

    def remove_ignore_rule(self, rule_id: int) -> bool:
        """Soft-delete the active rule with id ``rule_id``.

        Returns:
            ``True`` when a rule was deactivated and committed, ``False`` when
            no active rule has that id.
        """
        rule = self.get_ignore_rule(rule_id)
        if rule is None:
            return False

        self._retire(rule)
        self.db.commit()
        return True

    def update_ignore_rule(
        self,
        rule_id: int,
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
    ) -> Optional[IgnoreRule]:
        """Update ``reason`` and/or ``created_by`` on an active rule.

        Arguments left as ``None`` keep the stored value. ``updated_at`` is
        refreshed and the change committed even when both are ``None``.

        Returns:
            The refreshed rule, or ``None`` when no active rule has that id.
        """
        rule = self.get_ignore_rule(rule_id)
        if rule is None:
            return None

        if reason is not None:
            rule.reason = reason
        if created_by is not None:
            rule.created_by = created_by

        rule.updated_at = self._utcnow()
        self.db.commit()
        self.db.refresh(rule)
        return rule

    def get_ignore_rules(
        self,
        ignore_type: Optional[IgnoreType] = None,
        project_id: Optional[int] = None,
    ) -> List[IgnoreRule]:
        """List active rules, optionally narrowed by type and/or project.

        Args:
            ignore_type: When given, only rules of this type are returned.
            project_id: When given, only rules with this project id are
                returned. Compared against ``None`` so an id of ``0`` still
                filters.
        """
        query = self._active_rules()
        if ignore_type is not None:
            query = query.filter(IgnoreRule.ignore_type == ignore_type.value)
        if project_id is not None:
            query = query.filter(IgnoreRule.project_id == project_id)
        return query.all()

    # ------------------------------------------------------------------
    # Ignore checks
    # ------------------------------------------------------------------

    def is_project_ignored(self, project_id: int) -> bool:
        """Return whether an active PROJECT rule exists for ``project_id``."""
        match = self._active_rules(
            IgnoreRule.ignore_type == IgnoreType.PROJECT.value,
            IgnoreRule.project_id == project_id,
        ).first()
        return match is not None

    def is_file_ignored(self, project_id: int, file_path: str) -> bool:
        """Return whether an active FILE rule targets ``file_path`` in the project.

        The rule's ``target`` must equal ``file_path`` exactly.
        """
        match = self._active_rules(
            IgnoreRule.ignore_type == IgnoreType.FILE.value,
            IgnoreRule.project_id == project_id,
            IgnoreRule.target == file_path,
        ).first()
        return match is not None

    def is_image_ignored(self, image_name: str, project_id: Optional[int] = None) -> bool:
        """Return whether an active IMAGE rule targets ``image_name``.

        With a ``project_id``, rules for that project and rules without a
        project both count. Without one, only rules without a project count.
        """
        query = self._active_rules(
            IgnoreRule.ignore_type == IgnoreType.IMAGE.value,
            IgnoreRule.target == image_name,
        )
        if project_id is not None:
            query = query.filter(
                (IgnoreRule.project_id == project_id) | (IgnoreRule.project_id.is_(None))
            )
        else:
            query = query.filter(IgnoreRule.project_id.is_(None))
        return query.first() is not None

    def should_scan_project(self, project_id: int) -> bool:
        """Return ``True`` unless the project itself is ignored."""
        return not self.is_project_ignored(project_id)

    def should_scan_file(self, project_id: int, file_path: str) -> bool:
        """Return ``True`` unless the project or the specific file is ignored."""
        if self.is_project_ignored(project_id):
            return False
        return not self.is_file_ignored(project_id, file_path)

    def should_scan_image(self, image_name: str, project_id: Optional[int] = None) -> bool:
        """Return ``True`` unless the given project or the image is ignored.

        The project check is skipped only when ``project_id`` is ``None``.
        """
        if project_id is not None and self.is_project_ignored(project_id):
            return False
        return not self.is_image_ignored(image_name, project_id)

    # ------------------------------------------------------------------
    # Listings
    # ------------------------------------------------------------------

    def get_ignored_projects(self) -> List[Project]:
        """Return the ``Project`` rows referenced by active PROJECT rules."""
        project_rules = self._active_rules(
            IgnoreRule.ignore_type == IgnoreType.PROJECT.value,
        ).all()

        # De-duplicate ids and skip rules that carry no project id.
        project_ids = {
            rule.project_id for rule in project_rules if rule.project_id is not None
        }
        if not project_ids:
            # Avoid issuing a query with an empty IN () clause.
            return []

        return self.db.query(Project).filter(Project.id.in_(project_ids)).all()

    def get_ignored_files(self, project_id: Optional[int] = None) -> List[IgnoreRule]:
        """List active FILE rules, optionally limited to one project."""
        query = self._active_rules(IgnoreRule.ignore_type == IgnoreType.FILE.value)
        if project_id is not None:
            query = query.filter(IgnoreRule.project_id == project_id)
        return query.all()

    def get_ignored_images(self, project_id: Optional[int] = None) -> List[IgnoreRule]:
        """List active IMAGE rules.

        With a ``project_id``, rules for that project plus rules without a
        project are returned. Without one, every active IMAGE rule is returned.
        """
        query = self._active_rules(IgnoreRule.ignore_type == IgnoreType.IMAGE.value)
        if project_id is not None:
            query = query.filter(
                (IgnoreRule.project_id == project_id) | (IgnoreRule.project_id.is_(None))
            )
        return query.all()

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------

    def bulk_ignore_images(
        self,
        image_names: List[str],
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
        project_id: Optional[int] = None,
    ) -> List[IgnoreRule]:
        """Ensure an IMAGE rule exists for each name in ``image_names``.

        Each name goes through ``add_ignore_rule``, so every rule is committed
        individually and already-ignored names yield their existing rule.

        Returns:
            One rule per input name, in input order.
        """
        return [
            self.add_ignore_rule(
                ignore_type=IgnoreType.IMAGE,
                target=image_name,
                reason=reason,
                created_by=created_by,
                project_id=project_id,
            )
            for image_name in image_names
        ]

    def bulk_ignore_files(
        self,
        file_paths: List[str],
        project_id: int,
        reason: Optional[str] = None,
        created_by: Optional[str] = None,
    ) -> List[IgnoreRule]:
        """Ensure a FILE rule exists for each path in ``file_paths``.

        Each path goes through ``add_ignore_rule``, so every rule is committed
        individually and already-ignored paths yield their existing rule.

        Returns:
            One rule per input path, in input order.
        """
        return [
            self.add_ignore_rule(
                ignore_type=IgnoreType.FILE,
                target=file_path,
                reason=reason,
                created_by=created_by,
                project_id=project_id,
            )
            for file_path in file_paths
        ]

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def cleanup_unused_rules(self) -> int:
        """Retire IMAGE and FILE rules whose target no longer exists.

        An IMAGE rule is kept while an active ``Image`` has a
        ``full_image_name`` equal to the rule's target (the rule's project is
        not consulted). A FILE rule is kept while an active ``File`` matches
        both the rule's target path and its project id. All retirements are
        committed together at the end.

        Returns:
            Number of rules that were deactivated.
        """
        retired = 0

        image_rules = self._active_rules(
            IgnoreRule.ignore_type == IgnoreType.IMAGE.value,
        ).all()
        for rule in image_rules:
            live_image = self.db.query(Image).filter(
                Image.full_image_name == rule.target,
                Image.is_active == True,  # noqa: E712
            ).first()
            if live_image is None:
                self._retire(rule)
                retired += 1

        file_rules = self._active_rules(
            IgnoreRule.ignore_type == IgnoreType.FILE.value,
        ).all()
        for rule in file_rules:
            live_file = self.db.query(File).filter(
                File.file_path == rule.target,
                File.project_id == rule.project_id,
                File.is_active == True,  # noqa: E712
            ).first()
            if live_file is None:
                self._retire(rule)
                retired += 1

        self.db.commit()
        return retired