Running DeepDFA (END) Final - 2
In the previous post, we went through the step of running joern.
bash scripts/run_dbize.sh
Next, we need to run run_dbize.sh.
Looking at the contents of this script, it runs a total of two .py files.
Add the import path to dbize.py and dbize_graphs.py and then run them; the exact lines to add are shown below.
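For reference, the "import path" is just the two lines below at the top of each script. The /content/DeepDFA location is an assumption matching the Colab checkout used in this series, so adjust it to wherever your clone lives.

import sys

# Make the DDFA package importable from these standalone scripts.
# The path is an assumption: it matches a clone at /content/DeepDFA (Colab).
sys.path.append("/content/DeepDFA/DDFA")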
[dbize.py]
#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import sastvd.helpers.datasets as svdds
import sastvd as svd
import sastvd.helpers.evaluate as ivde
from sastvd.linevd.utils import feature_extraction
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--sample", action="store_true")
parser.add_argument("--dsname", default="bigvul")
args = parser.parse_args()
sample_mode = args.sample
dsname = args.dsname
df = svdds.ds(dsname, cache=True, sample=sample_mode)
df = svdds.ds_filter(
    df,
    dsname,
    check_file=True,
    check_valid=True,
    vulonly=False,
    load_code=False,
    sample=-1,
    sample_mode=sample_mode,
)
print(df)
#%%
if dsname == "bigvul":
    graph_type = "cfg"
    dep_add_lines = ivde.get_dep_add_lines_bigvul("bigvul", sample=sample_mode)
    dep_add_lines = {k: set(list(v["removed"]) + v["depadd"]) for k, v in dep_add_lines.items()}

    def get_vuln(lineno, _id, dep_add_lines):
        if _id in dep_add_lines and lineno in dep_add_lines[_id]:
            return 1
        else:
            return 0

    def graph_features(_id):
        itempath = svdds.itempath(_id)
        n, e = feature_extraction(
            itempath,
            graph_type=graph_type,
            return_nodes=True,
            return_edges=True,
            group=False,
        )
        n["vuln"] = n.lineNumber.apply(get_vuln, _id=_id, dep_add_lines=dep_add_lines)
        n = n.drop(columns=["id"])
        n = n.reset_index().rename(columns={"index": "dgl_id"})
        n["graph_id"] = _id
        e["graph_id"] = _id
        n = n.reset_index(drop=True)
        e = e.reset_index(drop=True)
        return n, e

    node_dfs, edge_dfs = zip(*svd.dfmp(df, graph_features, "id"))
elif dsname == "devign":
    graph_type = "cfg"

    def graph_features(row):
        _id = row["id"]
        target = row["target"]
        itempath = svdds.itempath(_id, dsname)
        n, e = feature_extraction(
            itempath,
            graph_type=graph_type,
            return_nodes=True,
            return_edges=True,
            group=False,
        )
        n["vuln"] = target
        n = n.drop(columns=["id"])
        n = n.reset_index().rename(columns={"index": "dgl_id"})
        n["graph_id"] = _id
        e["graph_id"] = _id
        n = n.reset_index(drop=True)
        e = e.reset_index(drop=True)
        return n, e

    node_dfs, edge_dfs = zip(*svd.dfmp(df, graph_features, ["id", "target"]))
#%%
node_dfs[0]
#%%
edge_dfs[0]
#%%
print(node_dfs[0])
print(edge_dfs[0])
#%%
import pandas as pd
node_dfs = pd.concat(node_dfs, ignore_index=True)
edge_dfs = pd.concat(edge_dfs, ignore_index=True)
#%%
print("percentage of vuln nodes:", node_dfs.value_counts("vuln", normalize=True), sep="\n")
print("percentage of graphs with at least 1 vuln:", node_dfs.groupby("graph_id")["vuln"].agg(lambda g: 1 if g.any() else 0).value_counts(normalize=True), sep="\n")
#%%
sample_text = "_sample" if sample_mode else ""
node_dfs.to_csv(svd.processed_dir() / dsname / f"nodes{sample_text}.csv")
edge_dfs.to_csv(svd.processed_dir() / dsname / f"edges{sample_text}.csv")
print("done")
Next, run dbize_graphs.py as well.
[dbize_graphs.py]
#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import pandas as pd
import sastvd as svd
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--sample", action="store_true")
parser.add_argument("--dsname", default="bigvul")
args = parser.parse_args()
sample_mode = args.sample
dsname = args.dsname
sample_text = "_sample" if sample_mode else ""
cols = ["Unnamed: 0", "graph_id", "innode", "outnode"]
edge_dfs = pd.read_csv(svd.processed_dir() / dsname / f"edges{sample_text}.csv", index_col=0, usecols=cols)
edge_dfs
#%%
import dgl
graphs = []
graph_ids = []
for graph_id, group in edge_dfs.groupby("graph_id"):
    g = dgl.graph((group["innode"].tolist(), group["outnode"].tolist()))
    g = dgl.add_self_loop(g)
    graphs.append(g)
    graph_ids.append(graph_id)
#%%
from dgl.data.utils import save_graphs
import torch as th
print({"graph_id": th.LongTensor(graph_ids)})
save_graphs(str(svd.processed_dir() / dsname / f"graphs{sample_text}.bin"), graphs, {"graph_id": th.LongTensor(graph_ids)})
print("done")
bash scripts/run_abstract_dataflow.sh
This is the fourth step.
Here we need to run two commands; as before, add the import path to abstract_dataflow_full.py first.
Run python abstract_dataflow_full.py --no-cache --stage 1.
Then run python abstract_dataflow_full.py --no-cache --stage 2 as well.
Stage 1 stops after extracting the abstract dataflow features into the cache CSV, while stage 2 continues on to build the per-node hash CSV.
[abstract_dataflow_full.py]
"""
Extract abstract dataflow features from graphs
Yields:
- storage/cache/bigvul/abstract_dataflow.csv (cache)
- storage/processed/bigvul/abstract_dataflow_hash_api_datatype_literal_operator.csv
"""
import sys
sys.path.append("/content/DeepDFA/DDFA")
import argparse
import functools
import json
import re
import traceback
from multiprocessing import Pool
import networkx as nx
import pandas as pd
import code_gnn.analysis.dataflow as dataflow
import sastvd.helpers.datasets as svdds
import sastvd as svd
import tqdm
# Extract dataflow features from CPG
all_assignment_types = (
    "<operator>.assignmentDivision",
    "<operator>.assignmentExponentiation",
    "<operator>.assignmentPlus",
    "<operator>.assignmentMinus",
    "<operator>.assignmentModulo",
    "<operator>.assignmentMultiplication",
    "<operator>.preIncrement",
    "<operator>.preDecrement",
    "<operator>.postIncrement",
    "<operator>.postDecrement",
    "<operator>.assignment",
    "<operator>.assignmentOr",
    "<operator>.assignmentAnd",
    "<operator>.assignmentXor",
    "<operator>.assignmentArithmeticShiftRight",
    "<operator>.assignmentLogicalShiftRight",
    "<operator>.assignmentShiftLeft",
)
def is_decl(n_attr):
    # NOTE: these are local variable declarations,
    # which are not considered definitions in a formal DFA setting.
    # if n_attr["_label"] in ("LOCAL",):
    #     return True
    return n_attr["_label"] == "CALL" and n_attr["name"] in all_assignment_types
def get_dataflow_features(graph_id, raise_all=False, verbose=False):
    try:
        # breakpoint()
        cpg, n, e = dataflow.get_cpg(graph_id, dsname, return_n_e=True)
        # print(cpg)
        # print(n)
        # print(e)
        ast = dataflow.sub(cpg, "AST")
        arg_graph = dataflow.sub(cpg, "ARGUMENT")
        labels = nx.get_node_attributes(cpg, "_label")
        code = nx.get_node_attributes(cpg, "code")
        names = nx.get_node_attributes(cpg, "name")

        def recurse_datatype(v):
            v_attr = cpg.nodes[v]
            if verbose:
                print("recursing", v, v_attr)
            name_idx = {
                "<operator>.indirectIndexAccess": 1,
                "<operator>.indirectFieldAccess": 1,
                "<operator>.indirection": 1,
                "<operator>.fieldAccess": 1,
                "<operator>.postIncrement": 1,
                "<operator>.postDecrement": 1,
                "<operator>.preIncrement": 1,
                "<operator>.preDecrement": 1,
                "<operator>.addressOf": 1,
                "<operator>.cast": 2,
                "<operator>.addition": 1,
            }
            if v_attr["_label"] == "IDENTIFIER":
                return v, v_attr["typeFullName"]
            elif v_attr["_label"] == "CALL":
                if v_attr["name"] in name_idx.keys():
                    # TODO: Get field data type, not struct data type
                    args = {cpg.nodes[s]["order"]: s for s in arg_graph.successors(v)}
                    arg = args[name_idx[v_attr["name"]]]
                    arg_attr = cpg.nodes[arg]
                    if verbose:
                        print("index", arg, arg_attr)
                    if v_attr["name"] == "<operator>.addition":
                        print("addition debug", v, v_attr, arg, arg_attr)
                    if arg_attr["_label"] == "IDENTIFIER":
                        return arg, arg_attr["typeFullName"]
                    elif arg_attr["_label"] == "CALL":
                        return recurse_datatype(arg)
                    else:
                        raise NotImplementedError(
                            f"recurse_datatype index could not handle {v} {v_attr} -> {arg} {arg_attr}"
                        )
            raise NotImplementedError(
                f"recurse_datatype var could not handle {v} {v_attr}"
            )

        def get_raw_datatype(decl):
            decl_attr = cpg.nodes[decl]
            if verbose:
                print("parent", decl, decl_attr)
            if decl_attr["_label"] == "LOCAL":
                return decl, decl_attr["typeFullName"]
            elif decl_attr["_label"] == "CALL" and decl_attr[
                "name"
            ] in all_assignment_types + ("<operator>.cast",):
                args = {cpg.nodes[s]["order"]: s for s in arg_graph.successors(decl)}
                return recurse_datatype(args[1])
            else:
                raise NotImplementedError(
                    f"""get_raw_datatype did not handle {decl} {decl_attr}"""
                )

        def grab_declfeats(node_id):
            fields = []
            try:
                ret = get_raw_datatype(node_id)
                if ret is not None:
                    child_id, child_datatype = ret
                    fields.append(("datatype", child_id, child_datatype))
                # create a copy of the AST with method definitions excluded.
                # this avoids an issue where some variable definitions descend to
                # method definitions (probably by mistake), shown in graph 3.
                my_ast = ast.copy()
                my_ast.remove_nodes_from(
                    [
                        n
                        for n, attr in ast.nodes(data=True)
                        if attr["_label"] == "METHOD"
                    ]
                )
                to_search = nx.descendants(my_ast, node_id)
                for n in to_search:
                    if verbose:
                        print(
                            f"{node_id} desc {n} {code.get(n, None)} {names.get(n, None)} {nx.shortest_path(ast, node_id, n)}"
                        )
                    if labels[n] == "LITERAL":
                        fields.append(("literal", n, code.get(n, pd.NA)))
                    if labels[n] == "CALL":
                        if m := re.match(r"<operator>\.(.*)", names[n]):
                            operator_name = m.group(1)
                            if operator_name not in ("indirection",):
                                fields.append(("operator", n, operator_name))
                        # handle API call
                        else:
                            fields.append(("api", n, names[n]))
            except Exception:
                print("node error", node_id, traceback.format_exc())
                if raise_all:
                    raise
            return fields

        # nx.set_node_attributes(
        #     ast,
        #     {n: f"{n}: {attr['code']}" for n, attr in ast.nodes(data=True)},
        #     "label",
        # )
        # A = nx.drawing.nx_agraph.to_agraph(ast)
        # A.layout("dot")
        # A.draw("abcd.png")

        # n = n.rename(columns={"id": "node_id"})
        n["graph_id"] = graph_id
        # print("select nodes")
        # print(n["node_id"].isin(n for n, attr in cpg.nodes(data=True) if is_decl(attr)))
        decls = n[
            n["node_id"].isin(n for n, attr in cpg.nodes(data=True) if is_decl(attr))
        ].copy()
        decls["fields"] = decls["node_id"].apply(grab_declfeats)
        decls = decls.explode("fields").dropna()
        if verbose:
            print("extracted fields:", decls["fields"], sep="\n")
        if len(decls) > 0:
            decls["subkey"], decls["subkey_node_id"], decls["subkey_text"] = zip(
                *decls["fields"]
            )
        else:
            decls["subkey"] = None
            decls["subkey_node_id"] = None
            decls["subkey_text"] = None
        return decls
    except Exception:
        print("graph error", graph_id, traceback.format_exc())
        if raise_all:
            raise
# Get all abstract dataflow info
def get_dataflow_features_df():
    # print(get_dataflow_features_df)
    csv_file = (
        svd.cache_dir() / f"{dsname}/abstract_dataflow{'_sample' if args.sample else ''}.csv"
    )
    if csv_file.exists() and args.cache:
        dataflow_df = pd.read_csv(csv_file)
    else:
        dataflow_df = pd.DataFrame()
        all_df = svdds.ds(dsname, sample=args.sample)
        with Pool(args.workers) as pool:
            for decls_df in tqdm.tqdm(
                pool.imap(
                    # map(
                    functools.partial(
                        get_dataflow_features,
                        raise_all=args.sample,
                        verbose=args.verbose,
                    ),
                    all_df.id,
                ),
                total=len(all_df),
                desc="get abstract dataflow features",
            ):
                dataflow_df = pd.concat([dataflow_df, decls_df], ignore_index=True)
        dataflow_df = dataflow_df[
            ["graph_id", "node_id", "subkey", "subkey_node_id", "subkey_text"]
        ]
        csv_file.parent.mkdir(exist_ok=True, parents=True)
        dataflow_df.to_csv(csv_file)
    return dataflow_df
def cleanup_datatype(df):
    """Assign datatype to cleaned-up version"""
    df.loc[df["subkey"] == "datatype", "subkey_text"] = dataflow_df[
        "subkey_text"
    ].apply(
        lambda dt: dt
        if pd.isna(dt)
        else re.sub(
            r"\s+", r" ", re.sub(r"^const ", r"", re.sub(r"\s*\[.*\]", r"[]", dt))
        ).strip()
    )
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Abstract dataflow")
    parser.add_argument("--dsname", default="bigvul")
    parser.add_argument("--sample", action="store_true", help="Extract sample only")
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--cache", action="store_true")
    parser.add_argument("--no-cache", dest="cache", action="store_false")
    parser.set_defaults(cache=True)
    parser.add_argument(
        "--workers", type=int, default=6, help="How many workers to use"
    )
    parser.add_argument("--stage", type=int, default=1, help="Which stages to execute")
    parser.add_argument(
        "--select_subkeys",
        nargs="+",
        default=["api", "datatype", "literal", "operator"],
        help="Which subkeys to export",
    )
    args = parser.parse_args()
    dsname = args.dsname
    args.select_subkeys = sorted(args.select_subkeys)
    dataflow_df = get_dataflow_features_df()
    print("dataflow_df", dataflow_df)
    print("dataflow_df counts", dataflow_df.value_counts("subkey"))
    print("dataflow_df na", dataflow_df[dataflow_df["subkey_text"].isna()])
    if args.stage <= 1:
        exit()
"""
generate hash value for each node
"""
def to_hash(group, select_subkeys):
    # print(group)
    _hash = {
        subkey: sorted(
            [s for s in group[group["subkey"] == subkey]["subkey_text"].tolist()]
        )
        for subkey in select_subkeys
    }
    return json.dumps(_hash)
if __name__ == "__main__":
    # get most common subkeys
    # TODO: don't filter out missing files/graphs
    # source = svddc.BigVulDataset(partition="sample" if args.sample else "train", undersample=False)
    # source_df = source.df
    # print("generate hash from train", source_df, sep="\n")
    # source_df = pd.merge(source_df, dataflow_df, left_on="id", right_on="graph_id")
    # select_key = "datatype"
    # source_vc = source_df[source_df["subkey"] == select_key].value_counts("subkey_text")
    # print("train values", source_vc)

    # Export dataset
    # TODO: export more combinations of subkeys
    # select_subkeys = ["datatype", "operator", "api", "literal"]
    # select = {
    #     select_key: source_vc.index.sort_values().tolist(),
    # }
    hashes = dataflow_df.groupby(["graph_id", "node_id"]).apply(
        to_hash, select_subkeys=args.select_subkeys
    )
    all_df = dataflow_df.set_index(["graph_id", "node_id"]).join(
        hashes.to_frame("hash")
    ).reset_index()
    print("Got hashes")
    print(all_df)
    all_df = (
        all_df[["graph_id", "node_id", "hash"]]
        .sort_values(by=["graph_id", "node_id"])
        .reset_index(drop=True)
    )[["graph_id", "node_id", "hash"]].drop_duplicates()
    print("hash result")
    print(all_df)
    print(all_df["hash"].value_counts(dropna=False, normalize=True))
    all_df.to_csv(
        svd.get_dir(svd.processed_dir() / dsname)
        / f"abstract_dataflow_hash_{'_'.join(args.select_subkeys)}{'_sample' if args.sample else ''}.csv"
    )
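The "hash" that stage 2 attaches to each (graph_id, node_id) is simply a JSON string of the sorted subkey texts collected for that node. Below is a toy illustration of the same to_hash logic; the graph/node ids and subkey values are made up for the example.

import json
import pandas as pd

# Pretend dataflow_df: one definition node with a datatype, an operator and an API call.
toy = pd.DataFrame({
    "graph_id": [1, 1, 1],
    "node_id": [7, 7, 7],
    "subkey": ["datatype", "operator", "api"],
    "subkey_text": ["char *", "addition", "memcpy"],
})

def to_hash(group, select_subkeys):
    return json.dumps({
        k: sorted(group.loc[group["subkey"] == k, "subkey_text"].tolist())
        for k in select_subkeys
    })

print(toy.groupby(["graph_id", "node_id"]).apply(
    to_hash, select_subkeys=["api", "datatype", "literal", "operator"]
))
# (1, 7) -> {"api": ["memcpy"], "datatype": ["char *"], "literal": [], "operator": ["addition"]}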
This is the final step.
The run_absdf.sh script tells us to run dbize_absdf.py.
Running it at first produces an error because a required file is missing, so add that file under DDFA > storage > external >.
[dbize_absdf.py]
#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import pandas as pd
import sastvd.helpers.datasets as svdds
import sastvd as svd
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--sample", action="store_true")
parser.add_argument("--dsname", default="bigvul")
args = parser.parse_args()
sample_mode = args.sample
dsname = args.dsname
sample_text = "_sample" if sample_mode else ""
cols = ["Unnamed: 0", "graph_id", "node_id"]
node_dfs = pd.read_csv(svd.processed_dir() / dsname / f"nodes{sample_text}.csv", index_col=0, usecols=cols)
node_dfs
#%%
for limitall in [1, 10, 100, 500, 1000, 5000, 10000]:
    for sfeat in ["datatype", "api", "literal", "operator"]:
        my_node_df = node_dfs.copy()
        limitsubkeys = limitall
        split = "fixed"
        seed = 0
        feat = f"_ABS_DATAFLOW_{sfeat}_all_limitall_{limitall}_limitsubkeys_{limitsubkeys}"
        dst_file = svd.processed_dir() / dsname / f"nodes_feat_{feat}_{split}{sample_text}.csv"
        print("processing", feat, "to", dst_file)
        abs_df, abs_df_hashes = svdds.abs_dataflow(feat, dsname, sample_mode, split=split, seed=seed)
        all_hash_idxs = abs_df_hashes["all"]
        all_hashes = abs_df.set_index(["graph_id", "node_id"])["hash.all"]

        def get_hash_idx(row):
            _hash = all_hashes.get((row["graph_id"], row["node_id"]), None)
            if _hash is None:
                # nid not in abstract features - not definition
                return 0
            else:
                # if None, then nid maps to UNKNOWN token
                return all_hash_idxs.get(_hash, all_hash_idxs[None]) + 1

        my_node_df[feat] = svd.dfmp(my_node_df, get_hash_idx, ["graph_id", "node_id"])
        my_node_df.to_csv(dst_file)
        print(dst_file, "saved")
What each of these files does exactly still needs further analysis, but getting DeepDFA to run end to end at least once is meaningful in itself.