mirror of
https://git.yoctoproject.org/meta-security
synced 2026-01-14 15:51:01 +00:00
cvert-foss - generate CVE report for the list of packages. Analyze the whole image manifest to align with the complex CPE configurations. cvert-update - update NVD feeds and store CVE structues dump. CVE dump is a pickled representation of the cve_struct dictionary. cvert.py - python library used by cvert-* scripts. NVD JSON Vulnerability Feeds https://nvd.nist.gov/vuln/data-feeds#JSON_FEED Usage examples: o Download CVE feeds to "nvdfeed" directory % cvert-update nvdfeed o Update CVE feeds and store a dump in a file % cvert-update --store cvedump nvdfeed o Generate a CVE report % cvert-foss --feed-dir nvdfeed --output report-foss.txt cve-manifest o (faster) Use dump file to generate a CVE report % cvert-foss --restore cvedump --output report-foss.txt cve-manifest o Generate a full report % cvert-foss --restore cvedump --show-description --show-reference \ --output report-foss-full.txt cve-manifest Manifest example: bash,4.2,CVE-2014-7187 python,2.7.35, python,3.5.5,CVE-2017-17522 CVE-2018-1061 Report example: patched | 7.5 | CVE-2018-1061 | python | 3.5.5 patched | 10.0 | CVE-2014-7187 | bash | 4.2 patched | 8.8 | CVE-2017-17522 | python | 3.5.5 unpatched | 10.0 | CVE-2014-6271 | bash | 4.2 unpatched | 10.0 | CVE-2014-6277 | bash | 4.2 unpatched | 10.0 | CVE-2014-6278 | bash | 4.2 unpatched | 10.0 | CVE-2014-7169 | bash | 4.2 unpatched | 10.0 | CVE-2014-7186 | bash | 4.2 unpatched | 4.6 | CVE-2012-3410 | bash | 4.2 unpatched | 8.4 | CVE-2016-7543 | bash | 4.2 unpatched | 5.0 | CVE-2010-3492 | python | 2.7.35 unpatched | 5.3 | CVE-2016-1494 | python | 2.7.35 unpatched | 6.5 | CVE-2017-18207 | python | 3.5.5 unpatched | 6.5 | CVE-2017-18207 | python | 2.7.35 unpatched | 7.1 | CVE-2013-7338 | python | 2.7.35 unpatched | 7.5 | CVE-2018-1060 | python | 3.5.5 unpatched | 8.8 | CVE-2017-17522 | python | 2.7.35 Signed-off-by: grygorii tertychnyi <gtertych@cisco.com> Signed-off-by: Armin Kuster <akuster808@gmail.com>
474 lines
13 KiB
Python
474 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Copyright (c) 2018 by Cisco Systems, Inc.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License version 2 as
|
|
# published by the Free Software Foundation.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along
|
|
# with this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
#
|
|
|
|
""" CVERT library: set of functions for CVE reports
|
|
"""
|
|
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import gzip
|
|
import pickle
|
|
import logging
|
|
import hashlib
|
|
import datetime
|
|
import textwrap
|
|
import urllib.request
|
|
import distutils.version
|
|
|
|
|
|
logging.getLogger(__name__).addHandler(logging.NullHandler())
|
|
|
|
|
|
def generate_report(manifest, cve_struct):
|
|
"""Generate CVE report"""
|
|
|
|
report = []
|
|
|
|
for cve in cve_struct:
|
|
affected = set()
|
|
|
|
for conf in cve_struct[cve]["nodes"]:
|
|
affected = affected.union(process_configuration(manifest, conf))
|
|
|
|
for key in affected:
|
|
product, version = key.split(",")
|
|
patched = manifest[product][version]
|
|
|
|
if cve in patched:
|
|
cve_item = {"status": "patched"}
|
|
else:
|
|
cve_item = {"status": "unpatched"}
|
|
|
|
cve_item["CVSS"] = "{0:.1f}".format(cve_struct[cve]["score"])
|
|
cve_item["CVE"] = cve
|
|
cve_item["product"] = product
|
|
cve_item["version"] = version
|
|
cve_item["description"] = cve_struct[cve]["description"]
|
|
cve_item["reference"] = [x["url"] for x in cve_struct[cve]["reference"]]
|
|
|
|
logging.debug("%9s %s %s,%s",
|
|
cve_item["status"], cve_item["CVE"],
|
|
cve_item["product"], cve_item["version"])
|
|
|
|
report.append(cve_item)
|
|
|
|
return sorted(report, key=lambda x: (x["status"], x["product"], x["CVSS"], x["CVE"]))
|
|
|
|
|
|
def process_configuration(manifest, conf):
|
|
"""Recursive call to process all CVE configurations"""
|
|
|
|
operator = conf["operator"]
|
|
|
|
if operator not in ["OR", "AND"]:
|
|
raise ValueError("operator {} is not supported".format(operator))
|
|
|
|
operator = True if operator == "AND" else False
|
|
match = False
|
|
affected = set()
|
|
|
|
if "cpe" in conf:
|
|
match = process_cpe(manifest, conf["cpe"][0], affected)
|
|
|
|
for cpe in conf["cpe"][1:]:
|
|
package_match = process_cpe(manifest, cpe, affected)
|
|
|
|
# match = match <operator> package_match
|
|
match = operator ^ ((operator ^ match) or (operator ^ package_match))
|
|
elif "children" in conf:
|
|
product_set = process_configuration(manifest, conf["children"][0])
|
|
|
|
if product_set:
|
|
match = True
|
|
affected = affected.union(product_set)
|
|
|
|
for child in conf["children"][1:]:
|
|
product_set = process_configuration(manifest, child)
|
|
package_match = True if product_set else False
|
|
|
|
# match = match OP package_match
|
|
match = operator ^ ((operator ^ match) or (operator ^ package_match))
|
|
|
|
if package_match:
|
|
affected = affected.union(product_set)
|
|
|
|
if match:
|
|
return affected
|
|
|
|
return ()
|
|
|
|
|
|
def process_cpe(manifest, cpe, affected):
|
|
"""Match CPE with all manifest packages"""
|
|
|
|
if not cpe["vulnerable"]:
|
|
# ignore non vulnerable part
|
|
return False
|
|
|
|
version_range = {}
|
|
|
|
for flag in ["versionStartIncluding",
|
|
"versionStartExcluding",
|
|
"versionEndIncluding",
|
|
"versionEndExcluding"]:
|
|
if flag in cpe:
|
|
version_range[flag] = cpe[flag]
|
|
|
|
# take only "product" and "version"
|
|
product, version = cpe["cpe23Uri"].split(":")[4:6]
|
|
|
|
if product not in manifest:
|
|
return False
|
|
|
|
if not version_range:
|
|
if version == "*":
|
|
# ignore CVEs that touches all versions of package,
|
|
# can not fix it anyway
|
|
logging.debug('ignore "*" in %s', cpe["cpe23Uri"])
|
|
return False
|
|
elif version == "-":
|
|
# "-" means NA
|
|
#
|
|
# NA (i.e. "not applicable/not used"). The logical value NA
|
|
# SHOULD be assigned when there is no legal or meaningful
|
|
# value for that attribute, or when that attribute is not
|
|
# used as part of the description.
|
|
# This includes the situation in which an attribute has
|
|
# an obtainable value that is null
|
|
#
|
|
# Ignores CVEs if version is not set
|
|
logging.debug('ignore "-" in %s', cpe["cpe23Uri"])
|
|
return False
|
|
else:
|
|
version_range["versionExactMatch"] = version
|
|
|
|
result = False
|
|
|
|
for version in manifest[product]:
|
|
try:
|
|
if match_version(version,
|
|
version_range):
|
|
logging.debug("match %s %s: %s", product, version, cpe["cpe23Uri"])
|
|
affected.add("{},{}".format(product, version))
|
|
|
|
result = True
|
|
except TypeError:
|
|
# version comparison is a very tricky
|
|
# sometimes provider changes product version in a strange manner
|
|
# and the above comparison just failed
|
|
# so here we try to make version string "more standard"
|
|
|
|
if match_version(twik_version(version),
|
|
[twik_version(v) for v in version_range]):
|
|
logging.debug("match %s %s (twiked): %s", product, twik_version(version),
|
|
cpe["cpe23Uri"])
|
|
affected.add("{},{}".format(product, version))
|
|
|
|
result = True
|
|
|
|
return result
|
|
|
|
|
|
def match_version(version, vrange):
|
|
"""Match version with the version range"""
|
|
|
|
result = False
|
|
version = util_version(version)
|
|
|
|
if "versionExactMatch" in vrange:
|
|
if version == util_version(vrange["versionExactMatch"]):
|
|
result = True
|
|
else:
|
|
result = True
|
|
|
|
if "versionStartIncluding" in vrange:
|
|
result = result and version >= util_version(vrange["versionStartIncluding"])
|
|
|
|
if "versionStartExcluding" in vrange:
|
|
result = result and version > util_version(vrange["versionStartExcluding"])
|
|
|
|
if "versionEndIncluding" in vrange:
|
|
result = result and version <= util_version(vrange["versionEndIncluding"])
|
|
|
|
if "versionEndExcluding" in vrange:
|
|
result = result and version < util_version(vrange["versionEndExcluding"])
|
|
|
|
return result
|
|
|
|
|
|
def util_version(version):
|
|
"""Simplify package version"""
|
|
return distutils.version.LooseVersion(version.split("+git")[0])
|
|
|
|
|
|
def twik_version(version):
|
|
"""Return "standard" version for complex cases"""
|
|
return "v1" + re.sub(r"^[a-zA-Z]+", "", version)
|
|
|
|
|
|
def print_report(report, width=70, show_description=False, show_reference=False, output=sys.stdout):
|
|
"""Print out final report"""
|
|
|
|
for cve in report:
|
|
print("{0:>9s} | {1:>4s} | {2:18s} | {3} | {4}".format(cve["status"], cve["CVSS"],
|
|
cve["CVE"], cve["product"],
|
|
cve["version"]),
|
|
file=output)
|
|
|
|
if show_description:
|
|
print("{0:>9s} + {1}".format(" ", "Description"), file=output)
|
|
|
|
for lin in textwrap.wrap(cve["description"], width=width):
|
|
print("{0:>9s} {1}".format(" ", lin), file=output)
|
|
|
|
if show_reference:
|
|
print("{0:>9s} + {1}".format(" ", "Reference"), file=output)
|
|
|
|
for url in cve["reference"]:
|
|
print("{0:>9s} {1}".format(" ", url), file=output)
|
|
|
|
|
|
def update_feeds(feed_dir, offline=False, start=2002):
|
|
"""Update all JSON feeds"""
|
|
|
|
feed_dir = os.path.realpath(feed_dir)
|
|
year_now = datetime.datetime.now().year
|
|
cve_struct = {}
|
|
|
|
for year in range(start, year_now + 1):
|
|
update_year(cve_struct, year, feed_dir, offline)
|
|
|
|
return cve_struct
|
|
|
|
|
|
def update_year(cve_struct, year, feed_dir, offline):
|
|
"""Update one JSON feed for the particular year"""
|
|
|
|
url_prefix = "https://static.nvd.nist.gov/feeds/json/cve/1.0"
|
|
file_prefix = "nvdcve-1.0-{0}".format(year)
|
|
|
|
meta = {
|
|
"url": "{0}/{1}.meta".format(url_prefix, file_prefix),
|
|
"file": os.path.join(feed_dir, "{0}.meta".format(file_prefix))
|
|
}
|
|
|
|
feed = {
|
|
"url": "{0}/{1}.json.gz".format(url_prefix, file_prefix),
|
|
"file": os.path.join(feed_dir, "{0}.json.gz".format(file_prefix))
|
|
}
|
|
|
|
ctx = {}
|
|
|
|
if not offline:
|
|
ctx = download_feed(meta, feed)
|
|
|
|
if not "meta" in ctx or not "feed" in ctx:
|
|
return
|
|
|
|
if not os.path.isfile(meta["file"]):
|
|
return
|
|
|
|
if not os.path.isfile(feed["file"]):
|
|
return
|
|
|
|
if not "meta" in ctx:
|
|
ctx["meta"] = ctx_meta(meta["file"])
|
|
|
|
if not "sha256" in ctx["meta"]:
|
|
return
|
|
|
|
if not "feed" in ctx:
|
|
ctx["feed"] = ctx_gzip(feed["file"], ctx["meta"]["sha256"])
|
|
|
|
if not ctx["feed"]:
|
|
return
|
|
|
|
logging.debug("parsing year %s", year)
|
|
|
|
for cve_item in ctx["feed"]["CVE_Items"]:
|
|
iden, cve = parse_item(cve_item)
|
|
|
|
if not iden:
|
|
continue
|
|
|
|
if not cve:
|
|
logging.error("%s parse error", iden)
|
|
break
|
|
|
|
if iden in cve_struct:
|
|
logging.error("%s duplicated", iden)
|
|
break
|
|
|
|
cve_struct[iden] = cve
|
|
|
|
logging.debug("cve records: %d", len(cve_struct))
|
|
|
|
|
|
def ctx_meta(filename):
|
|
"""Parse feed meta file"""
|
|
|
|
if not os.path.isfile(filename):
|
|
return {}
|
|
|
|
ctx = {}
|
|
|
|
with open(filename) as fil:
|
|
for lin in fil:
|
|
pair = lin.split(":", maxsplit=1)
|
|
ctx[pair[0]] = pair[1].rstrip()
|
|
|
|
return ctx
|
|
|
|
|
|
def ctx_gzip(filename, checksum=""):
|
|
"""Parse feed archive file"""
|
|
|
|
if not os.path.isfile(filename):
|
|
return {}
|
|
|
|
with gzip.open(filename) as fil:
|
|
try:
|
|
ctx = fil.read()
|
|
except (EOFError, OSError):
|
|
logging.error("failed to process gz archive %s", filename, exc_info=True)
|
|
return {}
|
|
|
|
if checksum and checksum.upper() != hashlib.sha256(ctx).hexdigest().upper():
|
|
return {}
|
|
|
|
return json.loads(ctx.decode())
|
|
|
|
|
|
def parse_item(cve_item):
|
|
"""Parse one JSON CVE entry"""
|
|
|
|
cve_id = cve_item["cve"]["CVE_data_meta"]["ID"][:]
|
|
impact = cve_item["impact"]
|
|
|
|
if not impact:
|
|
# REJECTed CVE
|
|
return None, None
|
|
|
|
if "baseMetricV3" in impact:
|
|
score = impact["baseMetricV3"]["cvssV3"]["baseScore"]
|
|
elif "baseMetricV2" in impact:
|
|
score = impact["baseMetricV2"]["cvssV2"]["baseScore"]
|
|
else:
|
|
return cve_id, None
|
|
|
|
return cve_id, {
|
|
"score": score,
|
|
"nodes": cve_item["configurations"]["nodes"][:],
|
|
"reference": cve_item["cve"]["references"]["reference_data"][:],
|
|
"description": cve_item["cve"]["description"]["description_data"][0]["value"]
|
|
}
|
|
|
|
|
|
def download_feed(meta, feed):
|
|
"""Download and parse feed"""
|
|
|
|
ctx = {}
|
|
|
|
if not retrieve_url(meta["url"], meta["file"]):
|
|
return {}
|
|
|
|
ctx["meta"] = ctx_meta(meta["file"])
|
|
|
|
if not "sha256" in ctx["meta"]:
|
|
return {}
|
|
|
|
ctx["feed"] = ctx_gzip(feed["file"], ctx["meta"]["sha256"])
|
|
|
|
if not ctx["feed"]:
|
|
if not retrieve_url(feed["url"], feed["file"]):
|
|
return {}
|
|
|
|
ctx["feed"] = ctx_gzip(feed["file"], ctx["meta"]["sha256"])
|
|
|
|
return ctx
|
|
|
|
|
|
def retrieve_url(url, filename=None):
|
|
"""Download file by URL"""
|
|
|
|
if filename:
|
|
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
|
|
|
logging.debug("downloading %s", url)
|
|
|
|
try:
|
|
urllib.request.urlretrieve(url, filename=filename)
|
|
except urllib.error.HTTPError:
|
|
logging.error("failed to download URL %s", url, exc_info=True)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def logconfig(debug_flag=False):
|
|
"""Return default log config"""
|
|
|
|
return {
|
|
"version": 1,
|
|
"formatters": {
|
|
"f": {
|
|
"format": "# %(asctime)s %% CVERT %% %(levelname)-8s %% %(message)s"
|
|
}
|
|
},
|
|
"handlers": {
|
|
"h": {
|
|
"class": "logging.StreamHandler",
|
|
"formatter": "f",
|
|
"level": logging.DEBUG if debug_flag else logging.INFO
|
|
}
|
|
},
|
|
"root": {
|
|
"handlers": ["h"],
|
|
"level": logging.DEBUG if debug_flag else logging.INFO
|
|
},
|
|
}
|
|
|
|
|
|
def save_cve(filename, cve_struct):
|
|
"""Save CVE structure in the file"""
|
|
|
|
filename = os.path.realpath(filename)
|
|
|
|
logging.debug("saving %d CVE records to %s", len(cve_struct), filename)
|
|
|
|
with open(filename, "wb") as fil:
|
|
pickle.dump(cve_struct, fil)
|
|
|
|
|
|
def load_cve(filename):
|
|
"""Load CVE structure from the file"""
|
|
|
|
filename = os.path.realpath(filename)
|
|
|
|
logging.debug("loading from %s", filename)
|
|
|
|
with open(filename, "rb") as fil:
|
|
cve_struct = pickle.load(fil)
|
|
|
|
logging.debug("cve records: %d", len(cve_struct))
|
|
|
|
return cve_struct
|