Skip to content

Commit

Permalink
control flow, opcode type
Browse files Browse the repository at this point in the history
  • Loading branch information
kokifish committed Dec 21, 2024
1 parent a61a8fc commit 08242d1
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 53 deletions.
16 changes: 14 additions & 2 deletions examples/dis_demo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse

import ohre
from ohre.abcre.dis.ControlFlow import ControlFlow
from ohre.abcre.dis.DisFile import DisFile
from ohre.misc import Log

Expand All @@ -15,8 +16,19 @@
dis_file = DisFile(dis_path)

print(f"> {dis_file}")

# print(f"\n> {dis_file.debug_deep()}")
for method in dis_file.methods:
print(f">> {method.debug_deep()}")
# for method in dis_file.methods:
# print(f">> {method.debug_deep()}")

# for asmstr in dis_file.asmstrs:
# print(f">> {asmstr}")

# === reverse truly START
# print(f">> before ControlFlow build {dis_file.methods[0].debug_deep()}")
# dis_file.methods[0].split_native_code_block()
# print(f">> after ControlFlow build {dis_file.methods[0].debug_deep()}")

for asm_method in dis_file.methods:
asm_method.split_native_code_block()
print(f">> CFed: {asm_method.debug_deep()}")
21 changes: 12 additions & 9 deletions ohre/abcre/dis/AsmMethod.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from typing import Any, Dict, Iterable, List, Tuple

from ohre.abcre.dis.AsmTypes import AsmTypes
from ohre.misc import utils
from ohre.misc import Log
from ohre.abcre.dis.NAC import NAC
from ohre.abcre.dis.NACBlock import NACBlock
from ohre.abcre.dis.ControlFlow import ControlFlow
from ohre.abcre.dis.NAC_LV import NAC_LV
from ohre.abcre.dis.NACBlocks import NACBlocks
from ohre.misc import Log, utils


class AsmMethod:
Expand All @@ -22,6 +21,11 @@ def __init__(self, slotNumberIdx, lines: List[str]):
insts = self._process_method(lines)
self.nac_blocks = NACBlocks(insts)

def split_native_code_block(self):
    """Split this method's single native block into several blocks.

    The method must still be at the NAC_LV.NATIVE level. The actual
    splitting is delegated to ControlFlow.split_native_code_block; the
    result replaces self.nac_blocks and is tagged as
    NAC_LV.NATIVE_BLOCK_SPLITED.
    """
    current_blocks = self.nac_blocks
    # Splitting is only defined for blocks that were not processed yet.
    assert current_blocks.IR_lv == NAC_LV.NATIVE
    self.nac_blocks = ControlFlow.split_native_code_block(current_blocks)
    # Record the new IR level on the freshly built container.
    self.nac_blocks.IR_lv = NAC_LV.NATIVE_BLOCK_SPLITED

def _process_1st_line(self, line: str):
parts = line.split(" ")
assert parts[0] == ".function"
Expand Down Expand Up @@ -84,18 +88,17 @@ def _process_common_inst(self, line: str) -> List[str]:
idx = utils.find_next_delimiter(line, start_idx)
ret.append(line[start_idx: idx].strip())
idx = idx + 1
print(f"final ret({len(ret)}) {ret}")
return ret

def __str__(self):
return self.debug_short()

def debug_short(self) -> str:
out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} file: {self.file_name}\n\
args({len(self.args)}) {self.args} nac_blocks({self.nac_blocks.len})"
out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} ret {self.return_type} \
file: {self.file_name}\n\
args({len(self.args)}) {self.args} nac_blocks({len(self.nac_blocks)})"
return out

def debug_deep(self) -> str:
out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} file: {self.file_name}\n\
args({len(self.args)}) {self.args} nac_blocks({self.nac_blocks.len})\n{self.nac_blocks.debug_deep()}"
out = f"{self.debug_short()}\n{self.nac_blocks.debug_deep()}"
return out
36 changes: 36 additions & 0 deletions ohre/abcre/dis/ControlFlow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from ohre.abcre.dis.NACBlock import NACBlock
from ohre.abcre.dis.NACBlocks import NACBlocks
from ohre.abcre.dis.NACTYPE import NACTYPE
from ohre.misc import Log, utils


class ControlFlow():
    """Control-flow helpers operating on NACBlocks."""

    @staticmethod
    def split_native_code_block(blocks: NACBlocks) -> NACBlocks:
        """Split the single block of `blocks` at labels and after jumps/returns.

        A new block starts at every LABEL instruction and right after every
        COND_JMP, UNCN_JMP or RETURN instruction (when another instruction
        follows it).

        Args:
            blocks: container that must hold exactly one NACBlock.

        Returns:
            A new NACBlocks built from the slices of the original block.
            Setting its IR level is left to the caller.

        Raises:
            AssertionError: if `blocks` does not hold exactly one block.
        """
        assert len(blocks) == 1
        nac_block = blocks.nac_blocks[0]
        nac_cnt = len(nac_block)

        # Collect every index at which a new block begins.
        boundaries: set = set()
        for i, nac in enumerate(nac_block.nacs):
            if (nac.type == NACTYPE.LABEL):
                boundaries.add(i)
            elif (nac.type == NACTYPE.COND_JMP or nac.type == NACTYPE.UNCN_JMP or nac.type == NACTYPE.RETURN):
                if (i + 1 < nac_cnt):
                    boundaries.add(i + 1)
        # The end of the block always closes the last slice.
        boundaries.add(nac_cnt)
        delimited_id: list = sorted(boundaries)

        debug_out = ""
        for idx in delimited_id:
            if (idx < nac_cnt):
                debug_out += f"{idx}-{nac_block.nacs[idx]}; "
            else:
                debug_out += f"{idx} nac_block len {len(nac_block)}"
        Log.info(f"[ControlFlow] delimited id-nac {debug_out}", False)

        final_nac_blocks: list = list()
        idx_start = 0
        for idx_end in delimited_id:
            # A boundary at index 0 (label as the very first instruction) would
            # produce an empty slice; NACBlock indexes its first element on
            # construction, so zero-length ranges are skipped.
            if (idx_end > idx_start):
                final_nac_blocks.append(nac_block.get_slice_block(idx_start, idx_end))
            idx_start = idx_end
        return NACBlocks(final_nac_blocks)
37 changes: 23 additions & 14 deletions ohre/abcre/dis/ISA_reader.py → ohre/abcre/dis/ISA.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import json
import os
from typing import Any, Dict, Iterable, List, Tuple
Expand Down Expand Up @@ -44,35 +45,43 @@ def _get_opstr_dict(self) -> Dict[str, Dict]:
description: str = group["description"].strip() if "description" in group.keys() else None
verification: List | None = group["verification"] if "verification" in group.keys() else None
exceptions: List | None = group["exceptions"] if "exceptions" in group.keys() else None
properties: List | None = group["properties"] if "properties" in group.keys() else None
properties_common: List | None = group["properties"] if "properties" in group.keys() else None
namespace: str = group["namespace"].strip() if "namespace" in group.keys() else None
pseudo: str = group["pseudo"].strip() if "pseudo" in group.keys() else None
semantics: str = group["semantics"].strip() if "semantics" in group.keys() else None

assert "instructions" in group.keys()
for ins in group["instructions"]:
assert "sig" in ins.keys() and "opcode_idx" in ins.keys()
opstr = ins["sig"].split(" ")[0].strip()
opcode_idx = ins["opcode_idx"]

acc = ins["acc"] if "acc" in ins.keys() else None
format = ins["format"] if "format" in ins.keys() else None
prefix = ins["prefix"] if "prefix" in ins.keys() else None
for inst in group["instructions"]:
assert "sig" in inst.keys() and "opcode_idx" in inst.keys()
opstr = inst["sig"].split(" ")[0].strip()
opcode_idx = inst["opcode_idx"]

acc = inst["acc"] if "acc" in inst.keys() else None
format = inst["format"] if "format" in inst.keys() else None
prefix = inst["prefix"] if "prefix" in inst.keys() else None
properties_inst: List | None = inst["properties"] if "properties" in inst.keys() else None
properties = None
if (properties_inst is not None and properties_common is not None):
properties = copy.deepcopy(properties_common + properties_inst)
elif (properties_inst is not None and properties_common is None):
properties = copy.deepcopy(properties_inst)
elif (properties_inst is None and properties_common is not None):
properties = copy.deepcopy(properties_common)

if (prefix is not None): # final_opcode = prefix_opcode|op_code # concat, not 'or'
prefix_opcode = self._get_prefix_opcode(prefix)
assert prefix_opcode != -1
opcode_idx = [(prefix_opcode << 8) + op_code for op_code in opcode_idx]

ret[opstr] = {
"sig": ins["sig"],
"sig": inst["sig"],
"acc": acc, "opcode_idx": opcode_idx, "prefix": prefix, "format": format, "title": title,
"description": description, "verification": verification, "exceptions": exceptions,
"properties": properties, "namespace": namespace, "pseudo": pseudo, "semantics": semantics}
return ret

def get_opcodes(self, opstr: str) -> List | None:
opcode_info_d = self.get_opcode_info_dict(opstr)
opcode_info_d = self.get_opstr_info_dict(opstr)
if (opcode_info_d is None):
return None
else:
Expand All @@ -82,7 +91,7 @@ def get_opcodes(self, opstr: str) -> List | None:
Log.warn(f"[ISA] opstr {opstr}, opcode_idx not in {opcode_info_d.keys()}")
return None

def get_opcode_info_dict(self, opstr: str) -> Dict | None:
def get_opstr_info_dict(self, opstr: str) -> Dict | None:
if opstr in self.opstr2infod.keys():
return self.opstr2infod[opstr]
else:
Expand All @@ -103,8 +112,8 @@ def get_opcode_info_dict(self, opstr: str) -> Dict | None:
# print(json.dumps(isa.ori_d["groups"], indent=4))
assert isa.get_opcodes("deprecated.getiteratornext") == [0xfc02]
assert isa.get_opcodes("callruntime.notifyconcurrentresult") == [0xfb00]
for ins_str in ["mov", "callruntime.definefieldbyindex", "isin"]:
print(f"{ins_str}: {utils.hexstr(isa.get_opcodes(ins_str))} {isa.get_opcode_info_dict(ins_str)}")
for ins_str in ["mov", "callruntime.definefieldbyindex", "isin", "jequndefined"]:
print(f"{ins_str}: {utils.hexstr(isa.get_opcodes(ins_str))} {isa.get_opstr_info_dict(ins_str)}")
title_set = set()
for opstr in isa.opstr2infod.keys():
title_set.add(isa.opstr2infod[opstr]["title"])
Expand Down
5 changes: 4 additions & 1 deletion ohre/abcre/dis/NAC.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Any, Dict, Iterable, List, Tuple

from ohre.abcre.dis.NACTYPE import NACTYPE


Expand All @@ -10,7 +11,9 @@ def __init__(self, op_args: List[str]):
assert len(op_args) > 0
self.op = op_args[0]
self.type = NACTYPE.get_NAC_type(self.op)
self.args = list()
if (self.type == NACTYPE.LABEL and self.op.endswith(":")):
self.op = self.op[:-1]
self.args: list = list()
for i in range(1, len(op_args)):
self.args.append(op_args[i])

Expand Down
42 changes: 24 additions & 18 deletions ohre/abcre/dis/NACBlock.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
import copy
from typing import Any, Dict, Iterable, List, Tuple

from ohre.abcre.dis.NAC import NAC
from ohre.abcre.dis.NACTYPE import NACTYPE
import copy


class NACBLOCK_LV:
NATIVE = 0
LEVEL1 = 1
LEVEL2 = 2


class NACBlock():
def __init__(self, insts: List[List[str]], level=NACBLOCK_LV.NATIVE):
assert len(insts) > 0
class NACBlock(): # asm instruction(NAC) contained
def __init__(self, in_l: List[List[str]] | List[NAC]):
assert len(in_l) >= 0
self.nacs: List[NAC] = list()
self.level = level
for inst in insts:
assert len(inst) > 0
self.nacs.append(NAC(inst))
if (isinstance(in_l[0], NAC)): # NAC in list
self.nacs = copy.deepcopy(in_l)
else: # maybe list in list # anyway, try init NAC using element in list
for inst in in_l:
assert len(inst) > 0
self.nacs.append(NAC(inst))

def get_slice_block(self, idx_start: int, idx_end: int):
    """Return a new NACBlock holding copies of self.nacs[idx_start:idx_end].

    Args:
        idx_start: index of the first NAC to include.
        idx_end: index one past the last NAC to include.

    Returns:
        A NACBlock whose NAC objects are independent of this block's.
    """
    # NACBlock.__init__ already deep-copies a list of NAC objects, so copying
    # the slice here as well would only duplicate every NAC a second time.
    return NACBlock(self.nacs[idx_start: idx_end])

def __str__(self):
return self.debug_short()

def __len__(self):
return len(self.nacs)

def debug_short(self):
out = f"NACBlock: nacs {len(self.nacs)} lv {self.level}"
out = f"NACBlock: nacs {len(self.nacs)}"
return out

def debug_deep(self):
out = f"NACBlock: nacs {len(self.nacs)} lv {self.level}\n"
out = f"NACBlock: nacs {len(self.nacs)}\n"
for i in range(len(self.nacs)):
out += f"{i}\t{self.nacs[i].debug_deep()}\n"
return out
if (self.nacs[i].type == NACTYPE.LABEL):
out += f"{i} {self.nacs[i].debug_deep()}\n"
else:
out += f"{i}\t{self.nacs[i].debug_deep()}\n"
return out.strip()
28 changes: 20 additions & 8 deletions ohre/abcre/dis/NACBlocks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import copy
from typing import Any, Dict, Iterable, List, Tuple
from ohre.abcre.dis.NACBlock import NACBlock

from ohre.abcre.dis.NAC import NAC
from ohre.abcre.dis.NAC_LV import NAC_LV
from ohre.abcre.dis.NACBlock import NACBlock
from ohre.abcre.dis.NACTYPE import NACTYPE
import copy


class NACBlocks():
def __init__(self, insts: List[List[str]]):
self.nac_blocks: List[NACBlock] = [NACBlock(insts)]
class NACBlocks(): # NAC block contained, build control flow graph inside a single NACBlocks for one method
def __init__(self, in_l: List[List[str]] | List[NACBlock]):
assert len(in_l) >= 0
self.nac_blocks: List[NACBlock] = list()
self.IR_lv = NAC_LV.NATIVE # native

if (isinstance(in_l[0], NACBlock)): # NACBlock in list
self.nac_blocks = copy.deepcopy(in_l)
else: # maybe list(str) in list # anyway, try init NACBlock using element(asm code str list) in list
self.nac_blocks: List[NACBlock] = [NACBlock(in_l)]

def __str__(self):
return self.debug_short()
Expand All @@ -16,12 +25,15 @@ def __str__(self):
def len(self):
return len(self.nac_blocks)

def __len__(self):
return len(self.nac_blocks)

def debug_short(self):
out = f"NACBlocks: block len {len(self.nac_blocks)}"
out = f"NACBlocks: nac block({len(self.nac_blocks)}) {NAC_LV.get_code_name(self.IR_lv)}"
return out

def debug_deep(self):
out = f"{self.debug_short()}\n"
for i in range(len(self.nac_blocks)):
out += f"{i}-block: {self.nac_blocks[i].debug_deep()}\n"
return out
out += f"[{i}/{len(self.nac_blocks)}]-block: {self.nac_blocks[i].debug_deep()}\n"
return out
Loading

0 comments on commit 08242d1

Please sign in to comment.