Files
bstest_py/main.py
Ivan Vazhenin e8c9f01639 First commit
2024-01-21 20:03:54 +03:00

57 lines
1.2 KiB
Python

import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
import uvicorn
from fastapi import FastAPI
from strawberry.fastapi import GraphQLRouter
import os
import sys
import time
import strawberry
lib_path = '/home/ashatora/src/bptest/cmake-build-debug'
sys.path.append(lib_path)
from libbptest import *
def blocking_function():
    """Build a ``SomeClass`` named 'qwe', call ``test_time(10)`` on it and return its name.

    This is the callable that ``resolve_hello`` submits to the process pool.
    NOTE(review): ``SomeClass`` is presumably provided by the star import from
    ``libbptest``, and ``test_time`` is presumably a long-running native
    call -- confirm against the library.
    """
    worker = SomeClass('qwe')
    worker.test_time(10)
    result = worker.name
    return result
async def resolve_hello() -> str:
    """Resolve ``func1`` by running ``blocking_function`` in the module-level executor.

    The call is handed to ``executor`` through ``run_in_executor`` and awaited,
    so the coroutine yields to the event loop while the blocking work runs.

    Returns:
        Whatever ``blocking_function`` returns.
    """
    # A coroutine always executes inside a running loop; get_running_loop()
    # returns exactly that loop and never creates one implicitly, which is
    # why it is preferred over get_event_loop() here.
    loop = asyncio.get_running_loop()
    logging.warning(loop)
    res = await loop.run_in_executor(executor, blocking_function)
    return res
async def resolve_func() -> str:
    """Resolve ``func2``: return a fixed greeting without doing any work."""
    greeting = 'Hello World!'
    return greeting
@strawberry.type
class Query:
    # Strawberry type passed to strawberry.Schema at module level.
    # Each field delegates to an async resolver defined in this module.

    # func1: resolved by resolve_hello, which awaits blocking_function
    # through the executor.
    func1: str = strawberry.field(resolver=resolve_hello)

    # func2: resolved by resolve_func, which returns a constant string.
    func2: str = strawberry.field(resolver=resolve_func)
# GraphQL schema built from the Query type.
schema = strawberry.Schema(Query)

# Pool used by resolve_hello to run blocking_function outside the server
# process; limited to a single worker process.
executor = ProcessPoolExecutor(max_workers=1)

# FastAPI application with the GraphQL router mounted under /graphql.
graphgl_app = GraphQLRouter(schema=schema)
app = FastAPI(debug=True)
app.include_router(graphgl_app, prefix="/graphql")

# Start the server only when this file is executed as a script. Worker
# processes started with the "spawn" or "forkserver" methods re-import the
# main module; without this guard each worker would try to launch its own
# server on the same port. uvicorn.run() creates and manages its own event
# loop, so no manual loop setup is needed.
if __name__ == "__main__":
    try:
        uvicorn.run(app, host="0.0.0.0", port=9000)
    finally:
        # Release the worker process once the server has stopped.
        executor.shutdown()