Trying deepdfa + linevul in the operational (draft) stage, part two
import torch
import torch.nn as nn
from transformers import RobertaModel, RobertaConfig, RobertaTokenizer
import dgl
import pandas as pd

# FlowGNNGGNNModule class with debugging
class FlowGNNGGNNModule(nn.Module):
    def __init__(self, feat, input_dim, hidden_dim, n_steps, num_output_layers, label_style, concat_all_absdf, encoder_mode):
        super(FlowGNNGGNNModule, self).__init__()
        self.feature_keys = {"feature": feat}
        self.embedding = nn.Embedding(input_dim, hidden_dim)  # ABS_DATAFLOW embedding
        self.ggnn = dgl.nn.pytorch.GatedGraphConv(
            in_feats=hidden_dim,
            out_feats=hidden_dim,
            n_steps=n_steps,
            n_etypes=1,  # number of edge types
        )
        self.pooling = dgl.nn.pytorch.GlobalAttentionPooling(
            gate_nn=nn.Linear(hidden_dim, 1)
        )  # reduce node embeddings to a graph-level output

    def forward(self, graph, extrafeats):
        print("Inside FlowGNN forward...")
        print(f"Graph ndata keys: {graph.ndata.keys()}")
        print(f"Graph ABS DataFlow Shape: {graph.ndata[self.feature_keys['feature']].shape}")
        # Embed the ABS_DATAFLOW features
        feat = graph.ndata[self.feature_keys["feature"]]
        print(f"Feature Shape Before Embedding: {feat.shape}")  # (34, 4)
        feat_embed = self.embedding(feat).mean(dim=1)  # (34, 128)
        print(f"Feature Shape After Embedding: {feat_embed.shape}")
        # Apply the GGNN
        ggnn_out = self.ggnn(graph, feat_embed)  # (34, 128)
        print(f"GGNN Output Shape: {ggnn_out.shape}")
        # Pool node embeddings into a graph-level embedding
        graph_embed = self.pooling(graph, ggnn_out)  # (1, 128) - batch_size, hidden_dim
        print(f"Graph-level Embedding Shape: {graph_embed.shape}")
        return graph_embed
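# Shape note (an aside, not from the original post): nn.Embedding maps each
# of the four integer ABS_DATAFLOW channels to a hidden_dim vector, so a
# (num_nodes, 4) input becomes (num_nodes, 4, hidden_dim), and .mean(dim=1)
# averages the four channel embeddings into one (num_nodes, hidden_dim)
# feature per node, e.g. (34, 4) -> (34, 4, 128) -> (34, 128), matching the
# shapes printed in the output below.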
# RobertaClassificationHead class
class RobertaClassificationHead(nn.Module):
    def __init__(self, config, extra_dim):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size + extra_dim, config.hidden_size)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.out_proj = nn.Linear(config.hidden_size, 2)

    def forward(self, features, flowgnn_embed, **kwargs):
        x = features[:, 0, :]  # [CLS] token
        if flowgnn_embed is not None:
            x = torch.cat((x, flowgnn_embed), dim=1)
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out_proj(x)
        return x
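# Dimension note (aside): with roberta-base, config.hidden_size is 768 and
# extra_dim is the GGNN's 128-dim graph embedding, so the concatenated
# [CLS] + graph vector is 896-dim, projected back to 768 and then to 2 logits.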
# Main model class
class Model(nn.Module):
    def __init__(self, encoder, flowgnn_encoder, config, tokenizer, args):
        super(Model, self).__init__()
        self.encoder = encoder
        self.flowgnn_encoder = flowgnn_encoder
        self.tokenizer = tokenizer
        self.classifier = RobertaClassificationHead(config, self.flowgnn_encoder.ggnn._out_feats)
        self.args = args

    def forward(self, input_embed=None, labels=None, graphs=None, input_ids=None, attention_mask=None):
        # FlowGNN embedding
        flowgnn_embed = self.flowgnn_encoder(graphs, {}) if graphs is not None else None
        # RoBERTa output
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Combine the RoBERTa and FlowGNN outputs
        logits = self.classifier(outputs.last_hidden_state, flowgnn_embed)
        return logits
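# Aside: ggnn._out_feats is a DGL-internal attribute equal to the hidden_dim
# (128) given to FlowGNNGGNNModule; passing that value directly would avoid
# depending on a private field.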
# Load the tokenizer, config, and RoBERTa encoder
tokenizer = RobertaTokenizer.from_pretrained("roberta-base")
config = RobertaConfig.from_pretrained("roberta-base")
encoder = RobertaModel.from_pretrained("roberta-base")

# Arguments for the model
class Args:
    def __init__(self):
        self.no_flowgnn = False

args = Args()

# Initialize FlowGNN
flowgnn_encoder = FlowGNNGGNNModule(
    feat="_ABS_DATAFLOW",
    input_dim=10,  # adjust based on your ABS_DATAFLOW feature range
    hidden_dim=128,
    n_steps=5,
    num_output_layers=2,
    label_style="graph",
    concat_all_absdf=False,
    encoder_mode=True,
)

# Initialize the main model
model = Model(encoder=encoder, flowgnn_encoder=flowgnn_encoder, config=config, tokenizer=tokenizer, args=args)
model.eval()
# Paths for the nodes, edges, and ABS_DATAFLOW feature files
nodes_path = "DeepDFA/DDFA/storage/processed/bigvul/nodes.csv"
edges_path = "DeepDFA/DDFA/storage/processed/bigvul/edges.csv"
abs_dataflow_files = {
    "operator": "DeepDFA/DDFA/storage/processed/bigvul/nodes_feat__ABS_DATAFLOW_operator_all_limitall_1000_limitsubkeys_1000_fixed.csv",
    "literal": "DeepDFA/DDFA/storage/processed/bigvul/nodes_feat__ABS_DATAFLOW_literal_all_limitall_1000_limitsubkeys_1000_fixed.csv",
    "datatype": "DeepDFA/DDFA/storage/processed/bigvul/nodes_feat__ABS_DATAFLOW_datatype_all_limitall_1000_limitsubkeys_1000_fixed.csv",
    "api": "DeepDFA/DDFA/storage/processed/bigvul/nodes_feat__ABS_DATAFLOW_api_all_limitall_1000_limitsubkeys_1000_fixed.csv",
}
# Load nodes and edges
nodes_df = pd.read_csv(nodes_path)
edges_df = pd.read_csv(edges_path)

# Debugging: print basic statistics of the nodes and edges
print(f"Nodes DataFrame Shape: {nodes_df.shape}")
print(f"Edges DataFrame Shape: {edges_df.shape}")

# Load the ABS_DATAFLOW features and merge them with the nodes
abs_features_list = []
for feature_name, file_path in abs_dataflow_files.items():
    abs_dataflow_df = pd.read_csv(file_path)
    print(f"Loading ABS DataFlow feature: {feature_name}")
    print(f"Feature DataFrame Shape: {abs_dataflow_df.shape}")
    # Identify the column containing the ABS_DATAFLOW feature
    feature_column = [col for col in abs_dataflow_df.columns if "_ABS_DATAFLOW" in col][0]
    # Rename the feature column to avoid conflicts
    abs_dataflow_df = abs_dataflow_df[["node_id", feature_column]].rename(columns={feature_column: feature_name})
    # Debugging: check for duplicate columns in the merge
    print(f"Columns before merge: {nodes_df.columns}")
    print(f"ABS DataFlow Columns: {abs_dataflow_df.columns}")
    # Merge the ABS_DATAFLOW feature with the nodes
    nodes_df = pd.merge(nodes_df, abs_dataflow_df, on="node_id", how="left", suffixes=('', f'_{feature_name}'))
    # Collect the feature for tensor conversion
    abs_features_list.append(torch.tensor(nodes_df[feature_name].fillna(0).values, dtype=torch.long))

# Combine the ABS_DATAFLOW features into a single tensor
abs_features = torch.stack(abs_features_list, dim=1)  # (num_nodes, 4)
print(f"ABS Features Shape: {abs_features.shape}")
print(f"ABS Features Sample: {abs_features[:5]}")

g = dgl.graph((edges_df["innode"].values, edges_df["outnode"].values))
g.ndata["_ABS_DATAFLOW"] = abs_features
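# Aside (an assumption, not in the original post): dgl.graph infers the node
# count from the largest node id in the edge list, so if the last nodes have
# no incident edges the ndata assignment above can fail with a size mismatch;
# passing the count explicitly avoids this:
#   g = dgl.graph((edges_df["innode"].values, edges_df["outnode"].values),
#                 num_nodes=len(nodes_df))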
# Test code file
test_code_path = "DeepDFA/DDFA/storage/processed/bigvul/before/0.c"
with open(test_code_path, "r", encoding="ISO-8859-1") as file:
    test_code = file.read()

# Tokenize the test code
inputs = tokenizer(test_code, max_length=512, padding="max_length", truncation=True, return_tensors="pt")

# Evaluate the model
with torch.no_grad():
    logits = model(input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"], graphs=g)
    probs = torch.softmax(logits, dim=-1)
    predicted_label = torch.argmax(probs, dim=1).item()

# Print the results
print("Predicted Probabilities (Non-Vulnerable, Vulnerable):", probs)
print("Predicted Label:", "Vulnerable (1)" if predicted_label == 1 else "Non-Vulnerable (0)")
Output messages
Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Nodes DataFrame Shape: (34, 19)
Edges DataFrame Shape: (34, 10)
Loading ABS DataFlow feature: operator
Feature DataFrame Shape: (34, 4)
Columns before merge: Index(['Unnamed: 0', 'dgl_id', '_label', 'name', 'code', 'lineNumber',
'columnNumber', 'lineNumberEnd', 'columnNumberEnd',
'controlStructureType', 'order', 'fullName', 'typeFullName',
'local_type', 'local_block', 'node_label', 'node_id', 'vuln',
'graph_id'],
dtype='object')
ABS DataFlow Columns: Index(['node_id', 'operator'], dtype='object')
Loading ABS DataFlow feature: literal
Feature DataFrame Shape: (34, 4)
Columns before merge: Index(['Unnamed: 0', 'dgl_id', '_label', 'name', 'code', 'lineNumber',
'columnNumber', 'lineNumberEnd', 'columnNumberEnd',
'controlStructureType', 'order', 'fullName', 'typeFullName',
'local_type', 'local_block', 'node_label', 'node_id', 'vuln',
'graph_id', 'operator'],
dtype='object')
ABS DataFlow Columns: Index(['node_id', 'literal'], dtype='object')
Loading ABS DataFlow feature: datatype
Feature DataFrame Shape: (34, 4)
Columns before merge: Index(['Unnamed: 0', 'dgl_id', '_label', 'name', 'code', 'lineNumber',
'columnNumber', 'lineNumberEnd', 'columnNumberEnd',
'controlStructureType', 'order', 'fullName', 'typeFullName',
'local_type', 'local_block', 'node_label', 'node_id', 'vuln',
'graph_id', 'operator', 'literal'],
dtype='object')
ABS DataFlow Columns: Index(['node_id', 'datatype'], dtype='object')
Loading ABS DataFlow feature: api
Feature DataFrame Shape: (34, 4)
Columns before merge: Index(['Unnamed: 0', 'dgl_id', '_label', 'name', 'code', 'lineNumber',
'columnNumber', 'lineNumberEnd', 'columnNumberEnd',
'controlStructureType', 'order', 'fullName', 'typeFullName',
'local_type', 'local_block', 'node_label', 'node_id', 'vuln',
'graph_id', 'operator', 'literal', 'datatype'],
dtype='object')
ABS DataFlow Columns: Index(['node_id', 'api'], dtype='object')
ABS Features Shape: torch.Size([34, 4])
ABS Features Sample: tensor([[0, 0, 0, 0],
[3, 4, 2, 3],
[2, 2, 2, 2],
[0, 0, 0, 0],
[2, 3, 2, 2]])
Inside FlowGNN forward...
Graph ndata keys: dict_keys(['_ABS_DATAFLOW'])
Graph ABS DataFlow Shape: torch.Size([34, 4])
Feature Shape Before Embedding: torch.Size([34, 4])
Feature Shape After Embedding: torch.Size([34, 128])
GGNN Output Shape: torch.Size([34, 128])
Graph-level Embedding Shape: torch.Size([1, 128])
Predicted Probabilities (Non-Vulnerable, Vulnerable): tensor([[0.5345, 0.4655]])
Predicted Label: Non-Vulnerable (0)
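The transformers warning at the top of the output is the key caveat here: the RoBERTa pooler, the classification head, and the FlowGNN module are all freshly initialized, so the near-50/50 probabilities are exactly what an untrained head would be expected to produce. A minimal sketch of how fine-tuned weights could be restored before running inference, assuming a checkpoint saved as a plain PyTorch state dict (the path below is hypothetical, not from this post):

# Hypothetical checkpoint path; adjust to wherever the fine-tuned weights live.
state_dict = torch.load("saved_models/checkpoint-best-f1/model.bin", map_location="cpu")
model.load_state_dict(state_dict)
model.eval()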
...