Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- 화학물질안전원
- HTML
- 파이썬
- PHP
- 화학물질불법유통온라인감시단
- 화학물질
- 도구모음
- MITRE ATT&CK
- 연구모임
- codeup
- 불법유통
- UKPT level
- 웹 해킹 입문
- suninatas
- Los
- 국가기록원
- UKPT
- 프로젝트
- 대외활동
- 정보보안
- Service
- 기타정보
- 국가정보원
- 국정원
- 경기팀
- 12기
- nurisec
- 여행
- 불법유통근절
- webhacking
Archives
- Today
- Total
agencies
joern 을 통한 취약한 함수에 들어가는 파라미터 backward slicing 본문
colab joern 설치 과정
!sudo apt install openjdk-21-jdk -y
!chmod +x joern-install.sh
!./joern-install.sh
!unzip joern-cli.zip
backward slicing
여기서는 대상 파일이 test.c
취약한 함수가 func_3 으로 가정
import subprocess
# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test.c"
func_name = "func_3"
# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")
// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()
// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")
// 변수 사용 위치 추적 (methodName 내부로 제한)
val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
variableUses.foreach {{ useNode =>
// Backward slicing: 데이터 의존성 추적
val relatedNodes = useNode.ddgIn
relatedNodes.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
if (!visitedLines.contains(lineNumber) && lineNumber != -1) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"사용된 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
// 추가 AST 기반 탐색: 파라미터가 다른 변수로 전달된 경우 탐지
cpg.assignment
.where(_.method.nameExact(methodName)) // methodName 내부로 제한
.filter(_.argument.code.contains(variableName))
.foreach {{ assignNode =>
val lineNumber = assignNode.lineNumber.getOrElse(-1)
if (!visitedLines.contains(lineNumber) && lineNumber != -1) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"사용된 코드: ${{assignNode.code}} (줄 번호: $lineNumber)")
// 새로 정의된 변수 추적
val definedVariable = assignNode.code.split("=").head.trim // 왼쪽 피연산자
println("변수 '" + variableName + "'가 영향을 준 변수: " + definedVariable)
traceBackwardSlicing(definedVariable, methodName)
}}
}}
}}
}}
// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val func3Calls = callsInMain.name("{func_name}")
if (func3Calls.isEmpty) {{
println("Main 함수에서 취약한 함수를 발견하지 못했습니다.")
}} else {{
println("Main 함수에서 취약한 함수 발견!")
func3Calls.argument.foreach {{ arg =>
val variableName = arg.code
println(s"파라미터: $variableName")
traceBackwardSlicing(variableName, "main")
}}
}}
// 지역 함수 작업
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
methods.foreach {{ method =>
println("=== 함수 '" + method.name + "'의 파라미터 추출 ===")
method.parameter.foreach {{ param =>
val paramName = param.name
println("파라미터 이름: " + paramName)
traceBackwardSlicing(paramName, method.name)
}}
}}
}}
// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""
# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
script_file.write(joern_script_content)
# Joern 실행
result = subprocess.run(
["/content/joern-cli/joern", "--script", script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Joern 실행 결과 출력
if result.returncode == 0:
print("Joern 실행 성공!")
print("=== 분석 결과 ===")
print(result.stdout.strip())
else:
print("Joern 실행 실패!")
print("=== 오류 메시지 ===")
print(result.stderr.strip())
main에서 찾기
지역함수에서 찾기
test.c 파일
고도화 1차 (SSL 프로젝트 대상)
import subprocess
import json
# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test2.c"
func_name = "ssl3_do_change_cipher_spec"
# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")
// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()
// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")
// 변수 사용 위치 추적 (methodName 내부로 제한)
val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
variableUses.foreach {{ useNode =>
// 데이터 의존성 추적 (ddgIn)
val dataDependencies = useNode.ddgIn
dataDependencies.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
val nodeCode = node.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"데이터 의존성 있는 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
// 변수 사용이 포함된 모든 호출을 추적
val callsUsingVariable = useNode.inCall
callsUsingVariable.foreach {{ callNode =>
val lineNumber = callNode.lineNumber.getOrElse(-1)
val nodeCode = callNode.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null && nodeCode.contains(variableName)) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"변수를 사용하는 호출: ${{callNode.code}} (줄 번호: $lineNumber)")
}}
}}
// 제어 흐름 상의 관련성 추적 (cfgIn)
val controlDependencies = useNode.cfgIn
controlDependencies.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
val nodeCode = node.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null && nodeCode.contains(variableName)) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"제어 흐름 관련 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
}}
}}
// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val funcCalls = callsInMain.name("{func_name}")
if (!funcCalls.isEmpty) {{
println("Main 함수에서 취약한 함수 발견!")
funcCalls.argument.foreach {{ arg =>
val variableName = arg.code
println(s"파라미터: ${{variableName}}")
traceBackwardSlicing(variableName, "main")
}}
// 함수 호출 추적
funcCalls.foreach {{ callNode =>
callNode.inAssignment.foreach {{ assignNode =>
val assignedVariable = assignNode.target.code
println(s"함수 결과가 변수 '${{assignedVariable}}'에 할당됨")
traceBackwardSlicing(assignedVariable, "main")
}}
}}
}}
// 지역 함수 작업 (vulnerable_function)
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
methods.foreach {{ method =>
println(s"=== 함수 '${{method.name}}' 작업 ===")
method.parameter.foreach {{ param =>
val paramName = param.name
println(s"파라미터: ${{paramName}}")
traceBackwardSlicing(paramName, method.name)
}}
// 함수 내 데이터 의존성 추적 (strcpy와 관련된 변수만 포함)
method.call.name("strcpy").argument.foreach {{ arg =>
val variableName = arg.code
println(s"관련 변수: ${{variableName}}")
traceBackwardSlicing(variableName, method.name)
}}
}}
}}
// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""
# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
script_file.write(joern_script_content)
# Joern 실행
result = subprocess.run(
["/content/joern-cli/joern", "--script", script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Joern 실행 결과 출력
if result.returncode == 0:
print("Joern 실행 성공!")
output = result.stdout.strip()
print("=== 분석 결과 ===")
print(output)
# JSON 결과 추출 및 Python 변수로 저장
try:
lines = json.loads(output.splitlines()[-1]) # 마지막 JSON 출력 파싱
print(f"Python 변수로 저장된 라인 번호: {lines}")
except json.JSONDecodeError as e:
print("JSON 디코딩 오류:", e)
else:
print("Joern 실행 실패!")
print("=== 오류 메시지 ===")
print(result.stderr.strip())
이곳에서 문제가 발생되는 코드 (full file) : ssls3_pkt.c.txt
old new patch
결과
해당하는 줄의 소스코드 출력하기
(고도화 버전)
colab 코드
import subprocess
import json
# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test.c"
func_name = "vulnerable_printf"
# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")
// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()
// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")
// 변수 사용 위치 추적 (methodName 내부로 제한)
val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
variableUses.foreach {{ useNode =>
// 데이터 의존성 추적 (ddgIn)
val dataDependencies = useNode.ddgIn
dataDependencies.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
val nodeCode = node.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"데이터 의존성 있는 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
// 변수 사용이 포함된 모든 호출을 추적
val callsUsingVariable = useNode.inCall
callsUsingVariable.foreach {{ callNode =>
val lineNumber = callNode.lineNumber.getOrElse(-1)
val nodeCode = callNode.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null && nodeCode.contains(variableName)) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"변수를 사용하는 호출: ${{callNode.code}} (줄 번호: $lineNumber)")
}}
}}
// 제어 흐름 상의 관련성 추적 (cfgIn)
val controlDependencies = useNode.cfgIn
controlDependencies.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
val nodeCode = node.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null && nodeCode.contains(variableName)) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"제어 흐름 관련 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
}}
}}
// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val funcCalls = callsInMain.name("{func_name}")
if (!funcCalls.isEmpty) {{
println("Main 함수에서 취약한 함수 발견!")
funcCalls.argument.foreach {{ arg =>
val variableName = arg.code
println(s"파라미터: ${{variableName}}")
traceBackwardSlicing(variableName, "main")
}}
// 함수 호출 추적
funcCalls.foreach {{ callNode =>
callNode.inAssignment.foreach {{ assignNode =>
val assignedVariable = assignNode.target.code
println(s"함수 결과가 변수 '${{assignedVariable}}'에 할당됨")
traceBackwardSlicing(assignedVariable, "main")
}}
}}
}}
// 지역 함수 작업 (vulnerable_function)
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
methods.foreach {{ method =>
println(s"=== 함수 '${{method.name}}' 작업 ===")
method.parameter.foreach {{ param =>
val paramName = param.name
println(s"파라미터: ${{paramName}}")
traceBackwardSlicing(paramName, method.name)
}}
// 함수 내 데이터 의존성 추적 (strcpy와 관련된 변수만 포함)
method.call.name("strcpy").argument.foreach {{ arg =>
val variableName = arg.code
println(s"관련 변수: ${{variableName}}")
traceBackwardSlicing(variableName, method.name)
}}
}}
}}
// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""
# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
script_file.write(joern_script_content)
# Joern 실행
result = subprocess.run(
["/content/joern-cli/joern", "--script", script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Joern 실행 결과 출력
if result.returncode == 0:
print("Joern 실행 성공!")
output = result.stdout.strip()
print("=== 분석 결과 ===")
print(output)
# JSON 결과 추출 및 Python 변수로 저장
try:
lines = json.loads(output.splitlines()[-1]) # 마지막 JSON 출력 파싱
lines.sort() # 리스트를 오름차순으로 정렬
print(f"Python 변수로 저장된 라인 번호 (오름차순): {lines}")
# 파일 읽어서 특정 라인 번호의 내용을 출력
with open(file_name, "r") as file:
file_content = file.readlines() # 모든 라인을 리스트로
print("=== 특정 라인의 소스 코드 ===")
for line_number in lines:
if 0 < line_number <= len(file_content): # 유효한 라인 번호인지 확인
print(f"{line_number}: {file_content[line_number - 1].strip()}")
except json.JSONDecodeError as e:
print("JSON 디코딩 오류:", e)
else:
print("Joern 실행 실패!")
print("=== 오류 메시지 ===")
print(result.stderr.strip())
더 더 ! 고도화 (db 생성 및 해시)
import subprocess
import json
import csv
import hashlib
import os
# 파일명과 취약한 함수명을 변수로 정의
file_name = "/content/test.c"
func_name = "dir_current"
csv_file_name = "/content/backward-db.csv" # CSV 파일명
# Joern 스크립트 생성
# Joern 스크립트 생성
joern_script_content = f"""
importCode("{file_name}")
// 이미 추적한 줄 번호를 저장하여 중복 방지
val visitedLines = scala.collection.mutable.Set[Int]()
val result = scala.collection.mutable.ListBuffer[Int]()
// 특정 변수의 backward slicing 수행
def traceBackwardSlicing(variableName: String, methodName: String): Unit = {{
println("=== 함수 '" + methodName + "'의 변수 '" + variableName + "' backward slicing ===")
// 변수 사용 위치 추적 (methodName 내부로 제한)
val variableUses = cpg.identifier.nameExact(variableName).where(_.method.nameExact(methodName))
variableUses.foreach {{ useNode =>
// 데이터 의존성 추적 (ddgIn)
val dataDependencies = useNode.ddgIn
dataDependencies.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
val nodeCode = node.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"데이터 의존성 있는 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
// 변수 사용이 포함된 모든 호출을 추적
val callsUsingVariable = useNode.inCall
callsUsingVariable.foreach {{ callNode =>
val lineNumber = callNode.lineNumber.getOrElse(-1)
val nodeCode = callNode.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null && nodeCode.contains(variableName)) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"변수를 사용하는 호출: ${{callNode.code}} (줄 번호: $lineNumber)")
}}
}}
// 제어 흐름 상의 관련성 추적 (cfgIn)
val controlDependencies = useNode.cfgIn
controlDependencies.foreach {{ node =>
val lineNumber = node.lineNumber.getOrElse(-1)
val nodeCode = node.code
if (!visitedLines.contains(lineNumber) && lineNumber != -1 &&
nodeCode != null && nodeCode.contains(variableName)) {{
visitedLines.add(lineNumber)
result += lineNumber
println(s"제어 흐름 관련 코드: ${{node.code}} (줄 번호: $lineNumber)")
}}
}}
}}
}}
// Main 함수 작업
val callsInMain = cpg.method.name("main").call
val funcCalls = callsInMain.name("{func_name}")
if (!funcCalls.isEmpty) {{
println("Main 함수에서 취약한 함수 발견!")
funcCalls.argument.foreach {{ arg =>
val variableName = arg.code
println(s"파라미터: ${{variableName}}")
traceBackwardSlicing(variableName, "main")
}}
// 함수 호출 추적
funcCalls.foreach {{ callNode =>
callNode.inAssignment.foreach {{ assignNode =>
val assignedVariable = assignNode.target.code
println(s"함수 결과가 변수 '${{assignedVariable}}'에 할당됨")
traceBackwardSlicing(assignedVariable, "main")
}}
}}
}}
// 지역 함수 작업 (vulnerable_function)
val methods = cpg.method.name("{func_name}")
if (methods.size > 0) {{
methods.foreach {{ method =>
println(s"=== 함수 '${{method.name}}' 작업 ===")
method.parameter.foreach {{ param =>
val paramName = param.name
println(s"파라미터: ${{paramName}}")
traceBackwardSlicing(paramName, method.name)
}}
// 함수 내 데이터 의존성 추적 (strcpy와 관련된 변수만 포함)
method.call.name("strcpy").argument.foreach {{ arg =>
val variableName = arg.code
println(s"관련 변수: ${{variableName}}")
traceBackwardSlicing(variableName, method.name)
}}
}}
}}
// JSON 형식으로 결과 출력
println(ujson.write(result.toList))
"""
# Joern 스크립트 저장
script_path = "/content/trace.sc"
with open(script_path, "w") as script_file:
script_file.write(joern_script_content)
# Joern 실행
result = subprocess.run(
["/content/joern-cli/joern", "--script", script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Joern 실행 결과 출력 및 CSV 생성
if result.returncode == 0:
print("Joern 실행 성공!")
output = result.stdout.strip()
print("=== 분석 결과 ===")
print(output)
try:
# JSON 결과 추출 및 Python 변수로 저장
lines = json.loads(output.splitlines()[-1]) # 마지막 JSON 출력 파싱
lines.sort() # 리스트를 오름차순으로 정렬
print(f"Python 변수로 저장된 라인 번호 (오름차순): {lines}")
# 파일 읽어서 특정 라인 번호의 소스 코드 추출
with open(file_name, "r") as file:
file_content = file.readlines() # 모든 라인을 리스트로
print("=== 특정 라인의 소스 코드 ===")
params = [] # 추출한 라인 소스코드를 저장
for line_number in lines:
if 0 < line_number <= len(file_content): # 유효한 라인 번호인지 확인
print(f"{line_number}: {file_content[line_number - 1].strip()}")
line_code = file_content[line_number - 1].strip()
params.append(line_code)
# 기존 CSV 읽어서 해시 값 확인
existing_hashes = set()
if os.path.exists(csv_file_name):
with open(csv_file_name, "r") as csv_file:
csv_reader = csv.reader(csv_file)
next(csv_reader, None) # 헤더 건너뛰기
for row in csv_reader:
if len(row) >= 4: # 해시 값이 있는지 확인
existing_hashes.add(row[3])
# CSV 파일에 추가
with open(csv_file_name, "a", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
# 헤더가 없는 경우 추가
if not os.path.exists(csv_file_name) or os.stat(csv_file_name).st_size == 0:
csv_writer.writerow(["file_name", "func_name", "params", "hash"])
for param in params:
hash_value = hashlib.md5(param.encode()).hexdigest() # MD5 해시 생성
if hash_value not in existing_hashes: # 중복 확인
csv_writer.writerow([file_name, func_name, param, hash_value])
existing_hashes.add(hash_value) # 새로운 해시 추가
print(f"CSV 파일 업데이트 완료: {csv_file_name}")
except json.JSONDecodeError as e:
print("JSON 디코딩 오류:", e)
else:
print("Joern 실행 실패!")
print("=== 오류 메시지 ===")
print(result.stderr.strip())
내일은 코드 추상화 진행 예정!
'Ⅳ. 기타' 카테고리의 다른 글
colab 환경에서 웹 사이트 구축해보기 (flask / ngrok) 및 python 코드 실행해보기! (0) | 2024.11.29 |
---|---|
backward slicing (추상화:joern) + 전체 소스코드 추상화 (0) | 2024.11.29 |
vul 폴더에 있는 patch 파일을 통해 취약한 코드 있는 여부 확인해보기 (0) | 2024.11.25 |
CVE database (구축) : 고도화 (combined -> ) (2) | 2024.11.25 |
deepdfa + linevul 운영(초안)단계 시도해보기! 두번째 (2) | 2024.11.21 |