Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/rmq device #30

Merged
merged 5 commits into from
Dec 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions src/AMSWorkflow/ams/stage.py
Original file line number Diff line number Diff line change
@@ -220,6 +220,7 @@ def header_format(self) -> str:
- 4 bytes are the number of elements in the message. Limit max: 2^32 - 1
- 2 bytes are the input dimension. Limit max: 65535
- 2 bytes are the output dimension. Limit max: 65535
- 4 bytes are for aligning memory to 8

|__Header_size__|__Datatype__|__Rank__|__#elem__|__InDim__|__OutDim__|...real data...|

@@ -229,7 +230,7 @@ def header_format(self) -> str:
|__Header_(12B)__|__Input 1__|__Output 1__|...|__Input_K__|__Output_K__|

"""
return "BBHIHH"
return "BBHIHHI"

def endianness(self) -> str:
"""
@@ -272,6 +273,7 @@ def _parse_header(self, body: str) -> dict:
res["num_element"],
res["input_dim"],
res["output_dim"],
res["padding"],
) = struct.unpack(fmt, body[:hsize])
assert hsize == res["hsize"]
assert res["datatype"] in [4, 8]
@@ -314,9 +316,9 @@ def _decode(self, body: str) -> Tuple[np.array]:
while body:
header_info = self._parse_header(body)
temp_input, temp_output = self._parse_data(body, header_info)
print(f"input shape {temp_input.shape} outpute shape {temp_output.shape}")
# total size of byte we read for that message
chunk_size = header_info["hsize"] + header_info["dsize"]
print(f"Processing message #{i}")
input.append(temp_input)
output.append(temp_output)
# We remove the current message and keep going
@@ -442,6 +444,7 @@ def __call__(self):
fn = get_unique_fn()
fn = f"{self.out_dir}/{fn}.{self.suffix}"
is_terminate = False
total_bytes_written = 0
with self.data_writer_cls(fn) as fd:
bytes_written = 0
while True:
@@ -454,6 +457,8 @@ def __call__(self):
bytes_written += data.inputs.size * data.inputs.itemsize
bytes_written += data.outputs.size * data.outputs.itemsize
fd.store(data.inputs, data.outputs)
total_bytes_written += data.inputs.size * data.inputs.itemsize
total_bytes_written += data.outputs.size * data.outputs.itemsize
# FIXME: We currently decide to chunk files to 2GB
# of contents. Is this a good size?
if is_terminate or bytes_written >= 2 * 1024 * 1024 * 1024:
@@ -465,7 +470,7 @@ def __call__(self):
break

end = time.time()
print(f"Spend {end - start} at {self.__class__.__name__}")
print(f"Spend {end - start} {total_bytes_written} at {self.__class__.__name__}")


class PushToStore(Task):
@@ -545,7 +550,6 @@ def __init__(self, db_dir, store, dest_dir=None, stage_dir=None, db_type="hdf5")
initializes the Pipeline class to write the final data in the 'dest_dir' using a file writer of type 'db_type'
and optionally caching the data in the 'stage_dir' before making them available in the cache store.
"""
print("DATABASE DIR IS: ", db_dir)
self.ams_config = AMSInstance.from_path(db_dir)

if dest_dir is not None:
@@ -782,12 +786,12 @@ class RMQPipeline(Pipeline):
rmq_queue: The RMQ queue to listen to.
"""

def __init__(self, store, dest_dir, stage_dir, db_type, credentials, cacert, rmq_queue):
def __init__(self, db_dir, store, dest_dir, stage_dir, db_type, credentials, cacert, rmq_queue):
"""
Initialize a RMQPipeline that will write data to the 'dest_dir' and optionally publish
these files to the kosh-store 'store' by using the stage_dir as an intermediate directory.
"""
super().__init__(store, dest_dir, stage_dir, db_type)
super().__init__(db_dir, store, dest_dir, stage_dir, db_type)
self._credentials = Path(credentials)
self._cacert = Path(cacert)
self._rmq_queue = rmq_queue
@@ -819,7 +823,17 @@ def from_cli(cls, args):
"""
Create RMQPipeline from the user provided CLI.
"""
return cls(args.store, args.dest_dir, args.stage_dir, args.db_type, args.creds, args.cert, args.queue)
print("Creating database from here", args.persistent_db_path)
return cls(
args.persistent_db_path,
args.store,
args.dest_dir,
args.stage_dir,
args.db_type,
args.creds,
args.cert,
args.queue,
)


def get_pipeline(src_mechanism="fs"):
5 changes: 3 additions & 2 deletions src/AMSWorkflow/ams_wf/AMSDBStage.py
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ def main():
"--load", "-l", dest="user_module", help="Path implementing a custom pipeline stage module", default=None
)
parser.add_argument(
"--class", "-cls", dest="user_class", help="Class implementing the 'Action' performed on data", default=None
"--class", dest="user_class", help="Class implementing the 'Action' performed on data", default=None
)
parser.add_argument(
"--policy",
@@ -42,6 +42,7 @@ def main():
user_class = None
user_args = None
user_prog = ""
print(f"User class is {args.user_module} {args.user_class}")

if args.user_module is not None:
user_class = load_class(args.user_module, args.user_class)
@@ -55,6 +56,7 @@ def main():
user_args, extras = user_parser.parse_known_args(extras)

pipeline_cls = get_pipeline(args.mechanism)
print(pipeline_cls)
pipeline_parser = argparse.ArgumentParser(
prog=pipeline_cls.__name__,
description="Pipeline mechanism to load data from specified end-point",
@@ -81,5 +83,4 @@ def main():


if __name__ == "__main__":
print("{0}".format(" ".join(sys.argv)))
main()
8 changes: 7 additions & 1 deletion src/AMSlib/ml/hdcache.hpp
Original file line number Diff line number Diff line change
@@ -415,6 +415,11 @@ class HDCache
} else {
_evaluate(ndata, data, is_acceptable);
}

if (cache_location == AMSResourceType::DEVICE) {
deviceCheckErrors(__FILE__, __LINE__);
}

DBG(UQModule, "Done with evalution of uq")
}

@@ -621,7 +626,8 @@ class HDCache
#ifdef __ENABLE_CUDA__
random_uq_device<<<1, 1>>>(is_acceptable, ndata, acceptable_error);
#else
THROW(std::runtime_error, "Random-uq is not configured to use device allocations");
THROW(std::runtime_error,
"Random-uq is not configured to use device allocations");
#endif
} else {
random_uq_host(is_acceptable, ndata, acceptable_error);
14 changes: 14 additions & 0 deletions src/AMSlib/ml/surrogate.hpp
Original file line number Diff line number Diff line change
@@ -203,6 +203,10 @@ class SurrogateModel
tensorToArray(output, num_elements, num_out, outputs);
}

if (is_device()) {
deviceCheckErrors(__FILE__, __LINE__);
}

DBG(Surrogate,
"Evaluate surrogate model (%ld, %ld) -> (%ld, %ld)",
num_elements,
@@ -357,7 +361,17 @@ class SurrogateModel
if (typeid(TypeInValue) == typeid(double)) return true;
return false;
}

#endif

// Report whether the surrogate model is resident on the device
// (GPU) resource, i.e. whether model_resource is
// AMSResourceType::DEVICE.
//
// model_resource is only defined when torch support is compiled in;
// without __ENABLE_TORCH__ there is no model to query, so we
// unconditionally report host (false).
inline bool is_device() const
{
#ifdef __ENABLE_TORCH__
return model_resource == AMSResourceType::DEVICE;
#else
return false;
#endif
}

bool is_DeltaUQ() { return _is_DeltaUQ; }
};
Loading
Oops, something went wrong.