Skip to content

Commit

Permalink
add feature generate report (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaojin3616 authored Dec 5, 2023
2 parents cc24863 + ff5d2b7 commit 33ec3c0
Show file tree
Hide file tree
Showing 99 changed files with 4,469 additions and 2,329 deletions.
34 changes: 25 additions & 9 deletions docker/bisheng/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ database_url:

# 缓存配置 redis://[[username]:[password]]@localhost:6379/0
redis_url: "redis://redis:6379/0"
redis_host: [("redis")]
redis_master:
redis_password:

environment:
env: dev
Expand All @@ -25,8 +28,7 @@ autogen_roles:
documentation: ""
AutoGenCustomRole:
documentation: ""

agents:
agents:
ZeroShotAgent:
documentation: "https://python.langchain.com/docs/modules/agents/how_to/custom_mrkl_agent"
JsonAgent:
Expand All @@ -42,6 +44,10 @@ agents:
SQLAgent:
documentation: ""
chains:
RuleBasedRouter:
documentation: ""
MultiRuleChain:
documentation: ""
TransformChain:
documentation: ""
MultiPromptChain:
Expand All @@ -54,8 +60,6 @@ chains:
documentation: ""
TransformChain:
documentation: ""
MultiRetrievalQA:
documentation: ""
SimpleSequentialChain:
documentation: ""
SequentialChain:
Expand Down Expand Up @@ -84,7 +88,15 @@ chains:
documentation: "https://python.langchain.com/docs/modules/chains/popular/chat_vector_db"
CombineDocsChain:
documentation: ""
# SummarizeDocsChain:
# documentation: ""
LoaderOutputChain:
documentation: ""
documentloaders:
CustomKVLoader:
documentation: ""
UniversalKVLoader:
documentation: ""
ElemUnstructuredLoaderV0:
documentation: ""
AirbyteJSONLoader:
Expand Down Expand Up @@ -138,6 +150,8 @@ documentloaders:
PDFWithSemanticLoader:
documentation: "https://python.langchain.com/docs/modules/data_connection/document_loaders/integrations/git"
embeddings:
OpenAIProxyEmbedding:
documentation: ""
OpenAIEmbeddings:
documentation: "https://python.langchain.com/docs/modules/data_connection/text_embedding/integrations/openai"
HuggingFaceEmbeddings:
Expand Down Expand Up @@ -362,13 +376,15 @@ output_parsers:
documentation: "https://python.langchain.com/docs/modules/model_io/output_parsers/structured"
ResponseSchema:
documentation: "https://python.langchain.com/docs/modules/model_io/output_parsers/structured"

RouterOutputParser:
documentation: ""

input_output:
Input:
VariableNode:
documentation: ""
InputNode:
documentation: ""
Output:
documentation: ""
InputFile:
InputFileNode:
documentation: ""


6 changes: 4 additions & 2 deletions src/backend/bisheng/api/router.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Router for base api
from bisheng.api.v1 import (chat_router, endpoints_router, flow_styles_router, flows_router,
knowledge_router, qa_router, server_router, skillcenter_router,
user_router, validate_router)
knowledge_router, qa_router, report_router, server_router,
skillcenter_router, user_router, validate_router, variable_router)
from bisheng.api.v2 import chat_router_rpc, knowledge_router_rpc, rpc_router_rpc
from fastapi import APIRouter

Expand All @@ -16,6 +16,8 @@
router.include_router(server_router)
router.include_router(user_router)
router.include_router(qa_router)
router.include_router(variable_router)
router.include_router(report_router)

router_rpc = APIRouter(prefix='/api/v2',)
router_rpc.include_router(knowledge_router_rpc)
Expand Down
66 changes: 63 additions & 3 deletions src/backend/bisheng/api/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@

from bisheng.api.v1.schemas import StreamData
from bisheng.database.base import get_session
from bisheng.database.models.role_access import AccessType, RoleAccess
from bisheng.database.models.variable_value import Variable
from bisheng.graph.graph.base import Graph
from bisheng.utils.logger import logger
from sqlmodel import select
from sqlalchemy import delete
from sqlmodel import Session, select

API_WORDS = ['api', 'key', 'token']

Expand Down Expand Up @@ -83,14 +86,14 @@ def build_flow(graph_data: dict,
}
yield str(StreamData(event='log', data=log_dict))
# # 如果存在文件,当前不操作文件,避免重复操作
if not process_file and chat_id is not None:
if not process_file and vertex.base_type == 'documentloaders':
template_dict = {
key: value
for key, value in vertex.data['node']['template'].items()
if isinstance(value, dict)
}
for key, value in template_dict.items():
if value.get('type') == 'file':
if value.get('type') == 'fileNode':
# 过滤掉文件
vertex.params[key] = ''

Expand Down Expand Up @@ -173,6 +176,8 @@ def build_flow_no_yield(graph_data: dict,
# 聊天窗口等flow 主动生成的vector 需要新建临时collection
# tmp_{chat_id}
if vertex.base_type == 'vectorstores':
# 注入user_name
vertex.params['user_name'] = kwargs.get('user_name') if kwargs else ''
# 知识库通过参数传参
if 'collection_name' in kwargs and 'collection_name' in vertex.params:
vertex.params['collection_name'] = kwargs['collection_name']
Expand All @@ -189,6 +194,9 @@ def build_flow_no_yield(graph_data: dict,
# es
vertex.params['index_name'] = f'tmp_{flow_id}_{chat_id if chat_id else 1}'

if vertex.base_type == 'chains' and 'retriever' in vertex.params:
vertex.params['user_name'] = kwargs.get('user_name') if kwargs else ''

vertex.build()
params = vertex._built_object_repr()
logger.debug(
Expand All @@ -214,3 +222,55 @@ def access_check(payload: dict, owner_user_id: int, target_id: int, type: Access
if owner_user_id != payload.get('user_id') and str(target_id) not in third_ids:
return False
return True


def get_L2_param_from_flow(flow_data: dict, flow_id: str,) -> bool:
    """Synchronise the stored ``Variable`` rows of a flow with its graph nodes.

    The graph is built from ``flow_data`` and scanned for two vertex types:

    * ``InputFileNode`` -- each one should have a ``Variable`` row with
      ``value_type == 3`` whose ``variable_name`` equals the node's
      ``file_type`` param. Missing rows are created, rows whose name
      differs are updated.
    * ``VariableNode`` -- only used to decide which non-file rows are still
      backed by a node in the graph.

    Rows of this flow that no longer correspond to a node are deleted:
    file rows (``value_type == 3``) whose node is not an ``InputFileNode``
    in the graph, and all other rows whose node is not a ``VariableNode``
    in the graph.

    Args:
        flow_data: Graph payload accepted by ``Graph.from_payload``.
        flow_id: Id of the flow whose variables are synchronised.

    Returns:
        ``True`` when the changes were committed, ``False`` when applying
        them failed (the error is logged and the session rolled back).

    Raises:
        Whatever ``Graph.from_payload`` or the initial variable query raise;
        only the update/delete/commit phase is guarded.
    """
    # The original code labels value_type 3 as "file type".
    file_value_type = 3

    graph = Graph.from_payload(flow_data)
    file_nodes = []  # (node id, file_type) for every InputFileNode
    variable_node_ids = set()
    for node in graph.nodes:
        if node.vertex_type == 'InputFileNode':
            file_nodes.append((node.id, node.params.get('file_type')))
        elif node.vertex_type == 'VariableNode':
            variable_node_ids.add(node.id)

    # Use the session as a context manager so it is always closed, matching
    # how the chat endpoint obtains its session.
    with next(get_session()) as session:
        db_variables = session.exec(
            select(Variable).where(Variable.flow_id == flow_id)).all()

        # File rows still unmatched after the loop below are stale.
        stale_file_vars = {variable.node_id: variable
                           for variable in db_variables
                           if variable.value_type == file_value_type}
        try:
            to_save = []
            for node_id, file_type in file_nodes:
                existing = stale_file_vars.pop(node_id, None)
                if existing is None:
                    # New file node: create its variable row.
                    to_save.append(Variable(flow_id=flow_id, node_id=node_id,
                                            variable_name=file_type,
                                            value_type=file_value_type))
                elif existing.variable_name != file_type:
                    existing.variable_name = file_type
                    to_save.append(existing)

            # Non-file variables whose node was removed from the graph
            # without the variable itself being deleted in the editor.
            stored_variable_ids = {variable.node_id
                                   for variable in db_variables
                                   if variable.value_type != file_value_type}

            delete_node_ids = list(stale_file_vars)
            delete_node_ids.extend(stored_variable_ids - variable_node_ids)

            if to_save:
                session.add_all(to_save)
            if delete_node_ids:
                # Scope the delete to this flow: node ids are not guaranteed
                # to be unique across flows, so filtering on node_id alone
                # could remove variables that belong to another flow.
                session.exec(
                    delete(Variable)
                    .where(Variable.flow_id == flow_id)
                    .where(Variable.node_id.in_(delete_node_ids)))
            session.commit()
            return True
        except Exception as e:
            logger.exception(e)
            session.rollback()
            return False
4 changes: 4 additions & 0 deletions src/backend/bisheng/api/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from bisheng.api.v1.flows import router as flows_router
from bisheng.api.v1.knowledge import router as knowledge_router
from bisheng.api.v1.qa import router as qa_router
from bisheng.api.v1.report import router as report_router
from bisheng.api.v1.server import router as server_router
from bisheng.api.v1.skillcenter import router as skillcenter_router
from bisheng.api.v1.user import router as user_router
from bisheng.api.v1.validate import router as validate_router
from bisheng.api.v1.variable import router as variable_router

__all__ = [
'chat_router',
Expand All @@ -20,4 +22,6 @@
'server_router',
'user_router',
'qa_router',
'variable_router',
'report_router',
]
42 changes: 25 additions & 17 deletions src/backend/bisheng/api/v1/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,22 @@ async def on_text(self, text: str, **kwargs: Any) -> Any:
else:
await self.websocket.send_json(log.dict())
await self.websocket.send_json(start.dict())
elif kwargs.get('type'):
start = ChatResponse(type='start', category=kwargs.get('type'))
end = ChatResponse(type='end', intermediate_steps=text, category=kwargs.get('type'))
await self.websocket.send_json(start.dict())
await self.websocket.send_json(end.dict())
elif 'category' in kwargs:
log = ChatResponse(message=text, type='stream')
await self.websocket.send_json(log.dict())
if 'autogen' == kwargs['category']:
log = ChatResponse(message=text, type='stream')
await self.websocket.send_json(log.dict())
if kwargs.get('type'):
# 兼容下
start = ChatResponse(type='start', category=kwargs.get('type'))
end = ChatResponse(type='end', intermediate_steps=text,
category=kwargs.get('type'))
await self.websocket.send_json(start.dict())
await self.websocket.send_json(end.dict())
else:
log = ChatResponse(message=text, intermediate_steps=kwargs['log'],
type=kwargs['type'], category=kwargs['category'])
await self.websocket.send_json(log.dict())
logger.debug(f'on_text text={text} kwargs={kwargs}')

async def on_agent_action(self, action: AgentAction, **kwargs: Any):
log = f'Thought: {action.log}'
Expand Down Expand Up @@ -246,16 +254,16 @@ def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
def on_chat_model_start(self, serialized: Dict[str, Any],
messages: List[List[BaseMessage]], **kwargs: Any) -> Any:
"""Run when retriever end running."""
sender = kwargs['sender']
receiver = kwargs['receiver']
content = messages[0][0] if isinstance(messages[0][0], str) else messages[0][0].get('content')
end = ChatResponse(message=f'{content}', type='end', sender=sender, recevier=receiver)
start = ChatResponse(type='start', sender=sender, recevier=receiver)
loop = asyncio.get_event_loop()
coroutine2 = self.websocket.send_json(end.dict())
coroutine3 = self.websocket.send_json(start.dict())
asyncio.run_coroutine_threadsafe(coroutine2, loop)
asyncio.run_coroutine_threadsafe(coroutine3, loop)
# sender = kwargs['sender']
# receiver = kwargs['receiver']
# content = messages[0][0] if isinstance(messages[0][0], str) else messages[0][0].get('content')
# end = ChatResponse(message=f'{content}', type='end', sender=sender, recevier=receiver)
# start = ChatResponse(type='start', sender=sender, recevier=receiver)
# loop = asyncio.get_event_loop()
# coroutine2 = self.websocket.send_json(end.dict())
# coroutine3 = self.websocket.send_json(start.dict())
# asyncio.run_coroutine_threadsafe(coroutine2, loop)
# asyncio.run_coroutine_threadsafe(coroutine3, loop)
logger.debug(f'on_chat result={messages}')

def on_text(self, text: str, **kwargs) -> Any:
Expand Down
20 changes: 7 additions & 13 deletions src/backend/bisheng/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def chat(flow_id: str,
payload = json.loads(Authorize.get_jwt_subject())
user_id = payload.get('user_id')
"""Websocket endpoint for chat."""
if type and type == 'L1':
if chat_id:
with next(get_session()) as session:
db_flow = session.get(Flow, flow_id)
if not db_flow:
Expand All @@ -127,18 +127,11 @@ async def chat(flow_id: str,
graph_data = json.loads(flow_data_store.hget(flow_data_key, 'graph_data'))

try:
process_file = False if chat_id else True
graph = build_flow_no_yield(graph_data=graph_data,
artifacts={},
process_file=process_file,
flow_id=UUID(flow_id).hex,
chat_id=chat_id)
langchain_object = graph.build()
for node in langchain_object:
key_node = get_cache_key(flow_id, chat_id, node.id)
chat_manager.set_cache(key_node, node._built_object)
chat_manager.set_cache(get_cache_key(flow_id, chat_id), node._built_object)
await chat_manager.handle_websocket(flow_id, chat_id, websocket, user_id)
if not chat_id:
# 调试时,每次都初始化对象
chat_manager.set_cache(get_cache_key(flow_id, chat_id), None)
await chat_manager.handle_websocket(flow_id, chat_id, websocket, user_id,
gragh_data=graph_data)
except WebSocketException as exc:
logger.error(exc)
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
Expand Down Expand Up @@ -311,6 +304,7 @@ async def event_stream(flow_id, chat_id: str):
yield str(StreamData(event='message', data=input_keys_response))
# We need to reset the chat history
chat_manager.chat_history.empty_history(flow_id, chat_id)
chat_manager.set_cache(get_cache_key(flow_id=flow_id, chat_id=chat_id), None)
flow_data_store.hsetkey(flow_data_key, 'status', BuildStatus.SUCCESS.value, expire)
except Exception as exc:
logger.exception(exc)
Expand Down
9 changes: 6 additions & 3 deletions src/backend/bisheng/api/v1/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def getn_env():
env['uns_support'] = uns_support
else:
env['uns_support'] = list(knowledge.filetype_load_map.keys())
if settings.settings.get_from_db('office_url'):
env['office_url'] = settings.settings.get_from_db('office_url')
return {'data': env}


Expand Down Expand Up @@ -97,7 +99,7 @@ async def process_flow(
"""
Endpoint to process an input with a given flow_id.
"""
if inputs and isinstance(inputs, dict):
if inputs and isinstance(inputs, dict) and 'id' in inputs:
inputs.pop('id')

try:
Expand Down Expand Up @@ -125,8 +127,9 @@ async def process_flow(
async def create_upload_file(file: UploadFile, flow_id: str):
# Cache file
try:
file_path = save_uploaded_file(file.file, folder_name=flow_id)

file_path = save_uploaded_file(file.file, folder_name=flow_id, file_name=file.filename)
if not isinstance(file_path, str):
file_path = str(file_path)
return UploadFileResponse(
flowId=flow_id,
file_path=file_path,
Expand Down
13 changes: 11 additions & 2 deletions src/backend/bisheng/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from typing import List
from uuid import UUID

from bisheng.api.utils import access_check, build_flow_no_yield, remove_api_keys
from bisheng.api.utils import (access_check, build_flow_no_yield, get_L2_param_from_flow,
remove_api_keys)
from bisheng.api.v1.schemas import FlowListCreate, FlowListRead
from bisheng.database.base import get_session
from bisheng.database.models.flow import Flow, FlowCreate, FlowRead, FlowReadWithStyle, FlowUpdate
Expand Down Expand Up @@ -117,7 +118,7 @@ def update_flow(*,
raise HTTPException(status_code=404, detail='Flow not found')

if not access_check(payload, db_flow.user_id, flow_id, AccessType.FLOW_WRITE):
raise HTTPException(status_code=500, detail='没有权限编辑此技能')
raise HTTPException(status_code=500, detail='No right access this flow')

flow_data = flow.dict(exclude_unset=True)

Expand All @@ -133,13 +134,21 @@ def update_flow(*,
logger.exception(exc)
raise HTTPException(status_code=500, detail=f'Flow 编译不通过, {str(exc)}')

if db_flow.status == 2 and ('status' not in flow_data or flow_data['status'] != 1):
raise HTTPException(status_code=500, detail='上线中技能,不支持修改')

if settings.remove_api_keys:
flow_data = remove_api_keys(flow_data)
for key, value in flow_data.items():
setattr(db_flow, key, value)
session.add(db_flow)
session.commit()
session.refresh(db_flow)
try:
if not get_L2_param_from_flow(db_flow.data, db_flow.id):
logger.error(f'flow_id={db_flow.id} extract file_node fail')
except Exception:
pass
return db_flow


Expand Down
Loading

0 comments on commit 33ec3c0

Please sign in to comment.