Skip to content

Commit

Permalink
[tk390] Tratamento de upload e download de arquivo grandes
Browse files Browse the repository at this point in the history
Criação de upload_file
Criação de download_file

Alteração em add_asset e update_asset
Se o servidor receber request.large_file_path em add_asset ou update_asset, registrará no SSM o conteúdo deste arquivo, ignorando o arquivo proveniente de request.file.

Alteração em get_asset
Se executar get_asset e o servidor identificar que asset.file é grande, retornará asset.file='' e asset.large_file_path = asset.file.path

No lado do cliente (opac_ssm_api), para arquivos grandes:
- a execução de add_asset e update_asset em duas etpas: primeiramente executando opac_ssm.upload_file que retorna o nome do arquivo grande e depois executando add_asset incluindo asset.large_file_path.
- a execução de get_asset também é em duas etapas: primeiramente executando opac_ssm.get_asset, depois executando opac_ssm.download_file, usando o arquivo identificado como asset.large_file_path.

Relacionado com
scieloorg/opac_ssm_api#185

Fixes scieloorg#390
  • Loading branch information
robertatakenaka committed Oct 22, 2018
1 parent e578762 commit 755f240
Show file tree
Hide file tree
Showing 6 changed files with 515 additions and 678 deletions.
52 changes: 50 additions & 2 deletions grpc_ssm/grpc_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import grpc

import opac_pb2
import opac_pb2_grpc

SLEEP_TIME = 5

Expand All @@ -17,7 +18,7 @@ def run():

channel = grpc.insecure_channel('localhost:5000')

stubAsset = opac_pb2.AssetServiceStub(channel)
stubAsset = opac_pb2_grpc.AssetServiceStub(channel)

###########################################################################
# Teste asset.add_asset
Expand Down Expand Up @@ -55,6 +56,52 @@ def run():

print("Agora vou verificando o estado da task: %s, %s" % (task.id, task_state))

###########################################################################
# Teste asset.get_asset (large file)
###########################################################################

asset = stubAsset.get_asset(opac_pb2.TaskId(id=task.id))

print("Retornando os dados do Asset: ")

print((task.id, task_state.state, task_info.url, task_info.url_path, asset))

###########################################################################
# Teste asset.add_asset (large file)
###########################################################################

print("Obtendo o arquivo para ser enviado pelo GRPC")

file = open('large_sample.txt', 'rb')

meta = '{"foo": "bar", "pickles": "blaus"}' # String

print("Envida os metadados %s como param metadata." % meta)

task = stubAsset.add_asset(
opac_pb2.Asset(file='', filename='artigo.txt',
type="txt", metadata=meta,
bucket="Bucket Sample",
large_file_path='artigo.txt'))

###########################################################################
# Teste asset.get_task_state (large file)
###########################################################################

print("ID da task: %s" % task.id)

task_state = stubAsset.get_task_state(opac_pb2.TaskId(id=task.id))

print("Verificando o estado da task: %s" % task_state)

print("Dormindo por %s segundos..." % SLEEP_TIME)

time.sleep(SLEEP_TIME) # sleep 10 seconds

task_state = stubAsset.get_task_state(opac_pb2.TaskId(id=task.id))

print("Agora vou verificando o estado da task: %s, %s" % (task.id, task_state))

###########################################################################
# Teste asset.get_asset_info
###########################################################################
Expand All @@ -64,7 +111,7 @@ def run():
print("Informações da url do asset: %s" % task_info)

###########################################################################
# Teste asset.get_asset
# Teste asset.get_asset (large file)
###########################################################################

asset = stubAsset.get_asset(opac_pb2.TaskId(id=task.id))
Expand Down Expand Up @@ -208,5 +255,6 @@ def run():

task = stubBucket.remove_bucket(opac_pb2.BucketName(name="Bucket Sample"))


if __name__ == '__main__':
run()
71 changes: 62 additions & 9 deletions grpc_ssm/grpc_server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python

import os
import tempfile
import time
import logging
import json
Expand All @@ -16,20 +17,64 @@
from assets_manager import tasks
from assets_manager import models

DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 90 * 1024 * 1024
DEFAULT_MAX_SEND_MESSAGE_LENGTH = 90 * 1024 * 1024
DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 5 * 1024 * 1024
DEFAULT_MAX_SEND_MESSAGE_LENGTH = 5 * 1024 * 1024


CHUNK_SIZE = 1024 * 1024 # 1MB
MAX_DOWNLOADED_FILE_SIZE = 2 * CHUNK_SIZE


def get_file_chunks(filename):
with open(filename, 'rb') as f:
while True:
piece = f.read(CHUNK_SIZE)
if len(piece) == 0:
return
yield opac_pb2.File(buffer=piece)


def save_chunks_to_file(chunks, filename):
with open(filename, 'wb') as f:
for chunk in chunks:
f.write(chunk.buffer)


def save_tmpfile(content, file_path=None):
if file_path is None:
tmphandle, file_path = tempfile.mkstemp()
with open(file_path, 'wb') as f:
f.write(content)
return file_path


def delete_temp_file(file_path):
try:
os.unlink(file_path)
except OSError:
logging.info('%s' % file_path)


class Asset(opac_pb2.AssetServiceServicer):

def upload_file(self, request_iterator, context):
tmphandle, tmppath = tempfile.mkstemp()
save_chunks_to_file(request_iterator, tmppath)
return tmppath

def download_file(self, request, context):
if request.large_file_path:
return get_file_chunks(request.large_file_path)

def add_asset(self, request, context):
"""
Return a task id
"""
if request.large_file_path:
request.file = open(request.large_file_path, 'rb').read()
task_result = tasks.add_asset.delay(request.file, request.filename,
request.type, request.metadata,
request.bucket)

return opac_pb2.TaskId(id=task_result.id)

def get_asset(self, request, context):
Expand All @@ -44,13 +89,17 @@ def get_asset(self, request, context):
raise
else:
try:
fp = open(asset.file.path, 'rb')
if os.path.getsize(asset.file.path) > MAX_DOWNLOADED_FILE_SIZE:
file_content = b''
large_file_path = asset.file.path
else:
file_content = open(asset.file.path, 'rb').read()
large_file_path = ''
except IOError as e:
logging.error(e)
context.set_details(e)
raise

return opac_pb2.Asset(file=fp.read(),
return opac_pb2.Asset(file=file_content,
filename=asset.filename,
type=asset.type,
metadata=json.dumps(asset.metadata),
Expand All @@ -60,13 +109,15 @@ def get_asset(self, request, context):
absolute_url=asset.get_absolute_url,
full_absolute_url=asset.get_full_absolute_url,
created_at=asset.created_at.isoformat(),
updated_at=asset.updated_at.isoformat())
updated_at=asset.updated_at.isoformat(),
large_file_path=large_file_path)

def update_asset(self, request, context):
"""
Return a task id
"""

if request.large_file_path:
request.file = open(request.large_file_path, 'rb').read()
task_result = tasks.update_asset.delay(request.uuid, request.file,
request.filename, request.type,
request.metadata, request.bucket)
Expand Down Expand Up @@ -265,6 +316,8 @@ def serve(host='[::]', port=5000, max_workers=4,
servicer.set('', health_pb2.HealthCheckResponse.SERVING)

# Asset
servicer.set('download_file', health_pb2.HealthCheckResponse.SERVING)
servicer.set('upload_file', health_pb2.HealthCheckResponse.SERVING)
servicer.set('get_asset', health_pb2.HealthCheckResponse.SERVING)
servicer.set('add_asset', health_pb2.HealthCheckResponse.SERVING)
servicer.set('update_asset', health_pb2.HealthCheckResponse.SERVING)
Expand Down
1 change: 1 addition & 0 deletions grpc_ssm/large_sample.txt

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions grpc_ssm/opac.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ syntax = "proto3";
service AssetService {
rpc get_asset(TaskId) returns (Asset) {}
rpc add_asset(Asset) returns (TaskId) {}
rpc upload_file(stream File) returns (Reply) {}
rpc download_file(Request) returns (stream File) {}
rpc update_asset(Asset) returns (TaskId) {}
rpc remove_asset(TaskId) returns (AssetRemoved) {}
rpc exists_asset(TaskId) returns (AssetExists) {}
Expand Down Expand Up @@ -32,6 +34,19 @@ message Asset {
string full_absolute_url = 9;
string created_at = 10;
string updated_at = 11;
string large_file_path = 12;
}

message File {
bytes file = 1; // file of the asset
}

message Request {
string large_file_path = 1;
}

message Reply {
string large_file_path = 1;
}

message Assets{
Expand Down
Loading

0 comments on commit 755f240

Please sign in to comment.