Test IO on large scale data #48
Looking for that "performance" label @hammer... |
I'll generate some big plink files and report back. |
I've done some simulations and generated a decent-sized dataset:
We've got a 45G plink bed file, which is for 500K diploid samples. The total size of the zarr is

So the chunking is good: it looks like about 6M per compressed chunk. I think that's going to be a very good size for most applications. What's less good is this: there are about 13K files in this zarr directory. We are very much not going to make any friends with people using HPC systems if they have to move 13K files around and open/close substantial numbers of those files at a time. There are good options for doing this better in zarr, but I think it's unreasonable for users to need to understand these and have to provide the options themselves. I think the way to go is to provide a command line utility which packages up some of these common requirements as options to the CLI, and provides things like progress monitors etc. I'll open an issue on sgkit-plink to track this.

The plink conversion worked really, really well; I was very impressed! There's room for improvement in terms of parallelisation though, I think. I'll open an issue to track that also.
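As an illustration of one of those zarr options (not the eventual CLI behaviour): everything can be written into a single zip-backed store, so there is one file to move instead of ~13K. This is a minimal sketch with a small stand-in dataset in place of the real converted genotypes; `zarr.ZipStore` is the standard single-file store in zarr-python 2.x.

```python
# Minimal sketch: write a dataset into a single zip-backed zarr store so HPC
# users move one file instead of thousands. The tiny dataset here is a
# stand-in for the real converted genotype data.
import numpy as np
import xarray as xr
import zarr

ds = xr.Dataset(
    {"genotype": (("variants", "samples", "ploidy"), np.zeros((100, 10, 2), dtype="int8"))}
)

store = zarr.ZipStore("genotypes.zarr.zip", mode="w")
try:
    ds.to_zarr(store, mode="w")
finally:
    store.close()  # the archive is only readable once the store is closed
```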
Thanks for the stress test @jeromekelleher!
While I think we should provide a CLI to do this, most of the work should live in an API function.
Yep, having an API function to do most of the work is the way to go all right.
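To make the CLI-as-thin-wrapper idea concrete, here is a hypothetical sketch using click; the command name, options, and defaults are invented for illustration and are not sgkit's actual interface.

```python
# Hypothetical sketch of a thin CLI wrapping an API function that does the
# real work. Command name, options, and defaults are illustrative only.
import click
import sgkit_plink


@click.command()
@click.argument("plink_path")
@click.argument("zarr_path")
@click.option("--progress/--no-progress", default=True, help="Show a progress bar.")
def convert(plink_path, zarr_path, progress):
    """Convert a plink dataset to a zarr store."""
    from contextlib import nullcontext
    from dask.diagnostics import ProgressBar

    ds = sgkit_plink.read_plink(path=plink_path)
    with ProgressBar() if progress else nullcontext():
        ds.to_zarr(zarr_path, mode="w")


if __name__ == "__main__":
    convert()
```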
That's awesome, I haven't tried anything nearly that big yet, so thanks for running it! I'm definitely relieved that it didn't take more elbow grease. I have yet to find a good way to profile compiled code, so assuming some of the suboptimalities are related to pysnptools and not Dask, what would you normally do to try to isolate where the bottlenecks are if they're not in Python?
I can call in some people who might be able to help us out if we have a workload that's easy to set up and run. perf, valgrind, and gperftools are some tools that I've seen others use successfully in the past. I have not been involved in systems software projects since the eBPF tracing tools reached maturity, but they also look quite fun.
I should also mention that @toddlipcon used the Chromium tracing framework in Kudu. I'm not sure if it has much utility for single-process applications, but just figured I'd point it out.
Actually, in Kudu we use it only for single-process (multi-thread) tracing. There's a new version of this framework that's more standalone, which I just came across here: https://perfetto.dev/
If I may add something here: in the face of (Python) performance issues I like a top-down approach and applying Occam's razor. So in this case we could, for example, start with py-spy to easily get quick tracing information (this will likely provide clues), and we could also easily measure the performance without IO to compare the two scenarios. Apart from that, the eBPF tools already mentioned by @hammer are great (definitely recommended), but I would reach for them as you go down the rabbit hole (when you roughly know what you are looking for). Edit: py-spy has support for native frames and ... Edit 2: btw, sysdig is good too, but it might be hard to apply in this scenario.
I think the single-threaded performance from pysnptools is probably quite good, and doesn't need any serious profiling at this point. I had a look, and what I found surprising was that it wasn't using enough CPU. I had 40 threads available, but it was only using about 200% CPU. So, it's either an IO issue (we're waiting for storage) or some sort of parallelism issue. I guess it would be good to separate out the read and write sides of this by writing the genotypes into a big array in RAM first, and then writing that to zarr. Then we'll have a better idea of where the bottlenecks are.
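A sketch of that separation experiment, assuming the dataset fits in RAM; the input path and separators are placeholders that mirror the conversion script further down the thread.

```python
# Sketch of the read/write separation experiment: force the whole read into
# RAM first, then time the zarr write on its own. Paths are placeholders.
import time
import sgkit_plink

t0 = time.time()
ds = sgkit_plink.read_plink(path="data", bim_sep=" ", fam_sep=" ")
ds = ds.load()  # read side only: pull everything into in-memory numpy arrays
print(f"read: {time.time() - t0:.1f}s")

t0 = time.time()
ds.to_zarr("data.zarr", mode="w")  # write side only: no plink reads involved
print(f"write: {time.time() - t0:.1f}s")
```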
Great work! This is very promising. @jeromekelleher was it easy to generate the plink file? It would be good to document how to do this for a variety of sizes.
It's straightforward, but made pretty tedious by having to go through VCF first. The ideal pipeline would be tskit->sgkit->plink, which would mean we're not dependent on any external binaries. We're missing a few pieces of this puzzle first though. Here's the makefile I've been using:
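As a rough stand-in for that kind of simulate-then-convert pipeline (written as Python rather than a makefile): simulate with msprime, export a VCF via tskit, and shell out to the plink binary. The sample count, simulation parameters, and paths are placeholders, and it still depends on the external plink executable, which is exactly what a tskit->sgkit->plink route would remove.

```python
# Illustrative sketch only (not the makefile referenced above): simulate a
# diploid dataset, write a VCF, and convert it to .bed/.bim/.fam with plink.
# Parameters and paths are placeholders; requires the plink 1.9 binary on PATH.
import subprocess
import msprime

n_individuals = 1_000  # increase for larger stress tests

ts = msprime.simulate(
    sample_size=2 * n_individuals,  # two haploid genomes per diploid individual
    length=1e7,
    recombination_rate=1e-8,
    mutation_rate=1e-8,
    Ne=10_000,
)

with open("sim.vcf", "w") as vcf_file:
    ts.write_vcf(vcf_file, ploidy=2)

subprocess.run(["plink", "--vcf", "sim.vcf", "--make-bed", "--out", "sim"], check=True)
```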
@jeromekelleher do you know if the PLINK 2.0 ...? It looks like pysnptools has a .bed writer, but it seems to only take in-memory arrays. I haven't tried it yet, but maybe it will work if provided a Dask array instead. Might be worth some experimentation or digging around in their source code.
I don't think we should worry about optimising the case of writing a plink dataset that's larger than RAM, @eric-czech - in practice, people who want to work with large plink datasets using plink will have enough RAM to work with the dataset. I think it's fine to support in-memory arrays only, especially for an initial version.
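For reference, writing an in-memory genotype matrix to .bed with pysnptools looks roughly like this, based on its documented SnpData/Bed.write API; the array, ids, and output path below are made up, and a dask-backed array would need to be computed into memory first.

```python
# Rough sketch of writing an in-memory matrix to .bed via pysnptools.
# The data, ids, and path are placeholders for illustration.
import numpy as np
from pysnptools.snpreader import Bed, SnpData

n_samples, n_variants = 4, 3
val = np.random.randint(0, 3, size=(n_samples, n_variants)).astype(float)  # alt allele counts

snpdata = SnpData(
    iid=[["fam0", f"iid{i}"] for i in range(n_samples)],  # (family id, individual id) pairs
    sid=[f"snp{j}" for j in range(n_variants)],           # variant ids
    val=val,                                              # samples x variants matrix
)
Bed.write("example.bed", snpdata, count_A1=False)
```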
Hey @jeromekelleher, I was experimenting with this and I'm fairly certain the issue is that the GIL is stopping pysnptools from doing anything in parallel when multiple dask workers are running with the default 'threads' scheduler. I saw this same issue, but you can easily get far greater CPU utilization by changing the default scheduler to use the multiprocessing backend (so the workers aren't in the same Python process), i.e. dropping this at the top of the script:

```python
import dask

# Set the default scheduler to be used globally on computation of all arrays
dask.config.set(scheduler='processes')
```

If you get a chance, I'd love to know if that makes the same difference for you as it does for me.
I'm also finding that it's much better to use the 'threads' scheduler to read the data and the 'processes' scheduler to write it. For example:

```python
with dask.config.set(scheduler='threads'):
    # Lots of inter-worker communication happens on read, but
    # parallelism is low for python code limited by GIL.
    ds = read_plink(...)

with dask.config.set(scheduler='processes'):
    # Very little inter-worker communication happens on write and the GIL
    # isn't an issue on separate processes.
    ds.to_zarr(...)
```

The biggest drawback of the 'processes' scheduler is that inter-worker communication is very expensive. Either way, that setup above would likely make for some solid improvements.
Thanks @eric-czech, I'll try this out when I get a chance. (Catching up on a significant backlog atm)
I've tried this out @eric-czech, and it's not looking like the GIL is the bottleneck here. Using the "processes" scheduler doesn't seem to change much - my server is still running ~95% idle as far as top is concerned (2.5% user, 3% system). There are occasional spikes where CPU usage goes up and all the worker processes are busy, but mostly there seems to be a lot of waiting going on. I'm fairly sure IO isn't the bottleneck. Probably looking at the dynamic dask graph would help to understand what's going on here - I'm a dask noob though, so I don't have many ideas on how to dig into it.
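One way to dig into it, sketched here with dask.distributed rather than the local schedulers used above: run the conversion under a Client and watch the live dashboard (task stream, worker CPU, memory) while it runs. The worker counts and paths below are placeholders.

```python
# Sketch: run the conversion under dask.distributed and watch the dashboard
# to see where the workers sit idle. Worker counts and paths are placeholders.
from dask.distributed import Client
import sgkit_plink

if __name__ == "__main__":
    # One single-threaded worker process per core sidesteps the GIL entirely.
    client = Client(processes=True, n_workers=8, threads_per_worker=1)
    print("Dashboard:", client.dashboard_link)

    ds = sgkit_plink.read_plink(path="data", bim_sep=" ", fam_sep=" ")
    ds.to_zarr("data.zarr", mode="w")  # watch the task stream while this runs
```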
Can you share the script you're running?
```python
import sgkit_plink
import sys
import dask.config
from dask.diagnostics import ProgressBar

if __name__ == "__main__":
    path = sys.argv[1]
    with dask.config.set(scheduler='threads'):
        with ProgressBar():
            ds = sgkit_plink.read_plink(path=path, bim_sep=" ", fam_sep=" ")
    print("Read dataset")
    print(ds)
    with dask.config.set(scheduler='processes'):
        with ProgressBar():
            ds.to_zarr(sys.argv[2], mode="w")
    print("Done!")
```
ps. the ProgressBar on the first ...
Hm, I assumed increasing parallelism for the reads would improve the problem on write too, but I guess not (sorry, I should not have assumed that). I tried your script and saw the same. More specifically, where I saw improvements was on something like this:

```python
read_plink(...)['call/genotype'].mean(axis=(1,2)).compute()
```

With the 'threads' scheduler versus the 'processes' scheduler, the former takes about 3x as long. I'm not sure why the zarr writes need to be synchronized, but it looks like that might be the case:

```python
import dask
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize

ds = read_plink(...)
dask.config.set(scheduler='processes')
with ProgressBar(), ResourceProfiler() as rprof, Profiler() as prof:
    ds.to_zarr('/tmp/ukb_chr21.zarr', mode="w")
visualize([prof, rprof])
```

That, or the lulls could be related to inter-worker communication after data for blocks has been fetched. Maybe some combination of both? I'm not sure.
Super cool, thanks @eric-czech! Hopefully we can tune this behaviour out by taking more control of the zarr store we're writing to. Really useful data though, this is excellent.
A quick update from the bgen side of the world: I ran a script to convert UKB chromosomes 21 and 22 to zarr using our current bgen-reader wrapper in sgkit-bgen (f80601b). This is also using the multiprocessing scheduler, and I saw a similar lack of CPU utilization without it, much like the comments above when testing PLINK reads. At 14 hours for 36.5G, that's about 2.5G converted per hour. I'd like to compare that directly to the PLINK numbers @jeromekelleher shared in sgkit-dev/sgkit-plink#6 (45G / 6.5 hrs = ~7G per hr), but the schedulers were different, as were the core/thread counts, and I was writing out over the network rather than to local disk. These could be useful points of comparison for other experiments though.
We'd like to see how the existing conversion pipelines work on large-scale data. A good start would be to convert a ~100G plink file to zarr. This should give us some information about how things are performing.