Ⅰ. 프로그래밍

CVE 관련 정보 수집 (초안)

agencies 2024. 11. 19. 11:49

파이선 소스코드

import os
import json
import pandas as pd
import requests
from bs4 import BeautifulSoup

#pip install lxml
#pip install beautifulsoup4
#pip install requests
#pip install 엑셀 관련 + pandas

# JSON 파일들이 저장된 폴더 경로
folder_path = './'

# CSV 파일 컬럼 정의 -"Reported_Date",
columns = [
    "CVE-ID", "CWE-ID", "Description", "Affected_Product", "Affected_Version",
    "Vulnerability_Type", "Severity", "CVSS_Score", "Attack_Vector", "Impact-C","Impact-I","Impact-A",
    "Exploitability",  "Published_Date", "Last_Modified_Date",
    "Privileges_Required", "Affected_Libraries", "Function_Name", "File_Name",
    "Parameter", "Exploit_Details", "Vendor", "Open_Source_Proprietary",
    "Configuration_Requirements", "Dependency_Information"

# 데이터를 담을 리스트 생성
data_list = []

#cve details
def get_info(cve_id):
    url = f"{cve_id}/"
    headers = {
        'User-Agent': (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/ Safari/537.36"

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Failed to retrieve data for {cve_id}")
        return ["N/A"] * 15  # 모든 반환값에 기본값 설정

    soup = BeautifulSoup(response.content, 'lxml')

        # Base Score
        base_score_div = soup.find("div", class_="cvssbox")
        base_score = base_score_div.text.strip() if base_score_div else "N/A"

        # Severity
        severity = "N/A"
        table = soup.find("table", class_="table-borderless")
        if table:
            rows = table.find_all("tr")
            if len(rows) > 1:
                severity_td = rows[1].find_all("td")
                if len(severity_td) > 1:
                    severity = severity_td[1].text.strip()

        # Published Date
        published_date = "N/A"
        updated_date = "N/A"
        vendor_link = "N/A"
        published_div = soup.find("div", class_="col-auto flex-fill")
        if published_div:
            published_span = published_div.find("span", string="Published")
            if published_span and published_span.next_sibling:
                published_date = published_span.next_sibling.strip().split()[0]

            updated_span = published_div.find("span", string="Updated")
            if updated_span and updated_span.next_sibling:
                updated_date = updated_span.next_sibling.strip().split()[0]

            source_span = published_div.find("span", string="Source")
            if source_span and source_span.find_next("a"):
                vendor_link = source_span.find_next("a").text.strip()

        # Attack Vector and CIA Impacts
        attack_vector, con, inte, ava = "N/A", "N/A", "N/A", "N/A"
        cvss_details_row = soup.find("tr", id="cvss_details_row_1")
        if cvss_details_row:
            details_divs = cvss_details_row.find_all("div")
            moa = [div.text for div in details_divs]
            if moa:
                attack_vector = moa[0].split(":")[1].strip().replace("Access Complexity", "")
                con = moa[0].split(":")[4].strip().replace("Integrity Impact", "")
                inte = moa[0].split(":")[5].strip().replace("Availability Impact", "")
                ava = moa[0].split(":")[6].strip()

        # Privileges Required and Exploitability
        pr, exploit = "N/A", "N/A"
        cvss_details_row2 = soup.find("tr", id="cvss_details_row_2")
        if cvss_details_row2:
            details_divs2 = cvss_details_row2.find_all("div")
            moa2 = [div2.text for div2 in details_divs2]
            if moa2:
                pr = moa2[0].split(":")[3].strip().replace("User Interaction", "")
                exploit = moa2[0].split(":")[2].strip().replace("Privileges Required", "")

        # CWE ID
        cwe_section = soup.find("h2", id="cvedH2CWEs")
        cwe_list = []
        if cwe_section:
            cwe_items = cwe_section.find_next("ul").find_all("a")
            cwe_list = [cwe_item.text.strip() for cwe_item in cwe_items]
        cwe_ids = ", ".join(cwe_list) if cwe_list else "N/A"

        # Affected Products and Versions
        product_list = set()
        version_list = set()
        product_section = soup.find("ul", {"id": "affectedCPEsList"})
        if product_section:
            product_items = product_section.find_all("li")
            for item in product_items:
                product_text = item.text.split("Matching versions")[0].strip()

                version_info = item.find("div", class_="d-inline-block")
                if version_info:
                    version_text = version_info.text.strip()
                    if "Version" in version_text or "Versions" in version_text:

        affected_products = ", ".join(product_list) if product_list else "N/A"
        affected_versions = ", ".join(version_list) if version_list else "N/A"

        # Vulnerability Category
        category_section = soup.find("div", class_="col-auto flex-fill pt-2")
        category_list = []
        if category_section:
            category_items = category_section.find_all("span", class_="ssc-vuln-cat")
            category_list = [category_item.text.strip() for category_item in category_items]
        vulnerability_category = ", ".join(category_list) if category_list else "N/A"

        return [base_score, cwe_ids, affected_products, affected_versions, vulnerability_category,
                severity, published_date, updated_date, vendor_link, attack_vector, con, inte, ava, pr, exploit]
    except Exception as e:
        print(f"Error parsing data for {cve_id}: {e}")
        return ["N/A"] * 15

# 폴더 내 모든 JSON 파일을 읽어들임
for filename in os.listdir(folder_path):
    if filename.endswith('.json'):
        file_path = os.path.join(folder_path, filename)
        # JSON 파일 열기
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        if data.get("CVE_data_meta", {}).get("STATE") == "RESERVED":
        # JSON 데이터에서 필요한 정보 추출 (존재하지 않으면 "N/A"로 설정)
        cve_id = data.get("CVE_data_meta", {}).get("ID", "N/A")
        #단일 cve_id 테스트
        #cve_id = "CVE-2021-44228"

        #cwe_id = data.get("problemtype", {}).get("problemtype_data", [{}])[0].get("description", [{}])[0].get("value", "N/A")
        description = data.get("description", {}).get("description_data", [{}])[0].get("value", "N/A")
        #vendor_name = data.get("affects", {}).get("vendor", {}).get("vendor_data", [{}])[0].get("vendor_name", "N/A")
        #product_name = data.get("affects", {}).get("vendor", {}).get("vendor_data", [{}])[0].get("product", {}).get("product_data", [{}])[0].get("product_name", "N/A")
        #version_value = data.get("affects", {}).get("vendor", {}).get("vendor_data", [{}])[0].get("product", {}).get("product_data", [{}])[0].get("version", {}).get("version_data", [{}])[0].get("version_value", "N/A")
        # 모든 값들을 "N/A"로 통일
        def normalize(value):
            return "N/A" if value in ["n/a", "N/A", None, ""] else value

        # info
        info = get_info(cve_id)
        scores = info[0] #base_score
        cwe = info[1] #cwe
        pro = info[2] #affected_products
        ver = info[3] # " version
        vuln_type = info[4] # " vulnerability_type,
        severity = info[5] # severity
        published_date = info[6] #published
        updated_date = info[7] #updated
        vendor_link = info[8]#vendor_link
        attack_vector = info[9]#attack_vector
        con = info[10]
        inte = info[11]
        ava = info[12]
        pr = info[13]        # privil
        exploit = info[14] # exploit
        # 새로운 행 생성
        new_row = {
            "CVE-ID": normalize(cve_id),
            "CWE-ID": cwe,
            "Description": normalize(description),
            "Affected_Product": pro,
            "Affected_Version": ver,
            "Vulnerability_Type": vuln_type,
            "Severity": severity,
            "CVSS_Score": scores,
            "Attack_Vector": attack_vector,
            "Impact-C": con,
            "Impact-I": inte,
            "Impact-A": ava,
            "Exploitability": exploit,
            "Published_Date": published_date,
            "Last_Modified_Date": updated_date,
            "Privileges_Required": pr,
            "Affected_Libraries": "N/A",
            "Function_Name": "N/A",
            "File_Name": "N/A",
            "Parameter": "N/A",
            "Exploit_Details": "N/A",
            "Vendor": vendor_link,
            "Open_Source_Proprietary": "N/A",
            "Configuration_Requirements": "N/A",
            "Dependency_Information": "N/A"
        # 데이터 리스트에 추가
        # 저장테스트
        df = pd.DataFrame(data_list, columns=columns)
        output_csv_path = 'cve_data.csv'
        df.to_csv(output_csv_path, index=False, encoding='utf-8-sig')


# DataFrame 생성
df = pd.DataFrame(data_list, columns=columns)

# CSV 파일로 저장
output_csv_path = 'cve_data.csv'
df.to_csv(output_csv_path, index=False, encoding='utf-8-sig')

print(f"CSV 파일이 생성되었습니다: {output_csv_path}")




1차 고도화

(CVE DB 누적 데이터 저장하기)

- 만약 중복된 CVE-ID가 존재하면 pass / 없다면 추가!


import os
import json
import pandas as pd
import requests
from bs4 import BeautifulSoup

# JSON 파일들이 저장된 폴더 경로
folder_path = './'

# CSV 파일 컬럼 정의
columns = [
    "CVE-ID", "CWE-ID", "Description", "Affected_Product", "Affected_Version",
    "Vulnerability_Type", "Severity", "CVSS_Score", "Attack_Vector", "Impact-C", "Impact-I", "Impact-A",
    "Exploitability", "Published_Date", "Last_Modified_Date",
    "Privileges_Required", "Affected_Libraries", "Function_Name", "File_Name",
    "Parameter", "Exploit_Details", "Vendor", "Open_Source_Proprietary",
    "Configuration_Requirements", "Dependency_Information"

# 출력 파일 경로
output_csv_path = 'cve_data.csv'

# 기존 CSV 파일 확인
if os.path.exists(output_csv_path):
    # 기존 데이터 불러오기
    existing_df = pd.read_csv(output_csv_path, encoding='utf-8-sig')
    # 기존 데이터가 없으면 빈 DataFrame 생성
    existing_df = pd.DataFrame(columns=columns)

# `get_info` 함수 정의
def get_info(cve_id):
    url = f"{cve_id}/"
    headers = {
        'User-Agent': (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/ Safari/537.36"

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Failed to retrieve data for {cve_id}")
        return ["None"] * len(columns)  # 반환값 길이와 columns 일치

    soup = BeautifulSoup(response.content, 'lxml')

        # Base Score
        base_score_div = soup.find("div", class_="cvssbox")
        base_score = base_score_div.text.strip() if base_score_div else "None"

        # Severity
        severity = "None"
        table = soup.find("table", class_="table-borderless")
        if table:
            rows = table.find_all("tr")
            if len(rows) > 1:
                severity_td = rows[1].find_all("td")
                if len(severity_td) > 1:
                    severity = severity_td[1].text.strip()

        # Published Date
        published_date = "None"
        updated_date = "None"
        vendor_link = "None"
        published_div = soup.find("div", class_="col-auto flex-fill")
        if published_div:
            published_span = published_div.find("span", string="Published")
            if published_span and published_span.next_sibling:
                published_date = published_span.next_sibling.strip().split()[0]

            updated_span = published_div.find("span", string="Updated")
            if updated_span and updated_span.next_sibling:
                updated_date = updated_span.next_sibling.strip().split()[0]

            source_span = published_div.find("span", string="Source")
            if source_span and source_span.find_next("a"):
                vendor_link = source_span.find_next("a").text.strip()

        # Attack Vector and CIA Impacts
        attack_vector, con, inte, ava = "None", "None", "None", "None"
        cvss_details_row = soup.find("tr", id="cvss_details_row_1")
        if cvss_details_row:
            details_divs = cvss_details_row.find_all("div")
            moa = [div.text for div in details_divs]
            if moa:
                attack_vector = moa[0].split(":")[1].strip().replace("Access Complexity", "")
                con = moa[0].split(":")[4].strip().replace("Integrity Impact", "")
                inte = moa[0].split(":")[5].strip().replace("Availability Impact", "")
                ava = moa[0].split(":")[6].strip()

        # Privileges Required and Exploitability
        pr, exploit = "None", "None"
        cvss_details_row2 = soup.find("tr", id="cvss_details_row_2")
        if cvss_details_row2:
            details_divs2 = cvss_details_row2.find_all("div")
            moa2 = [div2.text for div2 in details_divs2]
            if moa2:
                pr = moa2[0].split(":")[3].strip().replace("User Interaction", "")
                exploit = moa2[0].split(":")[2].strip().replace("Privileges Required", "")

        # CWE ID
        cwe_section = soup.find("h2", id="cvedH2CWEs")
        cwe_list = []
        if cwe_section:
            cwe_items = cwe_section.find_next("ul").find_all("a")
            cwe_list = [cwe_item.text.strip() for cwe_item in cwe_items]
        cwe_ids = ", ".join(cwe_list) if cwe_list else "None"

        # Affected Products and Versions
        product_list = set()
        version_list = set()
        product_section = soup.find("ul", {"id": "affectedCPEsList"})
        if product_section:
            product_items = product_section.find_all("li")
            for item in product_items:
                product_text = item.text.split("Matching versions")[0].strip()

                version_info = item.find("div", class_="d-inline-block")
                if version_info:
                    version_text = version_info.text.strip()
                    if "Version" in version_text or "Versions" in version_text:

        affected_products = ", ".join(product_list) if product_list else "None"
        affected_versions = ", ".join(version_list) if version_list else "None"

        # Vulnerability Category
        category_section = soup.find("div", class_="col-auto flex-fill pt-2")
        category_list = []
        if category_section:
            category_items = category_section.find_all("span", class_="ssc-vuln-cat")
            category_list = [category_item.text.strip() for category_item in category_items]
        vulnerability_category = ", ".join(category_list) if category_list else "None"

        # Description
        description_div = soup.find("div", id="cvedetailssummary")
        description = description_div.text.strip() if description_div else "None"

        return [
            cve_id, cwe_ids, description, affected_products, affected_versions,
            vulnerability_category, severity, base_score, attack_vector, con, inte, ava,
            exploit, published_date, updated_date, pr, "N/A", "N/A", "N/A", "N/A",
            "N/A", vendor_link, "N/A", "N/A", "N/A"
    except Exception as e:
        print(f"Error parsing data for {cve_id}: {e}")
        return ["N/A"] * len(columns)

# JSON 파일 처리
for filename in os.listdir(folder_path):
    if filename.endswith('.json'):
        cve_id = os.path.splitext(filename)[0]

        # 중복 확인
        if cve_id in existing_df['CVE-ID'].values:
            print(f"Skipping {cve_id}, already exists in CSV.")

        # 데이터 가져오기
        new_row = get_info(cve_id)

        # 새로운 데이터프레임 생성
        new_df = pd.DataFrame([new_row], columns=columns)

        # 기존 데이터프레임에 병합
        existing_df = pd.concat([existing_df, new_df], ignore_index=True)

        # CSV로 저장
        existing_df.to_csv(output_csv_path, index=False, encoding='utf-8-sig')
        print(f"Processed and saved: {cve_id}")

print(f"CSV 파일이 생성되었습니다: {output_csv_path}")