Skip to content
This repository has been archived by the owner on Aug 15, 2022. It is now read-only.

Commit

Permalink
Merge branch 'ofan/pyguppyclient-pyguppy_client_lib' into 'master'
Browse files Browse the repository at this point in the history
Ofan/pyguppyclient pyguppy client lib

See merge request platform/pyguppyclient!5
  • Loading branch information
iiSeymour committed Jul 1, 2020
2 parents 7f943a2 + 7641b22 commit 5352154
Show file tree
Hide file tree
Showing 30 changed files with 706 additions and 368 deletions.
32 changes: 4 additions & 28 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,20 @@ variables:
- apt-get update > /dev/null
- apt-get install -y python3-pip > /dev/null
- apt-get install -y --no-install-recommends ./guppy.deb > /dev/null
- python3 -m pip install -q -r requirements.txt -r development.txt
- python3 -m pip install ${PIP_OPTIONS} -q -r requirements.txt -r development.txt
- python3 setup.py develop
script:
- guppy_basecall_server -p 5555 -c ${CONFIG}.cfg --post_out -l /tmp/log ${GUPPY_ARGS} &
- sleep 15
- make test
- guppy_basecaller -c ${CONFIG}.cfg -p 5555 -s . -i tests/reads/testdata/
- guppy_basecaller -r -c ${CONFIG}.cfg -p 5555 -s . -i tests/reads/testdata/
tags:
- nvidia-docker

cpu:guppy:3.4.1:
cpu:guppy:4.0.11:
<<: *test

cpu:guppy:3.4.5:
<<: *test

cpu:guppy:3.5.2:
<<: *test

cpu:guppy:3.6.0:
<<: *test

gpu:guppy:3.4.1:
<<: *test
variables:
GUPPY_ARGS: "-x cuda:0"

gpu:guppy:3.4.5:
<<: *test
variables:
GUPPY_ARGS: "-x cuda:0"

gpu:guppy:3.5.2:
<<: *test
variables:
GUPPY_ARGS: "-x cuda:0"

gpu:guppy:3.6.0:
gpu:guppy:4.0.11:
<<: *test
variables:
GUPPY_ARGS: "-x cuda:0"
2 changes: 1 addition & 1 deletion examples/pyguppyclient
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def process(read, called, lock):
Example callback function for the Caller that writes fastq files
"""
with lock:
write_fastq(read.read_id, called.seq, called.qual, sys.stdout)
write_fastq(read['metadata']['read_id'], called.seq, called.qual, sys.stdout)


def main(args):
Expand Down
2 changes: 1 addition & 1 deletion pyguppyclient/caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def basecall_batch(self, files):

done += 1
read, called = res
samples += read.total_samples - called.trimmed_samples
samples += called.trimmed_samples

if self.callback:
self.callback(read, called, self.lock)
Expand Down
161 changes: 86 additions & 75 deletions pyguppyclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Guppy Client
"""

from collections import deque
import time
import asyncio
import logging
Expand All @@ -14,7 +15,8 @@
from pyguppyclient.utils import parse_config
from pyguppyclient.ipc import simple_request, simple_response
from pyguppyclient.ipc import SimpleRequestType, SimpleReplyType
from pyguppyclient.decode import Config, raw_read_message
from pyguppyclient.decode import Config, PROTO_VERSION, pcl_called_read
from pyguppy_client_lib.client_lib import GuppyClient as PCLClient


logger = logging.getLogger("pyguppyclient")
Expand All @@ -34,6 +36,8 @@ def __init__(self, config_name, host="localhost", port=5555, timeout=0.1, retrie
self.socket.set(RCVTIMEO, 100)
self.socket.connect("tcp://%s:%s" % (host, port))
self.client_id = 0
self.pcl_client = PCLClient("%s:%s" % (host, port), self.config_name)
_init_pcl_client(self.pcl_client)

def __enter__(self):
self.connect()
Expand Down Expand Up @@ -66,23 +70,19 @@ def recv(self):
return message

def connect(self):
attempts = 0
while attempts < self.retries:
attempts += 1
try:
config = self._load_config(self.config_name)
res = self.send(SimpleRequestType.CONNECT, data=0, text=config.name)
self.client_id = res.Data()
return res
except ConnectionError:
time.sleep(self.timeout)

raise ConnectionError(
"Connect with '{}' failed after {} attempts".format(self.config_name, attempts)
)
result = self.pcl_client.result
self.pcl_client.clear_error_state()
ret = self.pcl_client.connect()
if ret == result.already_connected:
pass
elif ret != result.success:
raise ConnectionError(
"Connect with '{}' failed: {}".format(self.config_name,
self.pcl_client.get_error_message())
)

def disconnect(self):
return self.send(SimpleRequestType.DISCONNECT)
return self.pcl_client.disconnect()

def shut_down(self):
return self.send(SimpleRequestType.TERMINATE)
Expand All @@ -91,42 +91,29 @@ def get_configs(self):
res = self.send(SimpleRequestType.GET_CONFIGS)
return [res.Configs(i) for i in range(res.ConfigsLength())]

def _load_config(self, config_name):
loaded_configs = {Config(c).name: Config(c) for c in self.get_configs()}
if config_name not in loaded_configs:
response = self.send(SimpleRequestType.LOAD_CONFIG, text=config_name)
if response.Type() == SimpleReplyType.INVALID_CONFIG:
raise ValueError("'%s' could not be loaded by the server" % config_name)
n = 0
while n <= self.retries:
n += 1
loaded_configs = {Config(c).name: Config(c) for c in self.get_configs()}
if config_name in loaded_configs:
break
time.sleep(self.timeout)
else:
raise TimeoutError("Failed to load config '{}' after {} attempts".format(config_name, n))

return loaded_configs[config_name]

def get_statistics(self):
return self.send(SimpleRequestType.GET_STATISTICS)

def pass_read(self, read):
return self.send(raw_read_message(
self.client_id,
read.read_tag,
read.read_id,
read.daq_offset,
read.daq_scaling,
read.signal,
), simple=False)
read_dict = {
"read_tag": int(read.read_tag),
"read_id": str(read.read_id),
"daq_offset": float(read.daq_offset),
"daq_scaling": float(read.daq_scaling),
"raw_data": read.signal,
}
return self.pcl_client.pass_read(read_dict)


class GuppyBasecallerClient(GuppyClientBase):
"""
Blocking Guppy Basecall Client
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)
self.read_cache = deque()

def basecall(self, read, state=False, trace=False):
"""
Basecall a `ReadData` object and get a `CalledReadData` object
Expand All @@ -140,7 +127,7 @@ def basecall(self, read, state=False, trace=False):
n += 1
result = self._get_called_read(state=state, trace=trace)
if result is not None:
return result[1]
return result
time.sleep(self.timeout)

raise TimeoutError(
Expand All @@ -151,17 +138,15 @@ def _get_called_read(self, state=False, trace=False):
"""
Get the `CalledReadData` object back from the server
"""
flag = (not trace) ^ state << 1
res = self.send(SimpleRequestType.GET_FIRST_CALLED_BLOCK, data=flag)
if res is None:
return

read, called = res
while not called.complete:
_, block = self.send(SimpleRequestType.GET_NEXT_CALLED_BLOCK, data=read.read_tag)
called += block
if len(self.read_cache) == 0:
reads, _ = self.pcl_client.get_completed_reads()
self.read_cache.extend(reads)

return read, called
try:
read = self.read_cache.pop()
return read, pcl_called_read(read)
except IndexError:
return


class GuppyAsyncClientBase:
Expand All @@ -178,9 +163,11 @@ def __init__(self, config=None, host='localhost', port=5555, sleep=0):
self.socket.set(zmq.LINGER, 0)
self.socket.set(zmq.RCVTIMEO, 500)
self.client_id = 0
self.pcl_client = PCLClient("%s:%s" % (host, port), self.config_name)
_init_pcl_client(self.pcl_client)

async def __aenter__(self):
await self.connect(self.config)
await self.connect()
return self

async def __aexit__(self, exception_type, exception_value, traceback):
Expand All @@ -202,12 +189,19 @@ async def send(self, message, data=None, text=None, simple=True):
return response

async def connect(self, config):
response = await self.send(SimpleRequestType.CONNECT, data=0, text=config)
await asyncio.sleep(self.sleep)
self.client_id = response.Data()
result = self.pcl_client.result
await self.pcl_client.clear_error_state()
ret = await self.pcl_client.connect()
if ret == result.already_connected:
pass
elif ret != result.success:
raise ConnectionError(
"Connect with '{}' failed: {}".format(self.config_name,
self.pcl_client.get_error_message())
)

async def disconnect(self):
await self.send(SimpleRequestType.DISCONNECT)
await self.pcl_client.disconnect()

async def get_configs(self):
res = await self.send(SimpleRequestType.GET_CONFIGS)
Expand All @@ -218,25 +212,42 @@ async def get_statistics(self):
return stats

async def pass_read(self, read):
return await self.send(raw_read_message(
self.client_id,
read.read_tag,
read.read_id,
read.daq_offset,
read.daq_scaling,
read.signal,
), simple=False
)
read_dict = {
"read_tag": int(read.read_tag),
"read_id": str(read.read_id),
"daq_offset": float(read.daq_offset),
"daq_scaling": float(read.daq_scaling),
"raw_data": read.signal,
}
return await self.pcl_client.pass_read(read_dict)

async def get_called_read(self, trace=False, state=False):
flag = (not trace) ^ state << 1
res = await self.send(SimpleRequestType.GET_FIRST_CALLED_BLOCK, data=flag)
"""
Get the `CalledReadData` object back from the server
"""
if len(self.read_cache) == 0:
reads, _ = await self.pcl_client.get_completed_reads()
self.read_cache.extend(reads)

if not res: return
try:
read = self.read_cache.pop()
return read, pcl_called_read(read)
except IndexError:
return

read, called = res
while not called.complete:
_, block = await self.send(SimpleRequestType.GET_NEXT_CALLED_BLOCK, data=read.read_tag)
called += block

return res
def _init_pcl_client(pcl_client):
"""
Perform basic initialisation of a pyguppy_client_lib client.
"""
pcl_proto_major_version = pcl_client.get_protocol_version()[0]
if pcl_proto_major_version != PROTO_VERSION[0]:
raise Exception("pyguppy_client_lib IPC major version {} does not "
"match pyguppyclient IPC major version {} -- "
"install correct version of "
"pyguppy_client_lib.".format(pcl_proto_major_version,
PROTO_VERSION[0]))
params = {
"max_reads_queued": 10000 # Number of reads the pcl_client can hold
}
pcl_client.set_params(params)
Loading

0 comments on commit 5352154

Please sign in to comment.