agencies

MOBSy (웹 사이트 구축) 초안 본문

Ⅴ. 프로젝트

MOBSy (웹 사이트 구축) 초안

agencies 2024. 11. 30. 02:19

지난 시간에 이어서 colab 환경에서 웹 사이트를 구축하고자 한다.

 

 

기본 설치 명령어

!pip install flask flask-ngrok
#mobsy mobsy!2345 (ngrok)
!wget https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!./ngrok authtoken 2pVOYm4ltw1s3uz2KSwrWdmncyw_82kt3McUqdiyV8WLRZQn5
!mkdir templates
!mkdir static
!mkdir instance
!mkdir uploads
!mkdir output


!pip install flask flask-mysql flask-login
!pip install flask-sqlalchemy
!pip install langgraph
!npm install -g @cyclonedx/cdxgen
!sudo apt update
!sudo apt install openjdk-21-jdk -y
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin

db = SQLAlchemy()

# 사용자 테이블 정의
class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)  # 고유 ID
    email = db.Column(db.String(150), unique=True, nullable=False)  # 이메일
    username = db.Column(db.String(150), unique=True, nullable=False)  # 사용자 이름
    password = db.Column(db.String(150), nullable=False)  # 비밀번호

 

 

SBOM 생성하는 노드

%%writefile sbom.py
import sys
import os
import uuid
import zipfile
import time

class GraphState:
    def __init__(self):
        self.state = {
            "file_path": "",
            "output_path": "",  # ouput 파일 경로
            "total_time": 0,
            "time_spent": 0,
            "status": "",
        }

    def add_node(self, name, func):
        self.nodes.append({"name": name, "func": func})

    def update_state(self, key, value):
        if key in self.state:
            self.state[key] = value
            print(f"State updated: {key} = {value}")
        else:
            print(f"Warning: '{key}' is not a valid state attribute.")


class InputNode:
    
    def __init__(self, graph_state):
        self.node_id = str(uuid.uuid4())
        self.graph_state = graph_state

    def get_user_input(self):
        print(zip_file_path)
        zip_file_path = zip_file_path
        #zip_file_path = input("Enter path to the zip file: ")
        if not os.path.exists(zip_file_path):
            raise FileNotFoundError(f"File not found: {zip_file_path}")
        return zip_file_path

    def extract_zip_file(self, zip_file_path, extract_to):
        os.makedirs(extract_to, exist_ok=True)
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        return extract_to

    def run(self):
        start_time = time.time()
        zip_file_path = self.get_user_input()
        self.graph_state.update_state("file_path", zip_file_path)
        output_path = "/content/output"
        extract_to = os.path.join(output_path, "extracted")
        extracted_path = self.extract_zip_file(zip_file_path, extract_to)

        if extracted_path:
            self.graph_state.update_state("output_path", output_path)
            self.graph_state.update_state("extracted_path", extracted_path)
            self.graph_state.update_state("status", "ZIP file extracted")

        time_spent = int((time.time() - start_time) * 1000)
        self.graph_state.update_state("time_spent", time_spent)
        self.graph_state.update_state("total_time", self.graph_state.state.get("total_time", 0) + time_spent)
        return extracted_path


import os
import uuid
import zipfile
import time

class InputNode:
    def __init__(self, graph_state):
        self.graph_state = graph_state

    def _clean_output_directory(self):
        """
        Deletes files within the 'AnA', 'SBOM', 'TEMP', and 'extracted' directories
        and any files directly under '/content/output', while keeping the directory structure intact.
        """

        base_output_path = "/content/output"
        folders_to_clean = ["AnA", "SBOM", "TEMP", "extracted"]

        # Clean specified folders
        for folder in folders_to_clean:
            folder_path = os.path.join(base_output_path, folder)
            if os.path.exists(folder_path):
                for filename in os.listdir(folder_path):
                    file_path = os.path.join(folder_path, filename)
                    try:
                        if os.path.isfile(file_path) or os.path.islink(file_path):
                            os.unlink(file_path)  # Remove files or symbolic links
                        elif os.path.isdir(file_path):
                            shutil.rmtree(file_path)  # Remove directories
                    except Exception as e:
                        print(f"Failed to delete {file_path}. Reason: {e}")

        # Remove files directly under '/content/output'
        for filename in os.listdir(base_output_path):
            file_path = os.path.join(base_output_path, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
            except Exception as e:
                print(f"Failed to delete {file_path}. Reason: {e}")

    def run(self):
        """
        Main execution of the InputNode:
        - Cleans up old files.
        - Prompts user for the ZIP file path.
        - Extracts the contents of the ZIP file.
        """

        # Step 1: Clean previous output files
        self._clean_output_directory()

        # Step 2: Prompt for ZIP file path
        
        #zip_file_path = input("Enter path to the zip file: ").strip()
        if not os.path.exists(zip_file_path):
            raise FileNotFoundError(f"The file {zip_file_path} does not exist.")

        print(zip_file_path)
        self.graph_state.update_state("file_path", zip_file_path)

        # Step 3: Create output path
        output_path = "/content/output"
        os.makedirs(output_path, exist_ok=True)
        self.graph_state.update_state("output_path", output_path)

        # Step 4: Extract ZIP file
        extracted_path = os.path.join(output_path, "extracted")
        os.makedirs(extracted_path, exist_ok=True)
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(extracted_path)

        self.graph_state.update_state("status", "ZIP file extracted")
        print(f"Files extracted to: {extracted_path}")



import uuid
import os
import csv

class CFileExtractNode:
    def __init__(self, graph_state):
        self.node_id = str(uuid.uuid4())  # Unique node ID
        self.graph_state = graph_state

    # Create a CSV file from the .c files without code normalization
    def create_c_files_csv(self, files, output_csv):
        if not files:
            print("No .c files found.")
            self.graph_state.update_state("status", "No .c files found for CSV creation.")
            return

        with open(output_csv, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['file_name', 'original_code'])  # Original code, no normalization
            for file_name, code in files:
                writer.writerow([file_name, code])
        print(f".c files CSV created: {output_csv}")
        self.graph_state.update_state("status", "C files extracted and saved")

    # Process .c files: Extract and save to CSV without normalization
    def process_c_files(self, project_dir, output_csv):
        c_files = []
        for root, dirs, files in os.walk(project_dir):
            for file in files:
                if file.endswith('.c'):
                    file_path = os.path.join(root, file)
                    with open(file_path, 'r', encoding='utf-8') as f:
                        code = f.read()
                        c_files.append((file, code))  # Append without normalization
                    print(f"Processed .c file: {file_path}")
        self.create_c_files_csv(c_files, output_csv)

    # Node execution function
    def run(self):
        # Get extracted path from the graph state
        output_path = self.graph_state.state.get("output_path")
        if not output_path:
            print("Output path not found in the graph state.")
            return None

        # Define the extracted directory and CSV output path
        extracted_path = os.path.join(output_path, "extracted")
        os.makedirs(extracted_path, exist_ok=True)  # Ensure extracted directory exists

        output_csv = os.path.join(extracted_path, "c_files.csv")

        # Process .c files in the extracted path and save to CSV
        self.process_c_files(extracted_path, output_csv)

        # Update state to reflect completion
        self.graph_state.update_state("status", "C files extracted and saved")
        return output_csv




#sbom 생성
import os
import json
import csv
import uuid
import subprocess
import pandas as pd
import hashlib
import shutil

class SBOMGenerateNode:
    def __init__(self, graph_state):
        self.node_id = str(uuid.uuid4())  # Unique node ID
        self.graph_state = graph_state

    # Generate SBOM and save to JSON file
    def generate_sbom_from_project_dir(self, project_dir):
        self.graph_state.update_state("status", "Generating SBOM from project directory")

        # Check if the directory contains any .c files
        c_file_exists = any(file.endswith('.c') for root, dirs, files in os.walk(project_dir) for file in files)
        if not c_file_exists:
            print("No .c files found in the project directory.")
            self.graph_state.update_state("status", "No .c files found for SBOM generation")
            return None

        try:
            output_path = self.graph_state.state.get("output_path")
            sbom_path = os.path.join(self.graph_state.state.get("output_path"), "SBOM")
            os.makedirs(sbom_path, exist_ok=True)


            # sbom_output_file = os.path.join(sbom_path, "sbom.json")
            # oss_csv_output_file = os.path.join(sbom_path, "oss_csv.csv")  # oss_csv_file 경로 수정
            # usages_json_file = os.path.join(sbom_path, "usages.slices.json")
            # #usages_csv_file = os.path.join(sbom_path, f"{os.path.basename(project_dir)}_usages.csv")
            # usages_csv_file = os.path.join(sbom_path, "usages.slices.csv")

            sbom_output_file = os.path.join(sbom_path, "sbom.json")
            oss_csv_output_file = os.path.join(sbom_path, "oss_csv.csv")
            usages_json_file = os.path.join(output_path, "usages.slices.json")  # 수정된 경로
            usages_csv_file = os.path.join(sbom_path, "usages.slices.csv")


            # Run cdxgen to generate SBOM JSON
            subprocess.run(["cdxgen", "-t", "c", "-o", sbom_output_file, project_dir], check=True)
            print(f"SBOM generated at {sbom_output_file}")

            # Convert the SBOM JSON to CSV for OSS information
            self.convert_sbom_to_csv(sbom_output_file, oss_csv_output_file)

            # 파일 이동
            original_usages_json_file = "/content/usages.slices.json"
            # original_usages_json_file = "/content/usages.slices.json"
            if os.path.exists(original_usages_json_file):
                shutil.move(original_usages_json_file, usages_json_file)
                print(f"Moved usages.slices.json to {usages_json_file}")

            # Convert usages JSON to CSV if usages.slices.json exists
            if os.path.exists(usages_json_file):
                self.convert_usages_json_to_csv(usages_json_file, usages_csv_file)
                print("usages.slices.json converted to CSV")
            else:
                print("usages.slices.json not found")
                self.graph_state.update_state("status", "usages.slices.json not found")
                return None

            # Update state with the SBOM and CSV file paths
            self.graph_state.update_state("status", "SBOM, OSS CSV, and Usages CSV generated successfully")

            return sbom_output_file, oss_csv_output_file, usages_csv_file  # oss_csv_file 포함하여 반환
        except Exception as e:
            print(f"Error during SBOM generation: {e}")
            self.graph_state.update_state("status", f"Failed during SBOM generation: {e}")
            return None

    # Convert SBOM JSON to CSV with OSS information
    def convert_sbom_to_csv(self, json_file, csv_file):
        all_data = []
        try:

            with open(json_file, encoding='utf-8') as f:
                data = json.load(f)
            components = data.get('components', [])

            for component in components:
                properties = {prop['name']: prop['value'] for prop in component.get('properties', [])}
                all_data.append({
                    'name': component.get('name'),
                    'Version': component.get('version'),
                    'BOM Ref': component.get('bom-ref'),
                    'Type': component.get('type'),
                    'PURL': component.get('purl'),

                })

            df = pd.DataFrame(all_data)
            df.to_csv(csv_file, index=False)
            print(f"OSS information CSV file created: {csv_file}")
        except Exception as e:
            print(f"Error during CSV conversion: {e}")
            self.graph_state.update_state("status", f"Error during CSV conversion: {e}")

    # Convert usages.slices.json to CSV with additional information
    def convert_usages_json_to_csv(self, json_file, csv_file):
        rows = []
        try:
            if not os.path.exists(json_file):
                print(f"JSON file not found: {json_file}")
                self.graph_state.update_state("status", f"JSON file not found: {json_file}")
                return

            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            for obj in data['objectSlices']:
                full_name = obj.get('fullName', '')
                signature = obj.get('signature', '')
                file_name = obj.get('fileName', '')
                line_number = obj.get('lineNumber', '')
                column_number = obj.get('columnNumber', '')

                for usage in obj.get('usages', []):
                    target_obj = usage.get('targetObj', {})
                    usage_name = target_obj.get('name', '')
                    usage_type_fullname = target_obj.get('typeFullName', '')
                    usage_line_number = target_obj.get('lineNumber', '')
                    usage_column_number = target_obj.get('columnNumber', '')

                    invoked_calls = usage.get('invokedCalls', [])
                    invoked_calls_info = ', '.join([call.get('name', '') for call in invoked_calls])

                    arg_to_calls = usage.get('argToCalls', [])
                    arg_to_calls_info = ', '.join([arg.get('name', '') for arg in arg_to_calls])

                    # Generate MD5 hash for usage name
                    usage_name_hash = hashlib.md5(usage_name.encode()).hexdigest() if usage_name else ''

                    rows.append({
                        "Function Full Name": full_name,
                        "Signature": signature,
                        "Defined File Name": file_name,
                        "Defined Line Number": line_number,
                        "Defined Column Number": column_number,
                        "Usage Name": usage_name,
                        "Hash": usage_name_hash,
                        "Usage Type Full Name": usage_type_fullname,
                        "Usage Line Number": usage_line_number,
                        "Usage Column Number": usage_column_number,
                        "Invoked Calls": invoked_calls_info,
                        "Argument to Calls": arg_to_calls_info
                    })

            df = pd.DataFrame(rows)
            df.to_csv(csv_file, index=False)
            print(f"Usages information CSV file created: {csv_file}")
        except Exception as e:
            print(f"Error during Usages JSON to CSV conversion: {e}")
            self.graph_state.update_state("status", f"Error during Usages CSV conversion: {e}")

    # Node execution function
    def run(self):
        start_time = time.time()

        # Retrieve extracted path and project name
        output_path = self.graph_state.state.get("output_path")
        extracted_path = os.path.join(output_path, "extracted")
        project_dir = extracted_path

        print(f"Output path: {output_path}")
        print(f"Extracted path: {extracted_path}")
        # print(f"Project name from ZIP: {project_name}")
        print(f"Constructed project directory path: {project_dir}")

        if not os.path.exists(project_dir):
            print("Project directory not found.")
            return None

        result = self.generate_sbom_from_project_dir(project_dir)

        time_spent = int((time.time() - start_time) * 1000)  # Convert to milliseconds
        self.graph_state.update_state("time_spent", time_spent)
        self.graph_state.update_state("total_time", self.graph_state.state.get("total_time", 0) + time_spent)

        return result



def main(zip_file_path):
    graph_state = GraphState()
    input_node = InputNode(graph_state)
    extracted_path = input_node.run()  # ZIP 파일 처리

    c_file_node = CFileExtractNode(graph_state)
    c_file_node.run()  # .c 파일 처리 및 CSV 생성

    sbom_node = SBOMGenerateNode(graph_state)
    result = sbom_node.run()  # SBOM 생성

    return result

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python sbom.py <zip_file_path>")
        sys.exit(1)

    zip_file_path = sys.argv[1]
    main(zip_file_path)

 

 

플라스크

# Flask 코드 작성
from flask import Flask, render_template
from flask import request, redirect, url_for, session, jsonify, send_from_directory

from flask_ngrok import run_with_ngrok
import os
import zipfile
import subprocess


app = Flask(__name__)
run_with_ngrok(app)  # ngrok을 사용하여 외부에서 접근 가능하게 만듦



# 업로드된 파일 저장 경로
UPLOAD_FOLDER = './uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)  # 폴더 생성
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['OUTPUT_FOLDER_SBOM'] = 'output/SBOM'


# ZIP 파일 업로드 및 처리
@app.route('/upload', methods=['POST'])
def upload_zip():
    if 'file' not in request.files:
        return jsonify({'message': 'No file part'}), 400

    file = request.files['file']
    if file.filename == '':
        return jsonify({'message': 'No selected file'}), 400

    # ZIP 파일만 허용
    if not file.filename.endswith('.zip'):
        return jsonify({'message': 'Only ZIP files are allowed'}), 400

    # 파일 저장
    zip_path = os.path.join(app.config['UPLOAD_FOLDER'], file.filename)
    file.save(zip_path)

    try:
        # sbom.py 스크립트 실행
        result = subprocess.run(['python', 'sbom.py', zip_path], capture_output=True, text=True, check=True)
        # 파일 생성 경로 가정
        generated_file = os.path.join(app.config['OUTPUT_FOLDER_SBOM'], 'sbom.json')

        if os.path.exists(generated_file):
            file_url = f'/output/sbom/sbom.json'  # Flask에서 다운로드 URL
            return jsonify({'output': result.stdout, 'file_url': file_url}), 200
        else:
            return jsonify({'message': 'SBOM 파일 생성 실패'}), 500


    except subprocess.CalledProcessError as e:
        return jsonify({'message': 'SBOM 생성 중 오류 발생', 'error': e.stderr}), 500


@app.route('/output/sbom/<filename>', methods=['GET'])
def download_file(filename):
    """생성된 파일 다운로드"""
    directory = app.config['OUTPUT_FOLDER_SBOM']
    try:
        return send_from_directory(directory, filename, as_attachment=True)
    except FileNotFoundError:
        return jsonify({'message': '파일을 찾을 수 없습니다.'}), 404





@app.route('/')
def index():
    "<h1>/start로 가세요</h1>"
    return render_template('start.html')  # 책소개페이지

@app.route("/start")
def start():
    return render_template("start.html")  # templates 폴더의 index.html 파일 로드


@app.route("/dashboard")
def dashboard():
    if 'username' not in session:
        # 로그인하지 않은 경우 리디렉션
        return redirect(url_for('start'))

    username = session['username']  # 세션에서 사용자 이름 가져오기
    return render_template('dashboard.html', username=username)


app.secret_key = 'your_secret_key'

# SQLite 데이터베이스 설정
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db.init_app(app)

# 데이터베이스 생성
with app.app_context():
    db.create_all()

@app.route('/signup', methods=['POST'])
def signup():
    email = request.form.get('email')
    username = request.form.get('username')
    password = request.form.get('password')

    # 유효성 검사
    if not email or not username or not password:
        return jsonify({"message": "모든 필드를 입력하세요."}), 400

    # 중복 사용자 확인
    if User.query.filter_by(email=email).first():
        return jsonify({"message": "이미 존재하는 이메일입니다."}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({"message": "이미 존재하는 사용자 이름입니다."}), 400

    # 사용자 저장
    new_user = User(email=email, username=username, password=password)
    db.session.add(new_user)
    db.session.commit()
    return jsonify({"message": "회원가입 성공!"}), 200


@app.route('/login', methods=['POST'])
def login():
    username = request.form.get('username')
    password = request.form.get('password')

    # 사용자 확인
    user = User.query.filter_by(username=username, password=password).first()
    if not user:
        return jsonify({"success": False, "message": "잘못된 사용자 이름 또는 비밀번호입니다."}), 401

    # 로그인 성공
    session['username'] = user.username  # 세션에 사용자 이름 저장
    return jsonify({"success": True, "message": "로그인 성공!"}), 200


if __name__ == "__main__":
    app.run()

 


 

 

로그인을 하고 난 뒤 sbom을 생성해보자!

 

이렇게 파일 다운로드 버튼이 생긴다!

 

누르면!

 

 


dashboard.html
0.01MB
start.html
0.01MB
index.html
0.00MB