
CVE database (build): enhancement (combined -> )

agencies 2024. 11. 25. 16:07
import pandas as pd
import os
import re
import requests
from bs4 import BeautifulSoup

# Initial setup
csv_file_path = "output.csv"
columns = [
    "index", "all_file_name", "CVE-ID", "Description", "Affected_Product", "Affected_Version",
    "Vulnerability_Type", "Severity", "CVSS_Score", "Attack_Vector", "Impact-C", "Impact-I", "Impact-A",
    "Exploitability", "Published_Date", "Last_Modified_Date",
    "Privileges_Required","file_name", "func_name", "old", "new", "Vendor"
]


# `get_info`: scrape the details page on cvedetails.com for a given CVE ID
def get_info(cve_id):
    url = f"https://www.cvedetails.com/cve/{cve_id}/"
    headers = {
        'User-Agent': (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
        )
    }

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Failed to retrieve data for {cve_id}")
        return ["None"] * 16  # match the 16 fields returned on success

    soup = BeautifulSoup(response.content, 'lxml')

    try:
        # Base Score
        base_score_div = soup.find("div", class_="cvssbox")
        base_score = base_score_div.text.strip() if base_score_div else "None"

        # Severity
        severity = "None"
        table = soup.find("table", class_="table-borderless")
        if table:
            rows = table.find_all("tr")
            if len(rows) > 1:
                severity_td = rows[1].find_all("td")
                if len(severity_td) > 1:
                    severity = severity_td[1].text.strip()

        # Published Date
        published_date = "None"
        updated_date = "None"
        vendor_link = "None"
        published_div = soup.find("div", class_="col-auto flex-fill")
        if published_div:
            published_span = published_div.find("span", string="Published")
            if published_span and published_span.next_sibling:
                published_date = published_span.next_sibling.strip().split()[0]

            updated_span = published_div.find("span", string="Updated")
            if updated_span and updated_span.next_sibling:
                updated_date = updated_span.next_sibling.strip().split()[0]

            source_span = published_div.find("span", string="Source")
            if source_span and source_span.find_next("a"):
                vendor_link = source_span.find_next("a").text.strip()

        # Attack Vector and CIA Impacts
        attack_vector, con, inte, ava = "None", "None", "None", "None"
        cvss_details_row = soup.find("tr", id="cvss_details_row_1")
        if cvss_details_row:
            details_divs = cvss_details_row.find_all("div")
            moa = [div.text for div in details_divs]
            if moa:
                # Each ":"-split piece is "<value> <next label>", so the replace()
                # calls strip the trailing label text and leave only the metric value.
                attack_vector = moa[0].split(":")[1].strip().replace("Access Complexity", "")
                con = moa[0].split(":")[4].strip().replace("Integrity Impact", "")
                inte = moa[0].split(":")[5].strip().replace("Availability Impact", "")
                ava = moa[0].split(":")[6].strip()

        # Privileges Required and Exploitability
        pr, exploit = "None", "None"
        cvss_details_row2 = soup.find("tr", id="cvss_details_row_2")
        if cvss_details_row2:
            details_divs2 = cvss_details_row2.find_all("div")
            moa2 = [div2.text for div2 in details_divs2]
            if moa2:
                pr = moa2[0].split(":")[3].strip().replace("User Interaction", "")
                exploit = moa2[0].split(":")[2].strip().replace("Privileges Required", "")

        # CWE ID
        cwe_section = soup.find("h2", id="cvedH2CWEs")
        cwe_list = []
        if cwe_section:
            cwe_items = cwe_section.find_next("ul").find_all("a")
            cwe_list = [cwe_item.text.strip() for cwe_item in cwe_items]
        cwe_ids = ", ".join(cwe_list) if cwe_list else "None"

        # Affected Products and Versions
        product_list = set()
        version_list = set()
        product_section = soup.find("ul", {"id": "affectedCPEsList"})
        if product_section:
            product_items = product_section.find_all("li")
            for item in product_items:
                product_text = item.text.split("Matching versions")[0].strip()
                product_list.add(product_text)

                version_info = item.find("div", class_="d-inline-block")
                if version_info:
                    version_text = version_info.text.strip()
                    if "Version" in version_text or "Versions" in version_text:
                        version_list.add(version_text)

        affected_products = ", ".join(product_list) if product_list else "None"
        affected_versions = ", ".join(version_list) if version_list else "None"

        # Vulnerability Category
        category_section = soup.find("div", class_="col-auto flex-fill pt-2")
        category_list = []
        if category_section:
            category_items = category_section.find_all("span", class_="ssc-vuln-cat")
            category_list = [category_item.text.strip() for category_item in category_items]
        vulnerability_category = ", ".join(category_list) if category_list else "None"

        # Description
        description_div = soup.find("div", id="cvedetailssummary")
        description = description_div.text.strip() if description_div else "None"
        

        return [
           cwe_ids, description, affected_products, affected_versions,
            vulnerability_category, severity, base_score, attack_vector, con, inte, ava,
            exploit, published_date, updated_date, pr, vendor_link
        ]

    except Exception as e:
        print(f"Error parsing data for {cve_id}: {e}")
        return ["N/A"] * 16  # match the 16 fields returned on success


# Extract the .c file name and the function name from the combined file name
def extract_file_details(file):
    # Take everything before _OLD / _NEW as the combined file name
    all_file_name = re.split(r'(_OLD|_NEW)', file)[0]
    
    file_name_match = re.search(r'_[a-f0-9]+_(.+?\.c)', file)
    file_name = file_name_match.group(1) if file_name_match else ""
    
    func_match = re.search(r'@@(.+?)(_OLD|_NEW|$)', file)
    func_name = func_match.group(1) if func_match else ""
    
    return all_file_name, file_name, func_name
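
# Example of what extract_file_details() returns (the filename below is hypothetical,
# shown only to illustrate the naming convention assumed by the regexes above):
#   extract_file_details("CVE-2019-0001_ab12cd_ext4_super.c@@ext4_fill_super_OLD.c")
#   -> all_file_name = "CVE-2019-0001_ab12cd_ext4_super.c@@ext4_fill_super"
#      file_name     = "ext4_super.c"
#      func_name     = "ext4_fill_super"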

# Load the existing CSV file (if any) and initialize it
if os.path.exists(csv_file_path):
    df = pd.read_csv(csv_file_path)
    for col in columns:
        if col not in df.columns:
            df[col] = ""
else:
    df = pd.DataFrame(columns=columns)

# Cache initialization (currently disabled)
#cve_cache = {row['CVE-ID']: row['CVE_description'] for _, row in df.iterrows() if pd.notna(row['CVE-ID'])}
current_index = df['index'].max() if not df.empty else 0

# Folder containing the new data to process
vul_folder = "only_c"

try:
    for root, _, files in os.walk(vul_folder):
        for file in files:
            if file.endswith(".c"):
                print(f"[INFO] Processing: {file}")
                file_path = os.path.join(root, file)
                
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                
                # Determine the file names and which column (old / new) to update
                all_file_name, file_name, func_name = extract_file_details(file)
                if "_OLD.c" in file:
                    column_to_update = 'old'
                elif "_NEW.c" in file:
                    column_to_update = 'new'
                else:
                    continue

                # Extract the CVE ID and fetch CVE/CWE information
                cve_match = re.search(r'CVE-\d{4}-\d{4,7}', file)
                
                cve = cve_match.group(0) if cve_match else ""

                INFO = get_info(cve)


                # Update an existing row or append a new one
                if all_file_name in df['all_file_name'].values:
                    # Update the existing row
                    row_index = df.index[df['all_file_name'] == all_file_name][0]
                    df.at[row_index, column_to_update] = content
                    if pd.isna(df.at[row_index, 'CVE-ID']):
                        df.at[row_index, 'CVE-ID'] = cve
                    if pd.isna(df.at[row_index, 'file_name']):
                        df.at[row_index, 'file_name'] = file_name
                    if pd.isna(df.at[row_index, 'func_name']):
                        df.at[row_index, 'func_name'] = func_name
                    
                    # Fill in the values from INFO
                    (cwe_ids, description, affected_products, affected_versions,
                     vulnerability_category, severity, base_score, attack_vector,
                     con, inte, ava, exploit, published_date, updated_date,
                     pr, vendor) = INFO
                    df.at[row_index, 'Description'] = description
                    df.at[row_index, 'Affected_Product'] = affected_products
                    df.at[row_index, 'Affected_Version'] = affected_versions
                    df.at[row_index, 'Vulnerability_Type'] = vulnerability_category
                    df.at[row_index, 'Severity'] = severity
                    df.at[row_index, 'CVSS_Score'] = base_score
                    df.at[row_index, 'Attack_Vector'] = attack_vector
                    df.at[row_index, 'Impact-C'] = con
                    df.at[row_index, 'Impact-I'] = inte
                    df.at[row_index, 'Impact-A'] = ava
                    df.at[row_index, 'Exploitability'] = exploit
                    df.at[row_index, 'Published_Date'] = published_date
                    df.at[row_index, 'Last_Modified_Date'] = updated_date
                    df.at[row_index, 'Privileges_Required'] = pr
                    df.at[row_index, 'Vendor'] = vendor
                else:
                    # Append a new row
                    current_index += 1
                    (cwe_ids, description, affected_products, affected_versions,
                     vulnerability_category, severity, base_score, attack_vector,
                     con, inte, ava, exploit, published_date, updated_date,
                     pr, vendor) = INFO
                    new_row = {
                        "index": current_index,
                        "all_file_name": all_file_name,
                        "CVE-ID": cve,
                        "Description": description,
                        "Affected_Product": affected_products,
                        "Affected_Version": affected_versions,
                        "Vulnerability_Type": vulnerability_category,
                        "Severity": severity,
                        "CVSS_Score": base_score,
                        "Attack_Vector": attack_vector,
                        "Impact-C": con,
                        "Impact-I": inte,
                        "Impact-A": ava,
                        "Exploitability": exploit,
                        "Published_Date": published_date,
                        "Last_Modified_Date": updated_date,
                        "Privileges_Required": pr,
                        "file_name": file_name,
                        "func_name": func_name,
                        "old": content if column_to_update == 'old' else "",
                        "new": content if column_to_update == 'new' else "",
                        "Vendor": vendor
                    }
                    df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

except KeyboardInterrupt:
    print("\n[INFO] Processing was interrupted. Progress so far has been saved.")
finally:
    df.to_csv(csv_file_path, index=False)
    print("[INFO] All file processing is complete.")
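
A quick sanity check of the intermediate output.csv produced above can be done with a few lines of pandas. This is only an illustrative sketch (the specific coverage checks are my own assumptions, not part of the original script):

import pandas as pd

check = pd.read_csv("output.csv")
print(f"rows: {len(check)}")

# Rows where both the vulnerable (old) and patched (new) functions were captured.
paired = check[check["old"].fillna("").ne("") & check["new"].fillna("").ne("")]
print(f"rows with both old and new code: {len(paired)}")

# Rows whose CVE lookup failed (the scraper filled in "None" / "N/A" placeholders).
missing = check[check["Description"].isin(["None", "N/A"])]
print(f"rows with missing CVE details: {len(missing)}")

The enhanced version below runs the same workflow, but it ingests _OLD.vul / _NEW.vul function files plus .patch files and stores the patch text in a new patch column.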


import pandas as pd
import os
import re
import requests
from bs4 import BeautifulSoup

# Initial setup
csv_file_path = "output.csv"
columns = [
    "index", "all_file_name", "CVE-ID", "Description", "Affected_Product", "Affected_Version",
    "Vulnerability_Type", "Severity", "CVSS_Score", "Attack_Vector", "Impact-C", "Impact-I", "Impact-A",
    "Exploitability", "Published_Date", "Last_Modified_Date",
    "Privileges_Required","file_name", "func_name", "old", "new", "Vendor","patch"
]


# `get_info`: scrape the details page on cvedetails.com for a given CVE ID
def get_info(cve_id):
    url = f"https://www.cvedetails.com/cve/{cve_id}/"
    headers = {
        'User-Agent': (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
        )
    }

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Failed to retrieve data for {cve_id}")
        return ["None"] * 16  # match the 16 fields returned on success

    soup = BeautifulSoup(response.content, 'lxml')

    try:
        # Base Score
        base_score_div = soup.find("div", class_="cvssbox")
        base_score = base_score_div.text.strip() if base_score_div else "None"

        # Severity
        severity = "None"
        table = soup.find("table", class_="table-borderless")
        if table:
            rows = table.find_all("tr")
            if len(rows) > 1:
                severity_td = rows[1].find_all("td")
                if len(severity_td) > 1:
                    severity = severity_td[1].text.strip()

        # Published Date
        published_date = "None"
        updated_date = "None"
        vendor_link = "None"
        published_div = soup.find("div", class_="col-auto flex-fill")
        if published_div:
            published_span = published_div.find("span", string="Published")
            if published_span and published_span.next_sibling:
                published_date = published_span.next_sibling.strip().split()[0]

            updated_span = published_div.find("span", string="Updated")
            if updated_span and updated_span.next_sibling:
                updated_date = updated_span.next_sibling.strip().split()[0]

            source_span = published_div.find("span", string="Source")
            if source_span and source_span.find_next("a"):
                vendor_link = source_span.find_next("a").text.strip()

        # Attack Vector and CIA Impacts
        attack_vector, con, inte, ava = "None", "None", "None", "None"
        cvss_details_row = soup.find("tr", id="cvss_details_row_1")
        if cvss_details_row:
            details_divs = cvss_details_row.find_all("div")
            moa = [div.text for div in details_divs]
            if moa:
                attack_vector = moa[0].split(":")[1].strip().replace("Access Complexity", "")
                con = moa[0].split(":")[4].strip().replace("Integrity Impact", "")
                inte = moa[0].split(":")[5].strip().replace("Availability Impact", "")
                ava = moa[0].split(":")[6].strip()

        # Privileges Required and Exploitability
        pr, exploit = "None", "None"
        cvss_details_row2 = soup.find("tr", id="cvss_details_row_2")
        if cvss_details_row2:
            details_divs2 = cvss_details_row2.find_all("div")
            moa2 = [div2.text for div2 in details_divs2]
            if moa2:
                pr = moa2[0].split(":")[3].strip().replace("User Interaction", "")
                exploit = moa2[0].split(":")[2].strip().replace("Privileges Required", "")

        # CWE ID
        cwe_section = soup.find("h2", id="cvedH2CWEs")
        cwe_list = []
        if cwe_section:
            cwe_items = cwe_section.find_next("ul").find_all("a")
            cwe_list = [cwe_item.text.strip() for cwe_item in cwe_items]
        cwe_ids = ", ".join(cwe_list) if cwe_list else "None"

        # Affected Products and Versions
        product_list = set()
        version_list = set()
        product_section = soup.find("ul", {"id": "affectedCPEsList"})
        if product_section:
            product_items = product_section.find_all("li")
            for item in product_items:
                product_text = item.text.split("Matching versions")[0].strip()
                product_list.add(product_text)

                version_info = item.find("div", class_="d-inline-block")
                if version_info:
                    version_text = version_info.text.strip()
                    if "Version" in version_text or "Versions" in version_text:
                        version_list.add(version_text)

        affected_products = ", ".join(product_list) if product_list else "None"
        affected_versions = ", ".join(version_list) if version_list else "None"

        # Vulnerability Category
        category_section = soup.find("div", class_="col-auto flex-fill pt-2")
        category_list = []
        if category_section:
            category_items = category_section.find_all("span", class_="ssc-vuln-cat")
            category_list = [category_item.text.strip() for category_item in category_items]
        vulnerability_category = ", ".join(category_list) if category_list else "None"

        # Description
        description_div = soup.find("div", id="cvedetailssummary")
        description = description_div.text.strip() if description_div else "None"
        

        return [
           cwe_ids, description, affected_products, affected_versions,
            vulnerability_category, severity, base_score, attack_vector, con, inte, ava,
            exploit, published_date, updated_date, pr, vendor_link
        ]

    except Exception as e:
        print(f"Error parsing data for {cve_id}: {e}")
        return ["N/A"] * 16  # match the 16 fields returned on success


# Extract the .c file name and the function name from the combined file name
def extract_file_details(file):
    # Take everything before _OLD / _NEW / .patch as the combined file name
    all_file_name = re.split(r'(_OLD|_NEW|\.patch)', file)[0]
    
    file_name_match = re.search(r'_[a-f0-9]+_(.+?\.c)', file)
    file_name = file_name_match.group(1) if file_name_match else ""
    
    func_match = re.search(r'@@(.+?)(_OLD|_NEW|\.patch)', file)
    func_name = func_match.group(1) if func_match else ""
    
    return all_file_name, file_name, func_name

# Load the existing CSV file (if any) and initialize it
if os.path.exists(csv_file_path):
    df = pd.read_csv(csv_file_path)
    for col in columns:
        if col not in df.columns:
            df[col] = ""
else:
    df = pd.DataFrame(columns=columns)

# Cache initialization (currently disabled)
#cve_cache = {row['CVE-ID']: row['CVE_description'] for _, row in df.iterrows() if pd.notna(row['CVE-ID'])}
current_index = df['index'].max() if not df.empty else 0

# Folder containing the new data to process
vul_folder = "test"

try:
    for root, _, files in os.walk(vul_folder):
        for file in files:
            if file.endswith((".vul", ".patch")):
                print(f"[INFO] Processing: {file}")
                file_path = os.path.join(root, file)
                
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                
                # Determine the file names and which column (old / new / patch) to update
                all_file_name, file_name, func_name = extract_file_details(file)
                if "_OLD.vul" in file:
                    column_to_update = 'old'
                elif "_NEW.vul" in file:
                    column_to_update = 'new'
                elif ".patch" in file:
                    column_to_update = 'patch'
                else:
                    continue

                # Extract the CVE ID and fetch CVE/CWE information
                cve_match = re.search(r'CVE-\d{4}-\d{4,7}', file)
                
                cve = cve_match.group(0) if cve_match else ""

                INFO = get_info(cve)


                # Update an existing row or append a new one
                if all_file_name in df['all_file_name'].values:
                    # Update the existing row
                    row_index = df.index[df['all_file_name'] == all_file_name][0]
                    df.at[row_index, column_to_update] = content
                    if pd.isna(df.at[row_index, 'CVE-ID']):
                        df.at[row_index, 'CVE-ID'] = cve
                    if pd.isna(df.at[row_index, 'file_name']):
                        df.at[row_index, 'file_name'] = file_name
                    if pd.isna(df.at[row_index, 'func_name']):
                        df.at[row_index, 'func_name'] = func_name
                    
                    # Fill in the values from INFO
                    (cwe_ids, description, affected_products, affected_versions,
                     vulnerability_category, severity, base_score, attack_vector,
                     con, inte, ava, exploit, published_date, updated_date,
                     pr, vendor) = INFO
                    df.at[row_index, 'Description'] = description
                    df.at[row_index, 'Affected_Product'] = affected_products
                    df.at[row_index, 'Affected_Version'] = affected_versions
                    df.at[row_index, 'Vulnerability_Type'] = vulnerability_category
                    df.at[row_index, 'Severity'] = severity
                    df.at[row_index, 'CVSS_Score'] = base_score
                    df.at[row_index, 'Attack_Vector'] = attack_vector
                    df.at[row_index, 'Impact-C'] = con
                    df.at[row_index, 'Impact-I'] = inte
                    df.at[row_index, 'Impact-A'] = ava
                    df.at[row_index, 'Exploitability'] = exploit
                    df.at[row_index, 'Published_Date'] = published_date
                    df.at[row_index, 'Last_Modified_Date'] = updated_date
                    df.at[row_index, 'Privileges_Required'] = pr
                    df.at[row_index, 'Vendor'] = vendor
                else:
                    # Append a new row
                    current_index += 1
                    (cwe_ids, description, affected_products, affected_versions,
                     vulnerability_category, severity, base_score, attack_vector,
                     con, inte, ava, exploit, published_date, updated_date,
                     pr, vendor) = INFO
                    new_row = {
                        "index": current_index,
                        "all_file_name": all_file_name,
                        "CVE-ID": cve,
                        "Description": description,
                        "Affected_Product": affected_products,
                        "Affected_Version": affected_versions,
                        "Vulnerability_Type": vulnerability_category,
                        "Severity": severity,
                        "CVSS_Score": base_score,
                        "Attack_Vector": attack_vector,
                        "Impact-C": con,
                        "Impact-I": inte,
                        "Impact-A": ava,
                        "Exploitability": exploit,
                        "Published_Date": published_date,
                        "Last_Modified_Date": updated_date,
                        "Privileges_Required": pr,
                        "file_name": file_name,
                        "func_name": func_name,
                        "old": content if column_to_update == 'old' else "",
                        "new": content if column_to_update == 'new' else "",
                        "Vendor": vendor,
                        "patch":content if column_to_update == 'patch' else ""
                    }
                    df = pd.concat([df, pd.DataFrame([new_row])], ignore_index=True)

except KeyboardInterrupt:
    print("\n[INFO] Processing was interrupted. Progress so far has been saved.")
finally:
    df.to_csv(csv_file_path, index=False)
    print("[INFO] All file processing is complete.")

Added the patch part as well!
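
To show how the new patch column might be consumed downstream, here is a small sketch that splits each stored unified diff into removed and added lines (the split_patch helper is hypothetical and only for illustration; the column names come from the script above):

import pandas as pd

def split_patch(patch_text):
    """Split a unified diff into removed (-) and added (+) lines."""
    removed, added = [], []
    for line in str(patch_text).splitlines():
        # Skip the ---/+++ file headers and keep only changed lines inside hunks.
        if line.startswith("-") and not line.startswith("---"):
            removed.append(line[1:])
        elif line.startswith("+") and not line.startswith("+++"):
            added.append(line[1:])
    return removed, added

patched = pd.read_csv("output.csv")
for _, row in patched[patched["patch"].fillna("").ne("")].iterrows():
    removed, added = split_patch(row["patch"])
    print(row["CVE-ID"], f"-{len(removed)} / +{len(added)} lines")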