Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions aboutcode/federated/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ def large_size_configs(cls):
"mlflow": 16,
"pub": 16,
"rpm": 16,
# Small Ecosystem all use the defaul
# Small Ecosystem all use the default
"default": 1,
}
return [
Expand Down Expand Up @@ -1069,7 +1069,7 @@ def medium_size_configs(cls):
"mlflow": 8,
"pub": 8,
"rpm": 8,
# Small Ecosystem all use the defaul
# Small Ecosystem all use the default
"default": 1,
}
return [
Expand Down Expand Up @@ -1110,7 +1110,7 @@ def small_size_configs(cls):
"mlflow": 4,
"pub": 4,
"rpm": 4,
# Small Ecosystem all use the defaul
# Small Ecosystem all use the default
"default": 1,
}
return [
Expand Down Expand Up @@ -1181,7 +1181,7 @@ def cluster_preset():
DataCluster(
data_kind="security_advisories",
description="VulnerableCode security advisories for each package version.",
datafile_path_template="{/namespace}/{name}/{version}/advisories.json",
datafile_path_template="{/namespace}/{name}/{version}/advisories.yml",
purl_type_configs=[PurlTypeConfig.default_config()],
data_schema_url="",
documentation_url="",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ data_clusters:
data_license: CC-BY-4.0
data_maintainers: []
- data_kind: security_advisories
datafile_path_template: '{/namespace}/{name}/{version}/advisories.json'
datafile_path_template: '{/namespace}/{name}/{version}/advisories.yml'
purl_type_configs:
- purl_type: default
number_of_repos: 1
Expand Down
9 changes: 5 additions & 4 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2344,13 +2344,14 @@ def save(self, *args, **kwargs):
@property
def pipeline_class(self):
"""Return the pipeline class."""

from vulnerabilities.importers import IMPORTERS_REGISTRY
from vulnerabilities.improvers import IMPROVERS_REGISTRY
from vulnerabilities.pipelines.exporters import EXPORTERS_REGISTRY

pipeline_registry = IMPORTERS_REGISTRY | IMPROVERS_REGISTRY | EXPORTERS_REGISTRY

if self.pipeline_id in IMPROVERS_REGISTRY:
return IMPROVERS_REGISTRY.get(self.pipeline_id)
if self.pipeline_id in IMPORTERS_REGISTRY:
return IMPORTERS_REGISTRY.get(self.pipeline_id)
return pipeline_registry[self.pipeline_id]

@property
def description(self):
Expand Down
4 changes: 4 additions & 0 deletions vulnerabilities/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def log(self, message, level=logging.INFO):
class VulnerableCodePipeline(PipelineDefinition, BasePipelineRun):
pipeline_id = None # Unique Pipeline ID

# When set to True, the pipeline is run only once.
# To rerun a one-time pipeline, reset its is_active field to True via a migration.
run_once = False

def on_failure(self):
"""
Tasks to run in the event that pipeline execution fails.
Expand Down
16 changes: 16 additions & 0 deletions vulnerabilities/pipelines/exporters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from vulnerabilities.pipelines.exporters import federate_vulnerabilities
from vulnerabilities.utils import create_registry

# Registry of exporter pipelines, keyed by each pipeline's pipeline_id.
EXPORTERS_REGISTRY = create_registry(
    [
        federate_vulnerabilities.FederatePackageVulnerabilities,
    ]
)
307 changes: 307 additions & 0 deletions vulnerabilities/pipelines/exporters/federate_vulnerabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


import itertools
import shutil
from operator import attrgetter
from pathlib import Path

import saneyaml
from aboutcode.pipeline import LoopProgress
from django.conf import settings
from django.db.models import Prefetch

from aboutcode.federated import DataFederation
from vulnerabilities.models import AdvisoryV2
from vulnerabilities.models import ImpactedPackage
from vulnerabilities.models import PackageV2
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.pipes import federatedcode


class FederatePackageVulnerabilities(VulnerableCodePipeline):
    """
    Export package vulnerabilities and advisories to FederatedCode.

    Clones the FederatedCode vulnerabilities git repository into a temporary
    working directory, writes per-package advisory relations and per-advisory
    YAML files into it, and pushes the changes back in batched commits.
    """

    pipeline_id = "federate_vulnerabilities_v2"

    @classmethod
    def steps(cls):
        # Steps run in this order; delete_working_dir is also invoked from
        # on_failure so the temporary clone is cleaned up either way.
        return (
            cls.check_federatedcode_eligibility,
            cls.create_federatedcode_working_dir,
            cls.fetch_federation_config,
            cls.clone_federation_repository,
            cls.publish_package_related_advisories,
            cls.publish_advisories,
            cls.delete_working_dir,
        )

    def check_federatedcode_eligibility(self):
        """Check if FederatedCode is configured."""
        federatedcode.check_federatedcode_configured_and_available(self.log)

    def create_federatedcode_working_dir(self):
        """Create temporary working dir."""
        self.working_path = federatedcode.create_federatedcode_working_dir()

    def fetch_federation_config(self):
        """Fetch config for PackageURL Federation."""
        data_federation = DataFederation.from_url(
            name="aboutcode-data",
            remote_root_url="https://github.com/aboutcode-data",
        )
        # The cluster is later used to map a purl to its datafile repo/path.
        self.data_cluster = data_federation.get_cluster("security_advisories")

    def clone_federation_repository(self):
        """Clone the configured FederatedCode vulnerabilities repository."""
        self.repo = federatedcode.clone_repository(
            repo_url=settings.FEDERATEDCODE_VULNERABILITIES_REPO,
            clone_path=self.working_path / "advisories-data",
            logger=self.log,
        )

    def publish_package_related_advisories(self):
        """Publish package advisories relations to FederatedCode"""
        repo_path = Path(self.repo.working_dir)
        commit_count = 1
        batch_size = 2000  # max files accumulated before a commit/push
        chunk_size = 500  # DB rows fetched per iterator round-trip
        files_to_commit = set()

        distinct_packages_count = (
            PackageV2.objects.values("type", "namespace", "name", "version")
            .distinct("type", "namespace", "name", "version")
            .count()
        )
        package_qs = package_prefetched_qs()
        # groupby requires input sorted by the grouping key;
        # package_prefetched_qs() orders by type/namespace/name/version.
        grouped_packages = itertools.groupby(
            package_qs.iterator(chunk_size=chunk_size),
            key=attrgetter("type", "namespace", "name", "version"),
        )

        self.log(f"Exporting advisory relation for {distinct_packages_count} packages.")
        progress = LoopProgress(
            total_iterations=distinct_packages_count,
            progress_step=5,
            logger=self.log,
        )
        for _, packages in progress.iter(grouped_packages):
            purl, package_vulnerabilities = get_package_related_advisory(packages)
            package_repo, datafile_path = self.data_cluster.get_datafile_repo_and_path(purl)
            package_vulnerability_path = f"packages/{package_repo}/{datafile_path}"

            write_file(
                repo_path=repo_path,
                file_path=package_vulnerability_path,
                data=package_vulnerabilities,
            )
            files_to_commit.add(package_vulnerability_path)

            if len(files_to_commit) > batch_size:
                if federatedcode.commit_and_push_changes(
                    commit_message=self.commit_message("package advisory relations", commit_count),
                    repo=self.repo,
                    files_to_commit=files_to_commit,
                    logger=self.log,
                ):
                    commit_count += 1
                # NOTE(review): the batch is cleared even when the push
                # reported failure — presumably intentional best-effort;
                # confirm failed batches should not be retried.
                files_to_commit.clear()

        # Flush whatever is left in a final commit; total_commit_count is now known.
        if files_to_commit:
            federatedcode.commit_and_push_changes(
                commit_message=self.commit_message(
                    "package advisory relations",
                    commit_count,
                    commit_count,
                ),
                repo=self.repo,
                files_to_commit=files_to_commit,
                logger=self.log,
            )

        self.log(f"Federated {distinct_packages_count} package advisories.")

    def publish_advisories(self):
        """Publish advisory to FederatedCode"""
        repo_path = Path(self.repo.working_dir)
        commit_count = 1
        batch_size = 2000  # max files accumulated before a commit/push
        chunk_size = 1000  # DB rows fetched per iterator round-trip
        files_to_commit = set()
        advisory_qs = advisory_prefetched_qs()
        advisory_count = advisory_qs.count()

        self.log(f"Exporting {advisory_count} advisory.")
        progress = LoopProgress(
            total_iterations=advisory_count,
            progress_step=5,
            logger=self.log,
        )
        for advisory in progress.iter(advisory_qs.iterator(chunk_size=chunk_size)):
            advisory_data = serialize_advisory(advisory)
            # One YAML file per advisory, named by its avid.
            adv_file = f"advisories/{advisory.avid}.yml"
            write_file(
                repo_path=repo_path,
                file_path=adv_file,
                data=advisory_data,
            )
            files_to_commit.add(adv_file)

            if len(files_to_commit) > batch_size:
                if federatedcode.commit_and_push_changes(
                    commit_message=self.commit_message("advisories", commit_count),
                    repo=self.repo,
                    files_to_commit=files_to_commit,
                    logger=self.log,
                ):
                    commit_count += 1
                # NOTE(review): cleared even on failed push — see the matching
                # note in publish_package_related_advisories.
                files_to_commit.clear()

        # Flush whatever is left in a final commit; total_commit_count is now known.
        if files_to_commit:
            federatedcode.commit_and_push_changes(
                commit_message=self.commit_message(
                    "advisories",
                    commit_count,
                    commit_count,
                ),
                repo=self.repo,
                files_to_commit=files_to_commit,
                logger=self.log,
            )

        self.log(f"Successfully federated {advisory_count} advisories.")

    def delete_working_dir(self):
        """Remove temporary working dir."""
        # working_path only exists after create_federatedcode_working_dir ran.
        if hasattr(self, "working_path") and self.working_path:
            shutil.rmtree(self.working_path)

    def on_failure(self):
        # Clean up the temporary clone even when a step fails mid-run.
        self.delete_working_dir()

    def commit_message(
        self,
        item_type,
        commit_count,
        total_commit_count="many",
    ):
        """Commit message for pushing package vulnerability."""
        # total_commit_count stays "many" until the final flush, where the
        # real total is known and passed explicitly.
        return federatedcode.commit_message(
            item_type=item_type,
            commit_count=commit_count,
            total_commit_count=total_commit_count,
        )


def package_prefetched_qs():
    """
    Return a PackageV2 queryset ordered by type/namespace/name/version
    (required by the caller's groupby) with both affected and fixed impact
    relations prefetched down to the advisory avid.
    """

    def impact_qs():
        # Fresh queryset per relation; only the advisory avid is needed.
        return ImpactedPackage.objects.only("advisory_id").prefetch_related(
            Prefetch("advisory", queryset=AdvisoryV2.objects.only("avid"))
        )

    base_qs = PackageV2.objects.order_by("type", "namespace", "name", "version")
    base_qs = base_qs.only("package_url", "type", "namespace", "name", "version")
    return base_qs.prefetch_related(
        Prefetch("affected_in_impacts", queryset=impact_qs()),
        Prefetch("fixed_in_impacts", queryset=impact_qs()),
    )


def get_package_related_advisory(packages):
    """
    Return a (purl, records) tuple for a group of package rows sharing one
    purl, where records is a list of mappings linking the purl to the sorted
    avids of advisories that affect it and advisories it fixes.
    """
    records = []
    for pkg in packages:
        affected = sorted(
            impact.advisory.avid for impact in pkg.affected_in_impacts.all()
        )
        fixing = sorted(
            impact.advisory.avid for impact in pkg.fixed_in_impacts.all()
        )
        records.append(
            {
                "purl": pkg.package_url,
                "affected_by_advisories": affected,
                "fixing_advisories": fixing,
            }
        )

    # pkg is bound to the last package of the (non-empty) group.
    return pkg.package_url, records


def advisory_prefetched_qs():
    """Return an AdvisoryV2 queryset with all serialized relations prefetched."""
    related_names = (
        "impacted_packages",
        "aliases",
        "references",
        "severities",
        "weaknesses",
    )
    return AdvisoryV2.objects.prefetch_related(*related_names)


def serialize_severity(sev):
    """Return a plain dict representation of a severity object."""
    # published_at is stringified so the mapping stays YAML-serializable.
    published = str(sev.published_at)
    return dict(
        score=sev.value,
        scoring_system=sev.scoring_system,
        scoring_elements=sev.scoring_elements,
        published_at=published,
        url=sev.url,
    )


def serialize_references(reference):
    """Return a plain dict representation of a single reference object."""
    field_names = ("url", "reference_type", "reference_id")
    return {name: getattr(reference, name) for name in field_names}


def serialize_advisory(advisory):
    """Return a plain data mapping serialized from advisory object."""
    impact_records = []
    for impact in advisory.impacted_packages.all():
        impact_records.append(
            {
                "purl": impact.base_purl,
                "affected_versions": impact.affecting_vers,
                "fixed_versions": impact.fixed_vers,
            }
        )

    # Key insertion order is preserved in the YAML output; keep it stable.
    return {
        "advisory_id": advisory.advisory_id,
        "datasource_id": advisory.avid,
        "datasource_url": advisory.url,
        "aliases": sorted(alias.alias for alias in advisory.aliases.all()),
        "summary": advisory.summary,
        "impacted_packages": impact_records,
        "severities": [serialize_severity(sev) for sev in advisory.severities.all()],
        "weaknesses": [weakness.cwe for weakness in advisory.weaknesses.all()],
        "references": [serialize_references(ref) for ref in advisory.references.all()],
    }


def write_file(repo_path, file_path, data):
    """Write ``data`` as YAML to ``file_path`` under ``repo_path``."""
    target = repo_path / file_path
    # Create any missing parent directories before writing.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(saneyaml.dump(data), encoding="utf-8")
Loading
Loading