-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_balance.py
85 lines (70 loc) · 2.06 KB
/
main_balance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Основной файл приложения."""
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from fastapi import FastAPI
from jaeger_client import Config
from pydantic_settings import BaseSettings
from starlette.middleware.base import BaseHTTPMiddleware
from config.config import BALANCE_APP_PORT
from credit_card_balance.src.middlewares import tracing_middleware
from credit_card_balance.src.routers import (
balance_router,
readiness,
transactions_router,
)
class Settings(BaseSettings):
"""Конфигурация приложения."""
app_host: str = '0.0.0.0' # noqa: F401, S104
app_port: int = BALANCE_APP_PORT
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Контекст для работы приложения.
В нашем случае используется для Егеря.
Args:
app: Приложение.
Yields:
Контекст приложения.
"""
config = Config(
config={
'sampler': {
'type': 'const',
'param': 1,
},
'local_agent': {
'reporting_host': 'jaeger-agent.monitoring.svc.cluster.local', # noqa: E501
'reporting_port': 6831,
},
'logging': True,
},
service_name='gran_balance',
validate=True,
)
tracer = config.initialize_tracer()
yield {'jaeger_tracer': tracer}
app = FastAPI()
executor = ProcessPoolExecutor(max_workers=1)
app.include_router(
balance_router.router,
prefix='/api',
tags=['balance'],
)
app.include_router(
transactions_router.router,
prefix='/api',
tags=['transactions'],
)
app.include_router(readiness.router, tags=['readiness'])
app.add_middleware(
BaseHTTPMiddleware,
dispatch=tracing_middleware,
)
if __name__ == '__main__':
import uvicorn # noqa: WPS433
settings = Settings()
uvicorn.run(
app,
host=settings.app_host,
port=settings.app_port,
)