From 08242d1c9325271118a183e28711689eacd3f02a Mon Sep 17 00:00:00 2001 From: koki Date: Sun, 22 Dec 2024 03:24:35 +0800 Subject: [PATCH] control flow, opcode type --- examples/dis_demo.py | 16 +++- ohre/abcre/dis/AsmMethod.py | 21 +++--- ohre/abcre/dis/ControlFlow.py | 36 +++++++++ ohre/abcre/dis/{ISA_reader.py => ISA.py} | 37 ++++++---- ohre/abcre/dis/NAC.py | 5 +- ohre/abcre/dis/NACBlock.py | 42 ++++++----- ohre/abcre/dis/NACBlocks.py | 28 +++++-- ohre/abcre/dis/NACTYPE.py | 93 +++++++++++++++++++++++- ohre/abcre/dis/NAC_LV.py | 10 +++ 9 files changed, 235 insertions(+), 53 deletions(-) create mode 100644 ohre/abcre/dis/ControlFlow.py rename ohre/abcre/dis/{ISA_reader.py => ISA.py} (76%) create mode 100644 ohre/abcre/dis/NAC_LV.py diff --git a/examples/dis_demo.py b/examples/dis_demo.py index 966ceae..aafebbb 100644 --- a/examples/dis_demo.py +++ b/examples/dis_demo.py @@ -1,6 +1,7 @@ import argparse import ohre +from ohre.abcre.dis.ControlFlow import ControlFlow from ohre.abcre.dis.DisFile import DisFile from ohre.misc import Log @@ -15,8 +16,19 @@ dis_file = DisFile(dis_path) print(f"> {dis_file}") + # print(f"\n> {dis_file.debug_deep()}") - for method in dis_file.methods: - print(f">> {method.debug_deep()}") + # for method in dis_file.methods: + # print(f">> {method.debug_deep()}") + # for asmstr in dis_file.asmstrs: # print(f">> {asmstr}") + + # === reverse truly START + # print(f">> before ControlFlow build {dis_file.methods[0].debug_deep()}") + # dis_file.methods[0].split_native_code_block() + # print(f">> after ControlFlow build {dis_file.methods[0].debug_deep()}") + + for asm_method in dis_file.methods: + asm_method.split_native_code_block() + print(f">> CFed: {asm_method.debug_deep()}") diff --git a/ohre/abcre/dis/AsmMethod.py b/ohre/abcre/dis/AsmMethod.py index 58aa4a8..10b06db 100644 --- a/ohre/abcre/dis/AsmMethod.py +++ b/ohre/abcre/dis/AsmMethod.py @@ -1,11 +1,10 @@ from typing import Any, Dict, Iterable, List, Tuple from ohre.abcre.dis.AsmTypes import AsmTypes -from ohre.misc import utils -from ohre.misc import Log -from ohre.abcre.dis.NAC import NAC -from ohre.abcre.dis.NACBlock import NACBlock +from ohre.abcre.dis.ControlFlow import ControlFlow +from ohre.abcre.dis.NAC_LV import NAC_LV from ohre.abcre.dis.NACBlocks import NACBlocks +from ohre.misc import Log, utils class AsmMethod: @@ -22,6 +21,11 @@ def __init__(self, slotNumberIdx, lines: List[str]): insts = self._process_method(lines) self.nac_blocks = NACBlocks(insts) + def split_native_code_block(self): + assert self.nac_blocks.IR_lv == NAC_LV.NATIVE + self.nac_blocks = ControlFlow.split_native_code_block(self.nac_blocks) + self.nac_blocks.IR_lv = NAC_LV.NATIVE_BLOCK_SPLITED + def _process_1st_line(self, line: str): parts = line.split(" ") assert parts[0] == ".function" @@ -84,18 +88,17 @@ def _process_common_inst(self, line: str) -> List[str]: idx = utils.find_next_delimiter(line, start_idx) ret.append(line[start_idx: idx].strip()) idx = idx + 1 - print(f"final ret({len(ret)}) {ret}") return ret def __str__(self): return self.debug_short() def debug_short(self) -> str: - out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} file: {self.file_name}\n\ -args({len(self.args)}) {self.args} nac_blocks({self.nac_blocks.len})" + out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} ret {self.return_type} \ +file: {self.file_name}\n\ +args({len(self.args)}) {self.args} nac_blocks({len(self.nac_blocks)})" return out def debug_deep(self) -> str: - out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} file: {self.file_name}\n\ -args({len(self.args)}) {self.args} nac_blocks({self.nac_blocks.len})\n{self.nac_blocks.debug_deep()}" + out = f"{self.debug_short()}\n{self.nac_blocks.debug_deep()}" return out diff --git a/ohre/abcre/dis/ControlFlow.py b/ohre/abcre/dis/ControlFlow.py new file mode 100644 index 0000000..cda0e47 --- /dev/null +++ b/ohre/abcre/dis/ControlFlow.py @@ -0,0 +1,36 @@ +from ohre.abcre.dis.NACBlock import NACBlock +from ohre.abcre.dis.NACBlocks import NACBlocks +from ohre.abcre.dis.NACTYPE import NACTYPE +from ohre.misc import Log, utils + + +class ControlFlow(): + def split_native_code_block(blocks: NACBlocks) -> NACBlocks: + assert len(blocks) == 1 + nac_block = blocks.nac_blocks[0] + delimited_id: list = list() + for i in range(len(nac_block)): + nac = nac_block.nacs[i] + if (nac.type == NACTYPE.LABEL): + delimited_id.append(i) + elif (nac.type == NACTYPE.COND_JMP or nac.type == NACTYPE.UNCN_JMP or nac.type == NACTYPE.RETURN): + if (i + 1 < len(nac_block)): + delimited_id.append(i + 1) + delimited_id = sorted(list(set(delimited_id))) + if (len(nac_block) not in delimited_id): + delimited_id.append(len(nac_block)) + debug_out = "" + for idx in delimited_id: + if (idx < len(nac_block)): + debug_out += f"{idx}-{nac_block.nacs[idx]}; " + else: + debug_out += f"{idx} nac_block len {len(nac_block)}" + Log.info(f"[ControlFlow] delimited id-nac {debug_out}", False) + + final_nac_blocks: list = list() + idx_start = 0 + for i in range(len(delimited_id)): + idx_end = delimited_id[i] + final_nac_blocks.append(nac_block.get_slice_block(idx_start, idx_end)) + idx_start = idx_end + return NACBlocks(final_nac_blocks) diff --git a/ohre/abcre/dis/ISA_reader.py b/ohre/abcre/dis/ISA.py similarity index 76% rename from ohre/abcre/dis/ISA_reader.py rename to ohre/abcre/dis/ISA.py index f4a8af2..08bd72f 100644 --- a/ohre/abcre/dis/ISA_reader.py +++ b/ohre/abcre/dis/ISA.py @@ -1,3 +1,4 @@ +import copy import json import os from typing import Any, Dict, Iterable, List, Tuple @@ -44,20 +45,28 @@ def _get_opstr_dict(self) -> Dict[str, Dict]: description: str = group["description"].strip() if "description" in group.keys() else None verification: List | None = group["verification"] if "verification" in group.keys() else None exceptions: List | None = group["exceptions"] if "exceptions" in group.keys() else None - properties: List | None = group["properties"] if "properties" in group.keys() else None + properties_common: List | None = group["properties"] if "properties" in group.keys() else None namespace: str = group["namespace"].strip() if "namespace" in group.keys() else None pseudo: str = group["pseudo"].strip() if "pseudo" in group.keys() else None semantics: str = group["semantics"].strip() if "semantics" in group.keys() else None assert "instructions" in group.keys() - for ins in group["instructions"]: - assert "sig" in ins.keys() and "opcode_idx" in ins.keys() - opstr = ins["sig"].split(" ")[0].strip() - opcode_idx = ins["opcode_idx"] - - acc = ins["acc"] if "acc" in ins.keys() else None - format = ins["format"] if "format" in ins.keys() else None - prefix = ins["prefix"] if "prefix" in ins.keys() else None + for inst in group["instructions"]: + assert "sig" in inst.keys() and "opcode_idx" in inst.keys() + opstr = inst["sig"].split(" ")[0].strip() + opcode_idx = inst["opcode_idx"] + + acc = inst["acc"] if "acc" in inst.keys() else None + format = inst["format"] if "format" in inst.keys() else None + prefix = inst["prefix"] if "prefix" in inst.keys() else None + properties_inst: List | None = inst["properties"] if "properties" in inst.keys() else None + properties = None + if (properties_inst is not None and properties_common is not None): + properties = copy.deepcopy(properties_common + properties_inst) + elif (properties_inst is not None and properties_common is None): + properties = copy.deepcopy(properties_inst) + elif (properties_inst is None and properties_common is not None): + properties = copy.deepcopy(properties_common) if (prefix is not None): # final_opcode = prefix_opcode|op_code # concat, not 'or' prefix_opcode = self._get_prefix_opcode(prefix) @@ -65,14 +74,14 @@ def _get_opstr_dict(self) -> Dict[str, Dict]: opcode_idx = [(prefix_opcode << 8) + op_code for op_code in opcode_idx] ret[opstr] = { - "sig": ins["sig"], + "sig": inst["sig"], "acc": acc, "opcode_idx": opcode_idx, "prefix": prefix, "format": format, "title": title, "description": description, "verification": verification, "exceptions": exceptions, "properties": properties, "namespace": namespace, "pseudo": pseudo, "semantics": semantics} return ret def get_opcodes(self, opstr: str) -> List | None: - opcode_info_d = self.get_opcode_info_dict(opstr) + opcode_info_d = self.get_opstr_info_dict(opstr) if (opcode_info_d is None): return None else: @@ -82,7 +91,7 @@ def get_opcodes(self, opstr: str) -> List | None: Log.warn(f"[ISA] opstr {opstr}, opcode_idx not in {opcode_info_d.keys()}") return None - def get_opcode_info_dict(self, opstr: str) -> Dict | None: + def get_opstr_info_dict(self, opstr: str) -> Dict | None: if opstr in self.opstr2infod.keys(): return self.opstr2infod[opstr] else: @@ -103,8 +112,8 @@ def get_opcode_info_dict(self, opstr: str) -> Dict | None: # print(json.dumps(isa.ori_d["groups"], indent=4)) assert isa.get_opcodes("deprecated.getiteratornext") == [0xfc02] assert isa.get_opcodes("callruntime.notifyconcurrentresult") == [0xfb00] - for ins_str in ["mov", "callruntime.definefieldbyindex", "isin"]: - print(f"{ins_str}: {utils.hexstr(isa.get_opcodes(ins_str))} {isa.get_opcode_info_dict(ins_str)}") + for ins_str in ["mov", "callruntime.definefieldbyindex", "isin", "jequndefined"]: + print(f"{ins_str}: {utils.hexstr(isa.get_opcodes(ins_str))} {isa.get_opstr_info_dict(ins_str)}") title_set = set() for opstr in isa.opstr2infod.keys(): title_set.add(isa.opstr2infod[opstr]["title"]) diff --git a/ohre/abcre/dis/NAC.py b/ohre/abcre/dis/NAC.py index 1620ecf..01225f2 100644 --- a/ohre/abcre/dis/NAC.py +++ b/ohre/abcre/dis/NAC.py @@ -1,4 +1,5 @@ from typing import Any, Dict, Iterable, List, Tuple + from ohre.abcre.dis.NACTYPE import NACTYPE @@ -10,7 +11,9 @@ def __init__(self, op_args: List[str]): assert len(op_args) > 0 self.op = op_args[0] self.type = NACTYPE.get_NAC_type(self.op) - self.args = list() + if (self.type == NACTYPE.LABEL and self.op.endswith(":")): + self.op = self.op[:-1] + self.args: list = list() for i in range(1, len(op_args)): self.args.append(op_args[i]) diff --git a/ohre/abcre/dis/NACBlock.py b/ohre/abcre/dis/NACBlock.py index 29e0f13..a8ece0a 100644 --- a/ohre/abcre/dis/NACBlock.py +++ b/ohre/abcre/dis/NACBlock.py @@ -1,33 +1,39 @@ +import copy from typing import Any, Dict, Iterable, List, Tuple + from ohre.abcre.dis.NAC import NAC from ohre.abcre.dis.NACTYPE import NACTYPE -import copy - -class NACBLOCK_LV: - NATIVE = 0 - LEVEL1 = 1 - LEVEL2 = 2 - -class NACBlock(): - def __init__(self, insts: List[List[str]], level=NACBLOCK_LV.NATIVE): - assert len(insts) > 0 +class NACBlock(): # asm instruction(NAC) cantained + def __init__(self, in_l: List[List[str]] | List[NAC]): + assert len(in_l) >= 0 self.nacs: List[NAC] = list() - self.level = level - for inst in insts: - assert len(inst) > 0 - self.nacs.append(NAC(inst)) + if (isinstance(in_l[0], NAC)): # NAC in list + self.nacs = copy.deepcopy(in_l) + else: # maybe list in list # anyway, try init NAC using element in list + for inst in in_l: + assert len(inst) > 0 + self.nacs.append(NAC(inst)) + + def get_slice_block(self, idx_start: int, idx_end: int): + return NACBlock(copy.deepcopy(self.nacs[idx_start: idx_end])) def __str__(self): return self.debug_short() + def __len__(self): + return len(self.nacs) + def debug_short(self): - out = f"NACBlock: nacs {len(self.nacs)} lv {self.level}" + out = f"NACBlock: nacs {len(self.nacs)}" return out def debug_deep(self): - out = f"NACBlock: nacs {len(self.nacs)} lv {self.level}\n" + out = f"NACBlock: nacs {len(self.nacs)}\n" for i in range(len(self.nacs)): - out += f"{i}\t{self.nacs[i].debug_deep()}\n" - return out \ No newline at end of file + if (self.nacs[i].type == NACTYPE.LABEL): + out += f"{i} {self.nacs[i].debug_deep()}\n" + else: + out += f"{i}\t{self.nacs[i].debug_deep()}\n" + return out.strip() diff --git a/ohre/abcre/dis/NACBlocks.py b/ohre/abcre/dis/NACBlocks.py index 946b500..72b1f8c 100644 --- a/ohre/abcre/dis/NACBlocks.py +++ b/ohre/abcre/dis/NACBlocks.py @@ -1,13 +1,22 @@ +import copy from typing import Any, Dict, Iterable, List, Tuple -from ohre.abcre.dis.NACBlock import NACBlock + from ohre.abcre.dis.NAC import NAC +from ohre.abcre.dis.NAC_LV import NAC_LV +from ohre.abcre.dis.NACBlock import NACBlock from ohre.abcre.dis.NACTYPE import NACTYPE -import copy -class NACBlocks(): - def __init__(self, insts: List[List[str]]): - self.nac_blocks: List[NACBlock] = [NACBlock(insts)] +class NACBlocks(): # NAC block contained, build control flow graph inside a single NACBlocks for one method + def __init__(self, in_l: List[List[str]] | List[NACBlock]): + assert len(in_l) >= 0 + self.nac_blocks: List[NACBlock] = list() + self.IR_lv = NAC_LV.NATIVE # native + + if (isinstance(in_l[0], NACBlock)): # NACBlock in list + self.nac_blocks = copy.deepcopy(in_l) + else: # maybe list(str) in list # anyway, try init NACBlock using element(asm codea str list) in list + self.nac_blocks: List[NACBlock] = [NACBlock(in_l)] def __str__(self): return self.debug_short() @@ -16,12 +25,15 @@ def __str__(self): def len(self): return len(self.nac_blocks) + def __len__(self): + return len(self.nac_blocks) + def debug_short(self): - out = f"NACBlocks: block len {len(self.nac_blocks)}" + out = f"NACBlocks: nac block({len(self.nac_blocks)}) {NAC_LV.get_code_name(self.IR_lv)}" return out def debug_deep(self): out = f"{self.debug_short()}\n" for i in range(len(self.nac_blocks)): - out += f"{i}-block: {self.nac_blocks[i].debug_deep()}\n" - return out \ No newline at end of file + out += f"[{i}/{len(self.nac_blocks)}]-block: {self.nac_blocks[i].debug_deep()}\n" + return out diff --git a/ohre/abcre/dis/NACTYPE.py b/ohre/abcre/dis/NACTYPE.py index 983cd15..edea9fa 100644 --- a/ohre/abcre/dis/NACTYPE.py +++ b/ohre/abcre/dis/NACTYPE.py @@ -1,9 +1,20 @@ +import os + +from ohre.abcre.dis.ISA import ISA from ohre.abcre.enum.BaseEnum import BaseEnum +from ohre.misc import Log, utils + + +def _value_in_key_of_dict(d: dict, key, value): + if (key in d.keys() and d[key] is not None and value in d[key]): + return True + return False class NACTYPE(BaseEnum): def __init__(self): super().__init__() + ASSIGN = 0 # at most 3 arg COND_JMP = 1 # 3 arg UNCN_JMP = 2 # 1 arg # unconditional @@ -13,8 +24,88 @@ def __init__(self): RETURN = 6 # 1 arg IMPORT = 11 LABEL = 12 + NOP = 20 + # >= 30: need more analysis + CMP_INST = 30 # comparation instructions + OBJ_VISIT = 31 # object visitors + DEFINITION = 32 # definition instuctions + ITER = 33 + OBJ_LD = 34 + OBJ_CREATE = 35 + BINARY_OP = 40 + UNARY_OP = 41 + MOV = 42 + ACC_LD = 43 + ACC_ST = 44 + # default: UNKNOWN = 99 + isa: ISA | None = None + @classmethod def get_NAC_type(cls, op: str) -> int: - return NACTYPE.UNKNOWN \ No newline at end of file + if (cls.isa is None): + NACTYPE.init_from_ISAyaml(os.path.join(os.path.dirname(os.path.abspath(__file__)), "isa.yaml")) + op = op.strip() + if (op.endswith(":")): + return NACTYPE.LABEL + + info_d = cls.isa.get_opstr_info_dict(op) + # print(f"op {op} info_d {info_d}") + assert info_d is not None and "title" in info_d.keys() + if (_value_in_key_of_dict(info_d, "properties", "return")): + return NACTYPE.RETURN + elif (op == "nop"): + return NACTYPE.NOP + # unconditional jump + elif (op == "jmp"): + return NACTYPE.UNCN_JMP + # conditional jump + elif (_value_in_key_of_dict(info_d, "properties", "jump") and _value_in_key_of_dict(info_d, "properties", "conditional")): + return NACTYPE.COND_JMP + elif (_value_in_key_of_dict(info_d, "properties", "conditional_throw")): + return NACTYPE.COND_THROW + elif ("prefix" in info_d.keys() and info_d["prefix"] == "throw"): + return NACTYPE.UNCN_THROW + elif ("call instructions" in info_d["title"] or "call runtime functions" in info_d["title"]): + return NACTYPE.CALL + # TODO: future work + elif ("comparation instructions" in info_d["title"]): + return NACTYPE.CMP_INST + elif ("object visitors" in info_d["title"].lower()): + return NACTYPE.OBJ_VISIT + elif ("definition instuctions" in info_d["title"].lower()): + return NACTYPE.DEFINITION + elif ("constant object loaders" in info_d["title"].lower()): + return NACTYPE.OBJ_LD + elif ("object creaters" in info_d["title"].lower()): + return NACTYPE.OBJ_CREATE + elif ("iterator instructions" in info_d["title"].lower()): + return NACTYPE.ITER + elif ("binary operations" in info_d["title"].lower()): + return NACTYPE.BINARY_OP + elif ("unary operations" in info_d["title"].lower()): + return NACTYPE.UNARY_OP + elif ("Dynamic move register-to-register".lower() in info_d["title"].lower()): + return NACTYPE.MOV + elif ("load accumulator" in info_d["title"].lower()): + return NACTYPE.ACC_LD + elif ("store accumulator" in info_d["title"].lower()): + return NACTYPE.ACC_ST + Log.warn(f"[NACTYPE] op {op} get UNKNOWN type") + return NACTYPE.UNKNOWN + + @classmethod + def init_from_ISAyaml(cls, yaml_path: str): + cls.isa = ISA(yaml_path) + + +if __name__ == "__main__": + NACTYPE.init_from_ISAyaml(os.path.join(os.path.dirname(os.path.abspath(__file__)), "isa.yaml")) + # for inst in [ + # "mov", "return", "ldobjbyname", "jeqz", "jnez", "jstricteq", "jnstricteq", "throw", "throw.notexists", + # "throw.ifnotobject"]: + # print(f"inst {inst}: {NACTYPE.get_code_name(NACTYPE.get_NAC_type(inst))}") + for inst in NACTYPE.isa.opstr2infod.keys(): + print(f"inst {inst}: {NACTYPE.get_code_name(NACTYPE.get_NAC_type(inst))}") + assert NACTYPE.get_code_name(NACTYPE.get_NAC_type(inst)) != "UNKNOWN" diff --git a/ohre/abcre/dis/NAC_LV.py b/ohre/abcre/dis/NAC_LV.py new file mode 100644 index 0000000..e8fe35e --- /dev/null +++ b/ohre/abcre/dis/NAC_LV.py @@ -0,0 +1,10 @@ +from ohre.abcre.enum.BaseEnum import BaseEnum + + +class NAC_LV(BaseEnum): + def __init__(self): + super().__init__() + NATIVE = 0 + NATIVE_BLOCK_SPLITED = 1 + IR_LV1 = 2 + IR_LV2 = 3