Add vulnerability scanner and WebSocket manager for scan notifications
- Implemented VulnerabilityScanner class to scan images for vulnerabilities using Trivy and NVD API. - Added methods to parse and store vulnerability data in the database. - Created WebSocketManager class to handle real-time notifications for scan status updates. - Integrated WebSocket notifications for scan start, completion, and failure events.
This commit is contained in:
266
ignore_manager.py
Normal file
266
ignore_manager.py
Normal file
@@ -0,0 +1,266 @@
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models import File, Image, IgnoreRule, IgnoreType, Project
|
||||
|
||||
|
||||
class IgnoreManager:
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
|
||||
def add_ignore_rule(
|
||||
self,
|
||||
ignore_type: IgnoreType,
|
||||
target: str,
|
||||
reason: Optional[str] = None,
|
||||
created_by: Optional[str] = None,
|
||||
project_id: Optional[int] = None,
|
||||
) -> IgnoreRule:
|
||||
existing = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == ignore_type.value,
|
||||
IgnoreRule.target == target,
|
||||
IgnoreRule.project_id == project_id,
|
||||
IgnoreRule.is_active == True,
|
||||
).first()
|
||||
|
||||
if existing:
|
||||
return existing
|
||||
|
||||
rule = IgnoreRule(
|
||||
ignore_type=ignore_type.value,
|
||||
target=target,
|
||||
reason=reason,
|
||||
created_by=created_by,
|
||||
project_id=project_id,
|
||||
)
|
||||
|
||||
self.db.add(rule)
|
||||
self.db.commit()
|
||||
self.db.refresh(rule)
|
||||
|
||||
return rule
|
||||
|
||||
def remove_ignore_rule(self, rule_id: int) -> bool:
|
||||
rule = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.id == rule_id,
|
||||
IgnoreRule.is_active == True,
|
||||
).first()
|
||||
|
||||
if not rule:
|
||||
return False
|
||||
|
||||
rule.is_active = False
|
||||
rule.updated_at = datetime.utcnow()
|
||||
self.db.commit()
|
||||
|
||||
return True
|
||||
|
||||
def get_ignore_rules(
|
||||
self,
|
||||
ignore_type: Optional[IgnoreType] = None,
|
||||
project_id: Optional[int] = None,
|
||||
) -> List[IgnoreRule]:
|
||||
query = self.db.query(IgnoreRule).filter(IgnoreRule.is_active == True)
|
||||
|
||||
if ignore_type:
|
||||
query = query.filter(IgnoreRule.ignore_type == ignore_type.value)
|
||||
|
||||
if project_id:
|
||||
query = query.filter(IgnoreRule.project_id == project_id)
|
||||
|
||||
return query.all()
|
||||
|
||||
def is_project_ignored(self, project_id: int) -> bool:
|
||||
return self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.PROJECT.value,
|
||||
IgnoreRule.project_id == project_id,
|
||||
IgnoreRule.is_active == True,
|
||||
).first() is not None
|
||||
|
||||
def is_file_ignored(self, project_id: int, file_path: str) -> bool:
|
||||
return self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.FILE.value,
|
||||
IgnoreRule.project_id == project_id,
|
||||
IgnoreRule.target == file_path,
|
||||
IgnoreRule.is_active == True,
|
||||
).first() is not None
|
||||
|
||||
def is_image_ignored(self, image_name: str, project_id: Optional[int] = None) -> bool:
|
||||
query = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.IMAGE.value,
|
||||
IgnoreRule.target == image_name,
|
||||
IgnoreRule.is_active == True,
|
||||
)
|
||||
|
||||
if project_id:
|
||||
query = query.filter(
|
||||
(IgnoreRule.project_id == project_id) | (IgnoreRule.project_id.is_(None))
|
||||
)
|
||||
else:
|
||||
query = query.filter(IgnoreRule.project_id.is_(None))
|
||||
|
||||
return query.first() is not None
|
||||
|
||||
def should_scan_project(self, project_id: int) -> bool:
|
||||
return not self.is_project_ignored(project_id)
|
||||
|
||||
def should_scan_file(self, project_id: int, file_path: str) -> bool:
|
||||
if self.is_project_ignored(project_id):
|
||||
return False
|
||||
|
||||
return not self.is_file_ignored(project_id, file_path)
|
||||
|
||||
def should_scan_image(self, image_name: str, project_id: Optional[int] = None) -> bool:
|
||||
if project_id and self.is_project_ignored(project_id):
|
||||
return False
|
||||
|
||||
return not self.is_image_ignored(image_name, project_id)
|
||||
|
||||
def get_ignored_projects(self) -> List[Project]:
|
||||
ignored_rules = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.PROJECT.value,
|
||||
IgnoreRule.is_active == True,
|
||||
).all()
|
||||
|
||||
project_ids = [rule.project_id for rule in ignored_rules if rule.project_id]
|
||||
|
||||
return self.db.query(Project).filter(Project.id.in_(project_ids)).all()
|
||||
|
||||
def get_ignored_files(self, project_id: Optional[int] = None) -> List[IgnoreRule]:
|
||||
query = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.FILE.value,
|
||||
IgnoreRule.is_active == True,
|
||||
)
|
||||
|
||||
if project_id:
|
||||
query = query.filter(IgnoreRule.project_id == project_id)
|
||||
|
||||
return query.all()
|
||||
|
||||
def get_ignored_images(self, project_id: Optional[int] = None) -> List[IgnoreRule]:
|
||||
query = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.IMAGE.value,
|
||||
IgnoreRule.is_active == True,
|
||||
)
|
||||
|
||||
if project_id:
|
||||
query = query.filter(
|
||||
(IgnoreRule.project_id == project_id) | (IgnoreRule.project_id.is_(None))
|
||||
)
|
||||
|
||||
return query.all()
|
||||
|
||||
def bulk_ignore_images(
|
||||
self,
|
||||
image_names: List[str],
|
||||
reason: Optional[str] = None,
|
||||
created_by: Optional[str] = None,
|
||||
project_id: Optional[int] = None,
|
||||
) -> List[IgnoreRule]:
|
||||
rules = []
|
||||
|
||||
for image_name in image_names:
|
||||
rule = self.add_ignore_rule(
|
||||
ignore_type=IgnoreType.IMAGE,
|
||||
target=image_name,
|
||||
reason=reason,
|
||||
created_by=created_by,
|
||||
project_id=project_id,
|
||||
)
|
||||
rules.append(rule)
|
||||
|
||||
return rules
|
||||
|
||||
def bulk_ignore_files(
|
||||
self,
|
||||
file_paths: List[str],
|
||||
project_id: int,
|
||||
reason: Optional[str] = None,
|
||||
created_by: Optional[str] = None,
|
||||
) -> List[IgnoreRule]:
|
||||
rules = []
|
||||
|
||||
for file_path in file_paths:
|
||||
rule = self.add_ignore_rule(
|
||||
ignore_type=IgnoreType.FILE,
|
||||
target=file_path,
|
||||
reason=reason,
|
||||
created_by=created_by,
|
||||
project_id=project_id,
|
||||
)
|
||||
rules.append(rule)
|
||||
|
||||
return rules
|
||||
|
||||
def update_ignore_rule(
|
||||
self,
|
||||
rule_id: int,
|
||||
reason: Optional[str] = None,
|
||||
created_by: Optional[str] = None,
|
||||
) -> Optional[IgnoreRule]:
|
||||
rule = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.id == rule_id,
|
||||
IgnoreRule.is_active == True,
|
||||
).first()
|
||||
|
||||
if not rule:
|
||||
return None
|
||||
|
||||
if reason is not None:
|
||||
rule.reason = reason
|
||||
|
||||
if created_by is not None:
|
||||
rule.created_by = created_by
|
||||
|
||||
rule.updated_at = datetime.utcnow()
|
||||
self.db.commit()
|
||||
self.db.refresh(rule)
|
||||
|
||||
return rule
|
||||
|
||||
def get_ignore_rule(self, rule_id: int) -> Optional[IgnoreRule]:
|
||||
return self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.id == rule_id,
|
||||
IgnoreRule.is_active == True,
|
||||
).first()
|
||||
|
||||
def cleanup_unused_rules(self) -> int:
|
||||
image_rules = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.IMAGE.value,
|
||||
IgnoreRule.is_active == True,
|
||||
).all()
|
||||
|
||||
removed_count = 0
|
||||
|
||||
for rule in image_rules:
|
||||
existing_image = self.db.query(Image).filter(
|
||||
Image.full_image_name == rule.target,
|
||||
Image.is_active == True,
|
||||
).first()
|
||||
|
||||
if not existing_image:
|
||||
rule.is_active = False
|
||||
rule.updated_at = datetime.utcnow()
|
||||
removed_count += 1
|
||||
|
||||
file_rules = self.db.query(IgnoreRule).filter(
|
||||
IgnoreRule.ignore_type == IgnoreType.FILE.value,
|
||||
IgnoreRule.is_active == True,
|
||||
).all()
|
||||
|
||||
for rule in file_rules:
|
||||
existing_file = self.db.query(File).filter(
|
||||
File.file_path == rule.target,
|
||||
File.project_id == rule.project_id,
|
||||
File.is_active == True,
|
||||
).first()
|
||||
|
||||
if not existing_file:
|
||||
rule.is_active = False
|
||||
rule.updated_at = datetime.utcnow()
|
||||
removed_count += 1
|
||||
|
||||
self.db.commit()
|
||||
return removed_count
|
||||
Reference in New Issue
Block a user