agencies

joern 을 통한 취약한 함수에 들어가는 파라미터 backward slicing 본문

Ⅳ. 기타

joern 을 통한 취약한 함수에 들어가는 파라미터 backward slicing

agencies 2024. 11. 27. 17:42

colab joern 설치 과정

!sudo apt install openjdk-21-jdk -y
!chmod +x joern-install.sh
!./joern-install.sh
!unzip joern-cli.zip

 

 

backward slicing 
여기서는 대상 파일이 test.c

취약한 함수가 func_3 으로 가정 

import subprocess

# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test.c"
func_name = "func_3"

# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")

// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()

// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
  println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")

  // 변수 사용 위치 추적 (methodName 내부로 제한)
  val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
  variableUses.foreach {{ useNode =>
    // Backward slicing: 데이터 의존성 추적
    val relatedNodes = useNode.ddgIn
    relatedNodes.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      if (!visitedLines.contains(lineNumber) && lineNumber != -1) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"사용된 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 추가 AST 기반 탐색: 파라미터가 다른 변수로 전달된 경우 탐지
    cpg.assignment
      .where(_.method.nameExact(methodName)) // methodName 내부로 제한
      .filter(_.argument.code.contains(variableName))
      .foreach {{ assignNode =>
        val lineNumber = assignNode.lineNumber.getOrElse(-1)
        if (!visitedLines.contains(lineNumber) && lineNumber != -1) {{
          visitedLines.add(lineNumber)
          result += lineNumber
          println(s"사용된 코드: ${{assignNode.code}} (줄 번호: $lineNumber)")

          // 새로 정의된 변수 추적
          val definedVariable = assignNode.code.split("=").head.trim // 왼쪽 피연산자
          println("변수 '" + variableName + "'가 영향을 준 변수: " + definedVariable)
          traceBackwardSlicing(definedVariable, methodName)
        }}
      }}
  }}
}}

// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val func3Calls = callsInMain.name("{func_name}")

if (func3Calls.isEmpty) {{
  println("Main 함수에서 취약한 함수를 발견하지 못했습니다.")
}} else {{
  println("Main 함수에서 취약한 함수 발견!")
  func3Calls.argument.foreach {{ arg =>
    val variableName = arg.code
    println(s"파라미터: $variableName")
    traceBackwardSlicing(variableName, "main")
  }}
}}

// 지역 함수 작업
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
  methods.foreach {{ method =>
    println("=== 함수 '" + method.name + "'의 파라미터 추출 ===")
    method.parameter.foreach {{ param =>
      val paramName = param.name
      println("파라미터 이름: " + paramName)
      traceBackwardSlicing(paramName, method.name)
    }}
  }}
}}

// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""

# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
    script_file.write(joern_script_content)

# Joern 실행
result = subprocess.run(
    ["/content/joern-cli/joern", "--script", script_path],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

# Joern 실행 결과 출력
if result.returncode == 0:
    print("Joern 실행 성공!")
    print("=== 분석 결과 ===")
    print(result.stdout.strip())
else:
    print("Joern 실행 실패!")
    print("=== 오류 메시지 ===")
    print(result.stderr.strip())

 

 

 

main에서 찾기

1.sc
0.00MB

 

 

지역함수에서 찾기

2.sc
0.00MB

 

 

 

test.c 파일

test.c
0.00MB

 

 


고도화 1차 (SSL 프로젝트 대상)

import subprocess
import json

# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test2.c"
func_name = "ssl3_do_change_cipher_spec"

# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")

// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()

// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
  println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")

  // 변수 사용 위치 추적 (methodName 내부로 제한)
  val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
  variableUses.foreach {{ useNode =>
    // 데이터 의존성 추적 (ddgIn)
    val dataDependencies = useNode.ddgIn
    dataDependencies.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      val nodeCode = node.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"데이터 의존성 있는 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 변수 사용이 포함된 모든 호출을 추적
    val callsUsingVariable = useNode.inCall
    callsUsingVariable.foreach {{ callNode =>
      val lineNumber = callNode.lineNumber.getOrElse(-1)
      val nodeCode = callNode.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null && nodeCode.contains(variableName)) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"변수를 사용하는 호출: ${{callNode.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 제어 흐름 상의 관련성 추적 (cfgIn)
    val controlDependencies = useNode.cfgIn
    controlDependencies.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      val nodeCode = node.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null && nodeCode.contains(variableName)) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"제어 흐름 관련 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}
  }}
}}

// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val funcCalls = callsInMain.name("{func_name}")

if (!funcCalls.isEmpty) {{
  println("Main 함수에서 취약한 함수 발견!")
  funcCalls.argument.foreach {{ arg =>
    val variableName = arg.code
    println(s"파라미터: ${{variableName}}")
    traceBackwardSlicing(variableName, "main")
  }}

  // 함수 호출 추적
  funcCalls.foreach {{ callNode =>
    callNode.inAssignment.foreach {{ assignNode =>
      val assignedVariable = assignNode.target.code
      println(s"함수 결과가 변수 '${{assignedVariable}}'에 할당됨")
      traceBackwardSlicing(assignedVariable, "main")
    }}
  }}
}}

// 지역 함수 작업 (vulnerable_function)
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
  methods.foreach {{ method =>
    println(s"=== 함수 '${{method.name}}' 작업 ===")
    method.parameter.foreach {{ param =>
      val paramName = param.name
      println(s"파라미터: ${{paramName}}")
      traceBackwardSlicing(paramName, method.name)
    }}

    // 함수 내 데이터 의존성 추적 (strcpy와 관련된 변수만 포함)
    method.call.name("strcpy").argument.foreach {{ arg =>
      val variableName = arg.code
      println(s"관련 변수: ${{variableName}}")
      traceBackwardSlicing(variableName, method.name)
    }}
  }}
}}

// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""









# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
    script_file.write(joern_script_content)

# Joern 실행
result = subprocess.run(
    ["/content/joern-cli/joern", "--script", script_path],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

# Joern 실행 결과 출력
if result.returncode == 0:
    print("Joern 실행 성공!")
    output = result.stdout.strip()
    print("=== 분석 결과 ===")
    print(output)
    
    # JSON 결과 추출 및 Python 변수로 저장
    try:
        lines = json.loads(output.splitlines()[-1])  # 마지막 JSON 출력 파싱
        print(f"Python 변수로 저장된 라인 번호: {lines}")
    except json.JSONDecodeError as e:
        print("JSON 디코딩 오류:", e)
else:
    print("Joern 실행 실패!")
    print("=== 오류 메시지 ===")
    print(result.stderr.strip())

 

이곳에서 문제가 발생되는 코드 (full file) : ssls3_pkt.c.txt

ssls3_pkt.c.txt
0.06MB

 

old new patch

CVE-2014-3470_4.3_CWE-476_4a448cff073ebc892ac0520a5e767a112ceb64d8_s3_pkt.c@@ssl3_do_change_cipher_spec_NEW.vul
0.00MB
CVE-2014-3470_4.3_CWE-476_4a448cff073ebc892ac0520a5e767a112ceb64d8_s3_pkt.c@@ssl3_do_change_cipher_spec_OLD.vul
0.00MB
CVE-2014-3470_4.3_CWE-476_4a448cff073ebc892ac0520a5e767a112ceb64d8_s3_pkt.c@@ssl3_do_change_cipher_spec.patch
0.00MB

 

 

결과


해당하는 줄의 소스코드 출력하기

(고도화 버전)

 

 

colab 코드

import subprocess
import json

# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test.c"
func_name = "vulnerable_printf"

# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")

// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()

// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
  println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")

  // 변수 사용 위치 추적 (methodName 내부로 제한)
  val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
  variableUses.foreach {{ useNode =>
    // 데이터 의존성 추적 (ddgIn)
    val dataDependencies = useNode.ddgIn
    dataDependencies.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      val nodeCode = node.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"데이터 의존성 있는 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 변수 사용이 포함된 모든 호출을 추적
    val callsUsingVariable = useNode.inCall
    callsUsingVariable.foreach {{ callNode =>
      val lineNumber = callNode.lineNumber.getOrElse(-1)
      val nodeCode = callNode.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null && nodeCode.contains(variableName)) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"변수를 사용하는 호출: ${{callNode.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 제어 흐름 상의 관련성 추적 (cfgIn)
    val controlDependencies = useNode.cfgIn
    controlDependencies.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      val nodeCode = node.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null && nodeCode.contains(variableName)) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"제어 흐름 관련 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}
  }}
}}

// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val funcCalls = callsInMain.name("{func_name}")

if (!funcCalls.isEmpty) {{
  println("Main 함수에서 취약한 함수 발견!")
  funcCalls.argument.foreach {{ arg =>
    val variableName = arg.code
    println(s"파라미터: ${{variableName}}")
    traceBackwardSlicing(variableName, "main")
  }}

  // 함수 호출 추적
  funcCalls.foreach {{ callNode =>
    callNode.inAssignment.foreach {{ assignNode =>
      val assignedVariable = assignNode.target.code
      println(s"함수 결과가 변수 '${{assignedVariable}}'에 할당됨")
      traceBackwardSlicing(assignedVariable, "main")
    }}
  }}
}}

// 지역 함수 작업 (vulnerable_function)
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
  methods.foreach {{ method =>
    println(s"=== 함수 '${{method.name}}' 작업 ===")
    method.parameter.foreach {{ param =>
      val paramName = param.name
      println(s"파라미터: ${{paramName}}")
      traceBackwardSlicing(paramName, method.name)
    }}

    // 함수 내 데이터 의존성 추적 (strcpy와 관련된 변수만 포함)
    method.call.name("strcpy").argument.foreach {{ arg =>
      val variableName = arg.code
      println(s"관련 변수: ${{variableName}}")
      traceBackwardSlicing(variableName, method.name)
    }}
  }}
}}

// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""









# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
    script_file.write(joern_script_content)

# Joern 실행
result = subprocess.run(
    ["/content/joern-cli/joern", "--script", script_path],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

# Joern 실행 결과 출력
if result.returncode == 0:
    print("Joern 실행 성공!")
    output = result.stdout.strip()
    print("=== 분석 결과 ===")
    print(output)
    
    # JSON 결과 추출 및 Python 변수로 저장
    try:
        lines = json.loads(output.splitlines()[-1])  # 마지막 JSON 출력 파싱
        lines.sort()  # 리스트를 오름차순으로 정렬
        print(f"Python 변수로 저장된 라인 번호 (오름차순): {lines}")
        # 파일 읽어서 특정 라인 번호의 내용을 출력
        with open(file_name, "r") as file:
            file_content = file.readlines()  # 모든 라인을 리스트로
        print("=== 특정 라인의 소스 코드 ===")
        for line_number in lines:
            if 0 < line_number <= len(file_content):  # 유효한 라인 번호인지 확인
                print(f"{line_number}: {file_content[line_number - 1].strip()}")
    except json.JSONDecodeError as e:
        print("JSON 디코딩 오류:", e)
else:
    print("Joern 실행 실패!")
    print("=== 오류 메시지 ===")
    print(result.stderr.strip())

더 더 ! 고도화 (db 생성 및 해시)

 

import subprocess
import json
import csv
import hashlib
import os

# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test.c"
func_name = "dir_current"
csv_file_name = "/content/backward-db.csv"  # CSV 파일명

# Joern 스크립트 생성
# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")

// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()

// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
  println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")

  // 변수 사용 위치 추적 (methodName 내부로 제한)
  val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
  variableUses.foreach {{ useNode =>
    // 데이터 의존성 추적 (ddgIn)
    val dataDependencies = useNode.ddgIn
    dataDependencies.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      val nodeCode = node.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"데이터 의존성 있는 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 변수 사용이 포함된 모든 호출을 추적
    val callsUsingVariable = useNode.inCall
    callsUsingVariable.foreach {{ callNode =>
      val lineNumber = callNode.lineNumber.getOrElse(-1)
      val nodeCode = callNode.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null && nodeCode.contains(variableName)) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"변수를 사용하는 호출: ${{callNode.code}} (줄 번호: $lineNumber)")
      }}
    }}

    // 제어 흐름 상의 관련성 추적 (cfgIn)
    val controlDependencies = useNode.cfgIn
    controlDependencies.foreach {{ node =>
      val lineNumber = node.lineNumber.getOrElse(-1)
      val nodeCode = node.code
      if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
          nodeCode != null && nodeCode.contains(variableName)) {{
        visitedLines.add(lineNumber)
        result += lineNumber
        println(s"제어 흐름 관련 코드: ${{node.code}} (줄 번호: $lineNumber)")
      }}
    }}
  }}
}}

// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val funcCalls = callsInMain.name("{func_name}")

if (!funcCalls.isEmpty) {{
  println("Main 함수에서 취약한 함수 발견!")
  funcCalls.argument.foreach {{ arg =>
    val variableName = arg.code
    println(s"파라미터: ${{variableName}}")
    traceBackwardSlicing(variableName, "main")
  }}

  // 함수 호출 추적
  funcCalls.foreach {{ callNode =>
    callNode.inAssignment.foreach {{ assignNode =>
      val assignedVariable = assignNode.target.code
      println(s"함수 결과가 변수 '${{assignedVariable}}'에 할당됨")
      traceBackwardSlicing(assignedVariable, "main")
    }}
  }}
}}

// 지역 함수 작업 (vulnerable_function)
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
  methods.foreach {{ method =>
    println(s"=== 함수 '${{method.name}}' 작업 ===")
    method.parameter.foreach {{ param =>
      val paramName = param.name
      println(s"파라미터: ${{paramName}}")
      traceBackwardSlicing(paramName, method.name)
    }}

    // 함수 내 데이터 의존성 추적 (strcpy와 관련된 변수만 포함)
    method.call.name("strcpy").argument.foreach {{ arg =>
      val variableName = arg.code
      println(s"관련 변수: ${{variableName}}")
      traceBackwardSlicing(variableName, method.name)
    }}
  }}
}}

// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""

# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
    script_file.write(joern_script_content)

# Joern 실행
result = subprocess.run(
    ["/content/joern-cli/joern", "--script", script_path],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    text=True
)

# Joern 실행 결과 출력 및 CSV 생성
if result.returncode == 0:
    print("Joern 실행 성공!")
    output = result.stdout.strip()
    print("=== 분석 결과 ===")
    print(output)
    
    try:
        # JSON 결과 추출 및 Python 변수로 저장
        lines = json.loads(output.splitlines()[-1])  # 마지막 JSON 출력 파싱
        lines.sort()  # 리스트를 오름차순으로 정렬
        print(f"Python 변수로 저장된 라인 번호 (오름차순): {lines}")
        
        # 파일 읽어서 특정 라인 번호의 소스 코드 추출
        with open(file_name, "r") as file:
            file_content = file.readlines()  # 모든 라인을 리스트로
            


        print("=== 특정 라인의 소스 코드 ===")
        params = []  # 추출한 라인 소스코드를 저장
        for line_number in lines:
            if 0 < line_number <= len(file_content):  # 유효한 라인 번호인지 확인
                print(f"{line_number}: {file_content[line_number - 1].strip()}")
                line_code = file_content[line_number - 1].strip()
                params.append(line_code)
        
        # 기존 CSV 읽어서 해시 값 확인
        existing_hashes = set()
        if os.path.exists(csv_file_name):
            with open(csv_file_name, "r") as csv_file:
                csv_reader = csv.reader(csv_file)
                next(csv_reader, None)  # 헤더 건너뛰기
                for row in csv_reader:
                    if len(row) >= 4:  # 해시 값이 있는지 확인
                        existing_hashes.add(row[3])
        
        # CSV 파일에 추가
        with open(csv_file_name, "a", newline="") as csv_file:
            csv_writer = csv.writer(csv_file)
            
            # 헤더가 없는 경우 추가
            if not os.path.exists(csv_file_name) or os.stat(csv_file_name).st_size == 0:
                csv_writer.writerow(["file_name", "func_name", "params", "hash"])
            
            for param in params:
                hash_value = hashlib.md5(param.encode()).hexdigest()  # MD5 해시 생성
                if hash_value not in existing_hashes:  # 중복 확인
                    csv_writer.writerow([file_name, func_name, param, hash_value])
                    existing_hashes.add(hash_value)  # 새로운 해시 추가
        
        print(f"CSV 파일 업데이트 완료: {csv_file_name}")
    
    except json.JSONDecodeError as e:
        print("JSON 디코딩 오류:", e)
else:
    print("Joern 실행 실패!")
    print("=== 오류 메시지 ===")
    print(result.stderr.strip())

 

내일은 코드 추상화 진행 예정!