agencies

MOBSy (웹 사이트 구축) 초안(두번째) 본문

Ⅴ. 프로젝트

MOBSy (웹 사이트 구축) 초안(두번째)

agencies 2024. 11. 30. 16:27

각 로그인 사용자마다, 홈 디렉토리 적용!

 

 

sbom 생성시마다 try 폴더(숫자폴더)가 생성됨

 

 

sbom 새로 정의해주고!

%%writefile sbom.py



import sys
import os
import uuid
import zipfile
import time
import csv
import json
import subprocess
import pandas as pd
import hashlib
import shutil

# GraphState 클래스
class GraphState:
    def __init__(self):
        self.state = {
            "file_path": "",
            "output_path": "",
            "total_time": 0,
            "time_spent": 0,
            "status": "",
        }

    def update_state(self, key, value):
        if key in self.state:
            self.state[key] = value
            print(f"State updated: {key} = {value}")
        else:
            print(f"Warning: '{key}' is not a valid state attribute.")

# InputNode 클래스
class InputNode:
    def __init__(self, graph_state, zip_file_path, output_path):
        self.graph_state = graph_state
        self.zip_file_path = zip_file_path
        self.output_path = output_path

    def _clean_output_directory(self):
        base_output_path = self.output_path
        folders_to_clean = ["AnA", "SBOM", "TEMP", "extracted"]

        for folder in folders_to_clean:
            folder_path = os.path.join(base_output_path, folder)
            if os.path.exists(folder_path):
                for filename in os.listdir(folder_path):
                    file_path = os.path.join(folder_path, filename)
                    try:
                        if os.path.isfile(file_path) or os.path.islink(file_path):
                            os.unlink(file_path)
                        elif os.path.isdir(file_path):
                            shutil.rmtree(file_path)
                    except Exception as e:
                        print(f"Failed to delete {file_path}. Reason: {e}")

    def run(self):
        self._clean_output_directory()

        if not os.path.exists(self.zip_file_path):
            raise FileNotFoundError(f"The file {self.zip_file_path} does not exist.")

        print(f"Processing ZIP file: {self.zip_file_path}")
        self.graph_state.update_state("file_path", self.zip_file_path)

        extracted_path = os.path.join(self.output_path, "extracted")
        os.makedirs(extracted_path, exist_ok=True)

        with zipfile.ZipFile(self.zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extracted_path)

        self.graph_state.update_state("output_path", self.output_path)
        self.graph_state.update_state("status", "ZIP file extracted")
        print(f"Files extracted to: {extracted_path}")

        return extracted_path

# CFileExtractNode 클래스
class CFileExtractNode:
    def __init__(self, graph_state):
        self.graph_state = graph_state

    def run(self):
        output_path = self.graph_state.state.get("output_path")
        if not output_path:
            print("Output path not found in the graph state.")
            return None

        extracted_path = os.path.join(output_path, "extracted")
        os.makedirs(extracted_path, exist_ok=True)

        output_csv = os.path.join(output_path, "c_files.csv")
        self.process_c_files(extracted_path, output_csv)
        print(f".c files CSV created at: {output_csv}")

        self.graph_state.update_state("status", "C files extracted and saved")
        return output_csv

    def process_c_files(self, project_dir, output_csv):
        c_files = []
        for root, dirs, files in os.walk(project_dir):
            for file in files:
                if file.endswith('.c'):
                    file_path = os.path.join(root, file)
                    with open(file_path, 'r', encoding='utf-8') as f:
                        code = f.read()
                        c_files.append((file, code))
                    print(f"Processed .c file: {file_path}")
        self.create_c_files_csv(c_files, output_csv)

    def create_c_files_csv(self, files, output_csv):
        if not files:
            print("No .c files found.")
            return

        with open(output_csv, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['file_name', 'original_code'])
            for file_name, code in files:
                writer.writerow([file_name, code])

# SBOMGenerateNode 클래스
class SBOMGenerateNode:
    def __init__(self, graph_state):
        self.graph_state = graph_state

    def run(self):
        output_path = self.graph_state.state.get("output_path")
        extracted_path = os.path.join(output_path, "extracted")

        if not os.path.exists(extracted_path):
            print("Extracted path not found.")
            return None

        result = self.generate_sbom_from_project_dir(extracted_path)
        return result

    def generate_sbom_from_project_dir(self, project_dir):
        output_path = self.graph_state.state.get("output_path")
        sbom_path = os.path.join(output_path, "SBOM")
        os.makedirs(sbom_path, exist_ok=True)

        sbom_output_file = os.path.join(sbom_path, "sbom.json")
        oss_csv_output_file = os.path.join(sbom_path, "oss_csv.csv")

        subprocess.run(["cdxgen", "-t", "c", "-o", sbom_output_file, project_dir], check=True)
        print(f"SBOM generated at: {sbom_output_file}")

        self.convert_sbom_to_csv(sbom_output_file, oss_csv_output_file)
        shutil.move("usages.slices.json", sbom_path)

        return sbom_output_file, oss_csv_output_file





    def convert_sbom_to_csv(self, json_file, csv_file):
        try:
            with open(json_file, encoding='utf-8') as f:
                data = json.load(f)

            components = data.get('components', [])
            rows = [
                {
                    'name': comp.get('name'),
                    'version': comp.get('version'),
                    'type': comp.get('type')
                }
                for comp in components
            ]

            df = pd.DataFrame(rows)
            df.to_csv(csv_file, index=False)
            print(f"Converted SBOM JSON to CSV: {csv_file}")
        except Exception as e:
            print(f"Error converting SBOM JSON to CSV: {e}")

# main 함수
def main(zip_file_path, output_path):
    graph_state = GraphState()

    # Step 1: Extract ZIP
    input_node = InputNode(graph_state, zip_file_path, output_path)
    extracted_path = input_node.run()

    # Step 2: Extract .c files to CSV
    c_file_node = CFileExtractNode(graph_state)
    c_file_csv = c_file_node.run()

    # Step 3: Generate SBOM
    sbom_node = SBOMGenerateNode(graph_state)
    sbom_result = sbom_node.run()

    return {"extracted_path": extracted_path, "c_file_csv": c_file_csv, "sbom_result": sbom_result}

# 프로그램 실행
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python sbom.py <zip_file_path> <output_path>")
        sys.exit(1)

    zip_file_path = sys.argv[1]
    output_path = sys.argv[2]
    main(zip_file_path, output_path)

 

 

플라스크 수정해주고

# Flask 코드 작성
from flask import Flask, render_template
from flask import request, redirect, url_for, session, jsonify, send_from_directory

from flask_ngrok import run_with_ngrok
import os
import zipfile
import subprocess


app = Flask(__name__)
run_with_ngrok(app)  # ngrok을 사용하여 외부에서 접근 가능하게 만듦


# 사용자별 홈 디렉토리
HOME_BASE_DIR = './home'
os.makedirs(HOME_BASE_DIR, exist_ok=True)  # 홈 디렉토리 생성






# 업로드된 파일 저장 경로
UPLOAD_FOLDER = './uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)  # 폴더 생성
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['OUTPUT_FOLDER_SBOM'] = 'output/SBOM'




# ZIP 파일 업로드 및 처리
@app.route('/upload', methods=['POST'])
def upload_zip():
    if 'file' not in request.files:
        return jsonify({'message': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'message': 'No selected file'}), 400

    # ZIP 파일만 허용
    if not file.filename.endswith('.zip'):
        return jsonify({'message': 'Only ZIP files are allowed'}), 400

    # 사용자 세션 확인
    if 'username' not in session:
        return jsonify({'message': '로그인이 필요합니다.'}), 401

    username = session['username']
    user_home = os.path.join('./home', username)  # 사용자 홈 디렉토리
    os.makedirs(user_home, exist_ok=True)  # 사용자 홈 디렉토리 생성

    # 사용자 시도별 폴더 생성
    try_count = len(os.listdir(user_home)) + 1
    try_dir = os.path.join(user_home, str(try_count))  # 시도별 폴더
    os.makedirs(try_dir, exist_ok=True)

    # ZIP 파일 저장 경로
    zip_path = os.path.join(try_dir, file.filename)
    file.save(zip_path)

    # Output 경로 설정 (try 디렉토리 하위에 output 디렉토리 생성)
    output_dir = os.path.join(try_dir, 'output')
    os.makedirs(output_dir, exist_ok=True)

    try:
        # sbom.py 실행
        result = subprocess.run(
            ['python', './sbom.py', zip_path, output_dir],
            capture_output=True, text=True, check=True
        )
        print("sbom.py stdout:", result.stdout)  # 표준 출력 로그
        print("sbom.py stderr:", result.stderr)  # 표준 에러 로그

        # SBOM 파일 경로 확인
        sbom_file = os.path.join(output_dir, "SBOM", "sbom.json")
        print(f"SBOM 파일 경로: {sbom_file}")
        print(f"SBOM 파일 존재 여부: {os.path.exists(sbom_file)}")

        if os.path.exists(sbom_file):
            # 파일 다운로드 URL 제공
            file_url = f'/download/{username}/{try_count}/sbom.json'
            

            return jsonify({'output': result.stdout, 'file_url': file_url}), 200
        else:
            print(f"SBOM 파일이 경로에 없습니다: {sbom_file}")
            return jsonify({'message': 'SBOM 파일 생성 실패'}), 500

    except subprocess.CalledProcessError as e:
        print("sbom.py 실행 중 에러 발생")
        print("stderr:", e.stderr)  # 에러 로그 출력
        return jsonify({'message': 'SBOM 생성 중 오류 발생', 'error': e.stderr}), 500












@app.route('/download/<username>/<try_count>/<filename>', methods=['GET'])
def download_file(username, try_count, filename):
    """동적 경로에서 생성된 파일 다운로드"""
    # SBOM 파일 디렉토리 경로 구성
    directory = os.path.join('./home', username, try_count, 'output', 'SBOM')
    file_path = os.path.join(directory, filename)

    # 디버깅: 경로 확인
    print(f"다운로드 요청된 파일 경로: {file_path}")

    if not os.path.exists(file_path):
        print(f"파일이 존재하지 않습니다: {file_path}")
        return jsonify({'message': '파일을 찾을 수 없습니다.'}), 404

    try:
        return send_from_directory(directory, filename, as_attachment=True)
    except Exception as e:
        print(f"파일 다운로드 오류: {e}")
        return jsonify({'message': '파일 다운로드 중 오류가 발생했습니다.'}), 500





@app.route('/')
def index():
    "<h1>/start로 가세요</h1>"
    return render_template('start.html')  # 책소개페이지

@app.route("/start")
def start():
    return render_template("start.html")  # templates 폴더의 index.html 파일 로드


@app.route("/dashboard")
def dashboard():
    if 'username' not in session:
        # 로그인하지 않은 경우 리디렉션
        return redirect(url_for('start'))

    username = session['username']  # 세션에서 사용자 이름 가져오기
    return render_template('dashboard.html', username=username)


app.secret_key = 'your_secret_key'

# SQLite 데이터베이스 설정
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db.init_app(app)

# 데이터베이스 생성
with app.app_context():
    db.create_all()

@app.route('/signup', methods=['POST'])
def signup():
    email = request.form.get('email')
    username = request.form.get('username')
    password = request.form.get('password')

    # 유효성 검사
    if not email or not username or not password:
        return jsonify({"message": "모든 필드를 입력하세요."}), 400

    # 중복 사용자 확인
    if User.query.filter_by(email=email).first():
        return jsonify({"message": "이미 존재하는 이메일입니다."}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({"message": "이미 존재하는 사용자 이름입니다."}), 400

    # 사용자 저장
    new_user = User(email=email, username=username, password=password)
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "회원가입 성공!"}), 200


@app.route('/login', methods=['POST'])
def login():
    username = request.form.get('username')
    password = request.form.get('password')

    # 사용자 확인
    user = User.query.filter_by(username=username, password=password).first()
    if not user:
        return jsonify({"success": False, "message": "잘못된 사용자 이름 또는 비밀번호입니다."}), 401

    # 로그인 성공
    session['username'] = user.username  # 세션에 사용자 이름 저장
    # 사용자별 홈 디렉토리 생성
    user_home = os.path.join(HOME_BASE_DIR, username)
    os.makedirs(user_home, exist_ok=True)
    return jsonify({"success": True, "message": "로그인 성공!"}), 200



if __name__ == "__main__":
    app.run()