
Running DeepDFA (END) Final - 2

agencies 2024. 10. 14. 18:20

In the previous post, we went through running joern.

 

bash scripts/run_dbize.sh

The next step is to run run_dbize.sh.

If you check the contents of this script, it runs a total of two Python files.

 

 

Add the import path to dbize.py and dbize_graphs.py, then run them.
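For reference, running the two scripts directly (instead of through the shell script) looks roughly like this; this is only a sketch, so check run_dbize.sh for the exact script paths and flags it uses:

python dbize.py --dsname bigvul
python dbize_graphs.py --dsname bigvul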

 

 

 

[dbize.py]

#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import sastvd.helpers.datasets as svdds
import sastvd as svd
import sastvd.helpers.evaluate as ivde
from sastvd.linevd.utils import feature_extraction

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--sample", action="store_true")
parser.add_argument("--dsname", default="bigvul")
args = parser.parse_args()

sample_mode = args.sample
dsname = args.dsname

df = svdds.ds(dsname, cache=True, sample=sample_mode)
df = svdds.ds_filter(
    df,
    dsname,
    check_file=True,
    check_valid=True,
    vulonly=False,
    load_code=False,
    sample=-1,
    sample_mode=sample_mode,
)
print(df)

#%%
if dsname == "bigvul":
    graph_type = "cfg"
    dep_add_lines = ivde.get_dep_add_lines_bigvul("bigvul", sample=sample_mode)
    dep_add_lines = {k: set(list(v["removed"]) + v["depadd"]) for k, v in dep_add_lines.items()}

    def get_vuln(lineno, _id, dep_add_lines):
        if _id in dep_add_lines and lineno in dep_add_lines[_id]:
            return 1
        else:
            return 0

    def graph_features(_id):
        itempath = svdds.itempath(_id)
        n, e = feature_extraction(itempath,
            graph_type=graph_type,
            return_nodes=True,
            return_edges=True,
            group=False,
            )
        n["vuln"] = n.lineNumber.apply(get_vuln, _id=_id, dep_add_lines=dep_add_lines)
        n = n.drop(columns=["id"])
        n = n.reset_index().rename(columns={"index": "dgl_id"})
        n["graph_id"] = _id
        e["graph_id"] = _id
        n = n.reset_index(drop=True)
        e = e.reset_index(drop=True)
        return n, e

    node_dfs, edge_dfs = zip(*svd.dfmp(df, graph_features, "id"))
elif dsname == "devign":
    graph_type = "cfg"

    def graph_features(row):
        _id = row["id"]
        target = row["target"]
        itempath = svdds.itempath(_id, dsname)
        n, e = feature_extraction(itempath,
            graph_type=graph_type,
            return_nodes=True,
            return_edges=True,
            group=False,
            )
        n["vuln"] = target
        n = n.drop(columns=["id"])
        n = n.reset_index().rename(columns={"index": "dgl_id"})
        n["graph_id"] = _id
        e["graph_id"] = _id
        n = n.reset_index(drop=True)
        e = e.reset_index(drop=True)
        return n, e

    node_dfs, edge_dfs = zip(*svd.dfmp(df, graph_features, ["id", "target"]))

#%%
node_dfs[0]

#%%
edge_dfs[0]

#%%
print(node_dfs[0])
print(edge_dfs[0])

#%%
import pandas as pd
node_dfs = pd.concat(node_dfs, ignore_index=True)
edge_dfs = pd.concat(edge_dfs, ignore_index=True)

#%%
print("percentage of vuln nodes:", node_dfs.value_counts("vuln", normalize=True), sep="\n")
print("percentage of graphs with at least 1 vuln:", node_dfs.groupby("graph_id")["vuln"].agg(lambda g: 1 if g.any() else 0).value_counts(normalize=True), sep="\n")

#%%
sample_text = "_sample" if sample_mode else ""
node_dfs.to_csv(svd.processed_dir() / dsname / f"nodes{sample_text}.csv")
edge_dfs.to_csv(svd.processed_dir() / dsname / f"edges{sample_text}.csv")

print("done")

 

 

Next, run dbize_graphs.py as well.

 

[dbize_graphs.py]

#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import pandas as pd
import sastvd as svd

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--sample", action="store_true")
parser.add_argument("--dsname", default="bigvul")
args = parser.parse_args()

sample_mode = args.sample
dsname = args.dsname

sample_text = "_sample" if sample_mode else ""
cols = ["Unnamed: 0", "graph_id", "innode", "outnode"]
edge_dfs = pd.read_csv(svd.processed_dir() / dsname / f"edges{sample_text}.csv", index_col=0, usecols=cols)
edge_dfs

#%%
import dgl
graphs = []
graph_ids = []
for graph_id, group in edge_dfs.groupby("graph_id"):
    g = dgl.graph((group["innode"].tolist(), group["outnode"].tolist()))
    g = dgl.add_self_loop(g)
    graphs.append(g)
    graph_ids.append(graph_id)

#%%
from dgl.data.utils import save_graphs
import torch as th
print({"graph_id": th.LongTensor(graph_ids)})
save_graphs(str(svd.processed_dir() / dsname / f"graphs{sample_text}.bin"), graphs, {"graph_id": th.LongTensor(graph_ids)})

print("done")

 

 


 

bash scripts/run_abstract_dataflow.sh

This is the fourth step.

 

You need to run two commands for this step,

and, as before, add the import path to abstract_dataflow_full.py first.

 

 

First, run python abstract_dataflow_full.py --no-cache --stage 1.
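Stage 1 caches the extracted per-node features as a CSV (the path comes from the script's docstring). A quick way to eyeball the result, assuming the default bigvul dataset:

#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import pandas as pd
import sastvd as svd

# inspect the stage-1 cache written by abstract_dataflow_full.py
df = pd.read_csv(svd.cache_dir() / "bigvul" / "abstract_dataflow.csv", index_col=0)
print(df.value_counts("subkey"))  # counts of api / datatype / literal / operator entries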

 

Then run python abstract_dataflow_full.py --no-cache --stage 2 as well.
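Stage 2 then hashes the per-node features and exports them under storage/processed. A small sketch to inspect the exported file (the file name follows the script's output path with the default subkeys):

#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import pandas as pd
import sastvd as svd

# inspect the stage-2 export
hash_df = pd.read_csv(
    svd.processed_dir() / "bigvul" / "abstract_dataflow_hash_api_datatype_literal_operator.csv",
    index_col=0,
)
print(hash_df.head())
print(hash_df["hash"].value_counts().head())  # most common abstract dataflow hashes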

 

 

[abstract_dataflow_full.py]

"""
Extract abstract dataflow features from graphs
Yields:
- storage/cache/bigvul/abstract_dataflow.csv (cache)
- storage/processed/bigvul/abstract_dataflow_hash_api_datatype_literal_operator.csv
"""
import sys
sys.path.append("/content/DeepDFA/DDFA")

import argparse
import functools
import json
import re
import traceback
from multiprocessing import Pool

import networkx as nx
import pandas as pd
import code_gnn.analysis.dataflow as dataflow
import sastvd.helpers.datasets as svdds
import sastvd as svd
import tqdm

# Extract dataflow features from CPG

all_assignment_types = (
    "<operator>.assignmentDivision",
    "<operator>.assignmentExponentiation",
    "<operator>.assignmentPlus",
    "<operator>.assignmentMinus",
    "<operator>.assignmentModulo",
    "<operator>.assignmentMultiplication",
    "<operator>.preIncrement",
    "<operator>.preDecrement",
    "<operator>.postIncrement",
    "<operator>.postDecrement",
    "<operator>.assignment",
    "<operator>.assignmentOr",
    "<operator>.assignmentAnd",
    "<operator>.assignmentXor",
    "<operator>.assignmentArithmeticShiftRight",
    "<operator>.assignmentLogicalShiftRight",
    "<operator>.assignmentShiftLeft",
)

def is_decl(n_attr):
    # NOTE: this is for local variable declarations,
    # which are not considered definitions in the formal DFA setting.
    # if n_attr["_label"] in ("LOCAL",):
    #     return True

    return n_attr["_label"] == "CALL" and n_attr["name"] in all_assignment_types


def get_dataflow_features(graph_id, raise_all=False, verbose=False):
    try:
        # breakpoint()
        cpg, n, e = dataflow.get_cpg(graph_id, dsname, return_n_e=True)
        # print(cpg)
        # print(n)
        # print(e)
        ast = dataflow.sub(cpg, "AST")
        arg_graph = dataflow.sub(cpg, "ARGUMENT")
        labels = nx.get_node_attributes(cpg, "_label")
        code = nx.get_node_attributes(cpg, "code")
        names = nx.get_node_attributes(cpg, "name")

        def recurse_datatype(v):
            v_attr = cpg.nodes[v]
            if verbose:
                print("recursing", v, v_attr)

            name_idx = {
                "<operator>.indirectIndexAccess": 1,
                "<operator>.indirectFieldAccess": 1,
                "<operator>.indirection": 1,
                "<operator>.fieldAccess": 1,
                "<operator>.postIncrement": 1,
                "<operator>.postDecrement": 1,
                "<operator>.preIncrement": 1,
                "<operator>.preDecrement": 1,
                "<operator>.addressOf": 1,
                "<operator>.cast": 2,
                "<operator>.addition": 1,
            }
            if v_attr["_label"] == "IDENTIFIER":
                return v, v_attr["typeFullName"]
            elif v_attr["_label"] == "CALL":
                if v_attr["name"] in name_idx.keys():
                    # TODO: Get field data type, not struct data type
                    args = {cpg.nodes[s]["order"]: s for s in arg_graph.successors(v)}
                    arg = args[name_idx[v_attr["name"]]]
                    arg_attr = cpg.nodes[arg]
                    if verbose:
                        print("index", arg, arg_attr)
                        if v_attr["name"] == "<operator>.addition":
                            print("addition debug", v, v_attr, arg, arg_attr)
                    if arg_attr["_label"] == "IDENTIFIER":
                        return arg, arg_attr["typeFullName"]
                    elif arg_attr["_label"] == "CALL":
                        return recurse_datatype(arg)
                    else:
                        raise NotImplementedError(
                            f"recurse_datatype index could not handle {v} {v_attr} -> {arg} {arg_attr}"
                        )
            raise NotImplementedError(
                f"recurse_datatype var could not handle {v} {v_attr}"
            )

        def get_raw_datatype(decl):
            decl_attr = cpg.nodes[decl]

            if verbose:
                print("parent", decl, decl_attr)

            if decl_attr["_label"] == "LOCAL":
                return decl, decl_attr["typeFullName"]
            elif decl_attr["_label"] == "CALL" and decl_attr[
                "name"
            ] in all_assignment_types + ("<operator>.cast",):
                args = {cpg.nodes[s]["order"]: s for s in arg_graph.successors(decl)}
                return recurse_datatype(args[1])
            else:
                raise NotImplementedError(
                    f"""get_raw_datatype did not handle {decl} {decl_attr}"""
                )

        def grab_declfeats(node_id):
            fields = []
            try:
                ret = get_raw_datatype(node_id)
                if ret is not None:
                    child_id, child_datatype = ret
                    fields.append(("datatype", child_id, child_datatype))

                # create a copy of the AST with method definitions excluded.
                # this avoids an issue where some variable definitions descend to
                # method definitions (probably by mistake), shown in graph 3.
                my_ast = ast.copy()
                my_ast.remove_nodes_from(
                    [
                        n
                        for n, attr in ast.nodes(data=True)
                        if attr["_label"] == "METHOD"
                    ]
                )

                to_search = nx.descendants(my_ast, node_id)
                for n in to_search:
                    if verbose:
                        print(
                            f"{node_id} desc {n} {code.get(n, None)} {names.get(n, None)} {nx.shortest_path(ast, node_id, n)}"
                        )
                    if labels[n] == "LITERAL":
                        fields.append(("literal", n, code.get(n, pd.NA)))
                    if labels[n] == "CALL":
                        if m := re.match(r"<operator>\.(.*)", names[n]):
                            operator_name = m.group(1)
                            if operator_name not in ("indirection",):
                                fields.append(("operator", n, operator_name))
                        # handle API call
                        else:
                            fields.append(("api", n, names[n]))
            except Exception:
                print("node error", node_id, traceback.format_exc())
                if raise_all:
                    raise
            return fields

        # nx.set_node_attributes(
        #     ast,
        #     {n: f"{n}: {attr['code']}" for n, attr in ast.nodes(data=True)},
        #     "label",
        # )
        # A = nx.drawing.nx_agraph.to_agraph(ast)
        # A.layout("dot")
        # A.draw("abcd.png")

        # n = n.rename(columns={"id": "node_id"})
        n["graph_id"] = graph_id
        # print("select nodes")
        # print(n["node_id"].isin(n for n, attr in cpg.nodes(data=True) if is_decl(attr)))
        decls = n[
            n["node_id"].isin(n for n, attr in cpg.nodes(data=True) if is_decl(attr))
        ].copy()
        decls["fields"] = decls["node_id"].apply(grab_declfeats)
        decls = decls.explode("fields").dropna()
        if verbose: print("extracted fields:", decls["fields"], sep="\n")
        if len(decls) > 0:
            decls["subkey"], decls["subkey_node_id"], decls["subkey_text"] = zip(
                *decls["fields"]
            )
        else:
            decls["subkey"] = None
            decls["subkey_node_id"] = None
            decls["subkey_text"] = None
        return decls
    except Exception:
        print("graph error", graph_id, traceback.format_exc())
        if raise_all:
            raise


# Get all abstract dataflow info
def get_dataflow_features_df():
    # print(get_dataflow_features_df)
    csv_file = (
        svd.cache_dir() / f"{dsname}/abstract_dataflow{'_sample' if args.sample else ''}.csv"
    )
    if csv_file.exists() and args.cache:
        dataflow_df = pd.read_csv(csv_file)
    else:
        dataflow_df = pd.DataFrame()
        all_df = svdds.ds(dsname, sample=args.sample)
        with Pool(args.workers) as pool:
            for decls_df in tqdm.tqdm(
                pool.imap(
                # map(
                    functools.partial(
                        get_dataflow_features,
                        raise_all=args.sample,
                        verbose=args.verbose,
                    ),
                    all_df.id,
                ),
                total=len(all_df),
                desc="get abstract dataflow features",
            ):
                dataflow_df = pd.concat([dataflow_df, decls_df], ignore_index=True)

        dataflow_df = dataflow_df[
            ["graph_id", "node_id", "subkey", "subkey_node_id", "subkey_text"]
        ]

        csv_file.parent.mkdir(exist_ok=True, parents=True)
        dataflow_df.to_csv(csv_file)

    return dataflow_df


def cleanup_datatype(df):
    """Assign datatype to cleaned-up version"""
    df.loc[df["subkey"] == "datatype", "subkey_text"] = dataflow_df[
        "subkey_text"
    ].apply(
        lambda dt: dt
        if pd.isna(dt)
        else re.sub(
            r"\s+", r" ", re.sub(r"^const ", r"", re.sub(r"\s*\[.*\]", r"[]", dt))
        ).strip()
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Abstract dataflow")
    parser.add_argument("--dsname", default="bigvul")
    parser.add_argument("--sample", action="store_true", help="Extract sample only")
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--cache", action="store_true")
    parser.add_argument("--no-cache", dest="cache", action="store_false")
    parser.set_defaults(cache=True)
    parser.add_argument(
        "--workers", type=int, default=6, help="How many workers to use"
    )
    parser.add_argument("--stage", type=int, default=1, help="Which stages to execute")
    parser.add_argument("--select_subkeys", nargs="+", default=["api", "datatype", "literal", "operator"], help="Which subkeys to export")
    args = parser.parse_args()

    dsname = args.dsname

    args.select_subkeys = sorted(args.select_subkeys)

    dataflow_df = get_dataflow_features_df()
    print("dataflow_df", dataflow_df)
    print("dataflow_df counts", dataflow_df.value_counts("subkey"))
    print("dataflow_df na", dataflow_df[dataflow_df["subkey_text"].isna()])

    if args.stage <= 1:
        exit()

"""
generate hash value for each node
"""


def to_hash(group, select_subkeys):
    # print(group)
    _hash = {
        subkey: sorted(
            [
                s for s in group[group["subkey"] == subkey]["subkey_text"].tolist()
            ]
        )
        for subkey in select_subkeys
    }
    return json.dumps(_hash)


if __name__ == "__main__":
    # get most common subkeys
    # TODO: don't filter out missing files/graphs
    # source = svddc.BigVulDataset(partition="sample" if args.sample else "train", undersample=False)
    # source_df = source.df
    # print("generate hash from train", source_df, sep="\n")

    # source_df = pd.merge(source_df, dataflow_df, left_on="id", right_on="graph_id")
    # select_key = "datatype"
    # source_vc = source_df[source_df["subkey"] == select_key].value_counts("subkey_text")
    # print("train values", source_vc)


    # Export dataset
    # TODO: export more combinations of subkeys
    # select_subkeys = ["datatype", "operator", "api", "literal"]
    # select = {
    #     select_key: source_vc.index.sort_values().tolist(),
    # }
    hashes = dataflow_df.groupby(["graph_id", "node_id"]).apply(to_hash, select_subkeys=args.select_subkeys)
    all_df = dataflow_df.set_index(["graph_id", "node_id"]).join(
        hashes.to_frame("hash")
    ).reset_index()
    print("Got hashes")
    print(all_df)
    all_df = (
        all_df[["graph_id", "node_id", "hash"]]
        .sort_values(by=["graph_id", "node_id"])
        .reset_index(drop=True)
    )[["graph_id", "node_id", "hash"]].drop_duplicates()
    print("hash result")
    print(all_df)
    print(all_df["hash"].value_counts(dropna=False, normalize=True))

    all_df.to_csv(
        svd.get_dir(svd.processed_dir() / dsname) / f"abstract_dataflow_hash_{'_'.join(args.select_subkeys)}{'_sample' if args.sample else ''}.csv"
    )

 


This is the final step.

 

Looking inside run_absdf.sh, it tells us to run dbize_absdf.py.

 

 

Running it produces an error because the following file is missing:

 

linevul_splits.csv (2.21 MB, attached to this post)

 

Add this file under DDFA > storage > external > and run the script again.
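Before re-running, a minimal check that the file landed in the right place (the absolute path is an assumption based on the Colab setup used in this walkthrough):

#%%
from pathlib import Path

# confirm the split file sits under DDFA/storage/external/
splits = Path("/content/DeepDFA/DDFA/storage/external/linevul_splits.csv")
print("found" if splits.exists() else "missing", splits)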

 

[dbize_absdf.py]

#%%
import sys
sys.path.append("/content/DeepDFA/DDFA")
import pandas as pd
import sastvd.helpers.datasets as svdds
import sastvd as svd

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--sample", action="store_true")
parser.add_argument("--dsname", default="bigvul")
args = parser.parse_args()

sample_mode = args.sample
dsname = args.dsname

sample_text = "_sample" if sample_mode else ""
cols = ["Unnamed: 0", "graph_id", "node_id"]
node_dfs = pd.read_csv(svd.processed_dir() / dsname / f"nodes{sample_text}.csv", index_col=0, usecols=cols)
node_dfs

#%%
for limitall in [1, 10, 100, 500, 1000, 5000, 10000]:
    for sfeat in ["datatype", "api", "literal", "operator"]:
        my_node_df = node_dfs.copy()
        limitsubkeys = limitall
        split = "fixed"
        seed = 0
        feat = f"_ABS_DATAFLOW_{sfeat}_all_limitall_{limitall}_limitsubkeys_{limitsubkeys}"
        dst_file = svd.processed_dir() / dsname / f"nodes_feat_{feat}_{split}{sample_text}.csv"
        print("processing", feat, "to", dst_file)

        abs_df, abs_df_hashes = svdds.abs_dataflow(feat, dsname, sample_mode, split=split, seed=seed)
        all_hash_idxs = abs_df_hashes["all"]
        all_hashes = abs_df.set_index(["graph_id", "node_id"])["hash.all"]

        def get_hash_idx(row):
            _hash = all_hashes.get((row["graph_id"], row["node_id"]), None)
            if _hash is None:
                # nid not in abstract features - not definition
                return 0
            else:
                # if None, then nid maps to UNKNOWN token
                return all_hash_idxs.get(_hash, all_hash_idxs[None]) + 1
        my_node_df[feat] = svd.dfmp(my_node_df, get_hash_idx, ["graph_id", "node_id"])
        my_node_df.to_csv(dst_file)
        print(dst_file, "saved")

 

 

 

 

 

 


 

What each individual file does still needs further analysis, but getting DeepDFA to run end-to-end at least once is meaningful in itself.

 

 

Attached: DeepDFA.gz (8.90 MB)