Advanced Use
This section describes how to implement a server that goes beyond the standard functionality provided by BioDM.
Custom resource
As previously explained, a resource is a Table, Schema, Controller triplet, internally connected around a Service that ultimately communicates with one or more Manager.
Depending on the complexity of the custom feature you would like to implement, you will have to fine-tune anywhere from one to all of those components together.
The following sections assume you have set up a valid Table and Schema as explained in earlier parts of the documentation.
A bit about Internals
The standard pipeline for an incoming request goes like this:
-> Server -> Middlewares -> Controller -> Service(s) -> Manager(s)
Controller is the BioDM component that manages a set of routes.
To avoid most mistakes with Controllers, it is recommended to visit the `Starlette <https://www.starlette.io/>`_ documentation.
The bread and butter of resources are ResourceController and DatabaseService. You may add a feature by extending them.
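As a rough illustration, an extension could look like the sketch below. It is hypothetical: TagService and TagController are illustrative names, the import statements are elided because module paths depend on your BioDM version, and it only shows the wiring between a custom service and its controller through _infer_svc, the same pattern used by the case studies that follow.

# Hypothetical sketch: TagService/TagController are illustrative names, not part of BioDM.
# ResourceController and CompositeEntityService come from biodm; exact import paths
# depend on your BioDM version, so they are elided here.

class TagService(CompositeEntityService):
    """Custom behaviour layered on top of the standard DB pipeline."""
    async def write(self, data, **kwargs):
        # Pre-process `data` here (prefix), or override _insert for postfix work.
        return await super().write(data, **kwargs)


class TagController(ResourceController):
    def _infer_svc(self):
        # Attach the custom service class instead of the default one.
        return TagService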
Keycloak Groups: a case study
/users and /groups are internal examples of custom resources.
What we want with them is to maintain a synchronized copy of those resources from Keycloak.
Additionally, since those resources may be created directly on Keycloak by administrators, it is desirable that passing a primary key is sufficient for the API instance to discover the full resource.
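For instance, a request like the sketch below could be enough. It is hypothetical: it assumes an instance running at http://localhost:8000 and that path is the group's primary key, with "__" standing in for "/" as in KCGroupService.kcpath further down.

import requests

# Hypothetical example: /projects/genomics already exists on Keycloak.
# Posting only its path lets the instance look it up, copy its attributes
# and store its Keycloak UUID.
requests.post(
    "http://localhost:8000/groups",       # assumed instance URL
    json={"path": "projects__genomics"},  # "__" maps to "/" on the Keycloak side
)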
Basic operations and routes stay the same, so the controller does not change. The following snippet manages group creation.
The basic idea is to query our Keycloak server before creating a new group, to ensure its existence and recover its Keycloak UUID, which is necessary for some operations. That UUID is then populated into the data dictionary that is ultimately sent down to DatabaseService (here the composite case), which handles insert/update statement building and execution.
class KCService(CompositeEntityService):
    """Abstract class for local keycloak entities."""
    @classproperty
    def kc(cls) -> KeycloakManager:
        """Return KCManager instance."""
        return cls.app.kc

    @abstractmethod
    async def update(self, remote_id: str, data: Dict[str, Any]):
        raise NotImplementedError

    async def sync(self, remote: Dict[str, Any], data: Dict[str, Any]):
        """Sync Keycloak and input data."""
        inter = remote.keys() & (
            set(c.name for c in self.table.__table__.columns) - self.table.pk
        )
        fill = {
            key: remote[key] for key in inter if key not in data.keys()
        }
        update = {
            key: data[key] for key in inter
            if data.get(key, None) and data.get(key, None) != remote.get(key, None)
        }
        if update:
            await self.update(remote['id'], update)
        data.update(fill)

    @abstractmethod
    async def read_or_create(self, data: Dict[str, Any], /) -> None:
        """Query entity from keycloak, create it in case it does not exist, update in case it does.
        Populates data with resulting id and/or found information."""
        raise NotImplementedError


class KCGroupService(KCService):
    @staticmethod
    def kcpath(path) -> pathlib.Path:
        """Compute keycloak path from api path."""
        return pathlib.Path("/" + path.replace("__", "/"))

    async def update(self, remote_id: str, data: Dict[str, Any]):
        return await self.kc.update_group(group_id=remote_id, data=data)

    async def read_or_create(self, data: Dict[str, Any]) -> None:
        """READ group from keycloak, CREATE if missing, UPDATE if exists.

        :param data: Group data
        :type data: Dict[str, Any]
        """
        path = self.kcpath(data['path'])
        group = await self.kc.get_group_by_path(str(path))

        if group:
            await self.sync(group, data)
            return

        parent_id = None
        if not path.parent.parts == ('/',):
            parent = await self.kc.get_group_by_path(str(path.parent))
            if not parent:
                raise DataError("Input path does not match any parent group.")
            parent_id = parent['id']

        data['id'] = await self.kc.create_group(path.name, parent_id)

    async def write(
        self,
        data: List[Dict[str, Any]] | Dict[str, Any],
        stmt_only: bool = False,
        user_info: UserInfo | None = None,
        **kwargs
    ):
        """Create entities on Keycloak side before passing to parent class for DB."""
        # Check permissions beforehand.
        await self._check_permissions("write", user_info, data)

        # Create on keycloak side.
        for group in to_it(data):
            # Group first.
            await self.read_or_create(group)
            # Then Users.
            for user in group.get("users", []):
                await User.svc.read_or_create(user, [group["path"]], [group["id"]])

        # Send to DB.
        # Not passing user_info, which gives unrestricted permissions, as the check happens above.
        return await super().write(data, stmt_only=stmt_only, **kwargs)
Extending: Prefix vs. Postfix
The above example describes a prefix feature extension, meaning modifications take place before data gets inserted into the DB. This is the way to go when you do not need a handle on DB objects or the session.
A prefix feature extension must make sure that the data dictionary sent down to DatabaseService respects table integrity.
On the other hand, a postfix feature extension happens after data gets inserted. This is the way to go when you need access to entity relationships, database-generated ids, and so on.
Nothing prevents you from doing both at the same time.
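The skeleton below sums up where each hook lives. It is only a sketch reusing the method signatures of the case studies in this section; SampleService is a hypothetical name and the elided bodies stand for your own logic.

class SampleService(CompositeEntityService):  # imported from biodm, as in the case studies
    async def write(self, data, stmt_only=False, user_info=None, **kwargs):
        # Prefix: adjust the data dictionary before statements are built,
        # keeping it consistent with table integrity.
        ...
        return await super().write(data, stmt_only=stmt_only, user_info=user_info, **kwargs)

    @DatabaseManager.in_session
    async def _insert(self, stmt, user_info, session):
        # Postfix: the entity is in the session here, so relationships and
        # database-generated ids are available.
        item = await super()._insert(stmt, user_info=user_info, session=session)
        ...
        return item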
S3 Files: a case study
For small files (i.e. <=100MB), an API instance will generate self-sufficient presigned POST URLs. Those contain a callback, which allows the S3 storage to notify us directly when an upload succeeds. That callback is unique to each file and thus needs the key, which is an auto-increment field in our example.
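From a client's point of view, the flow might look like the sketch below. It is hypothetical: the instance URL, the /files schema fields and the exact shape of the returned upload form (assumed here to be boto3's usual {'url', 'fields'} mapping stored as a string, see gen_upload_form below) depend on your setup.

import ast
import requests

API = "http://localhost:8000"  # assumed instance URL

# 1. Create the file entry; the response carries a presigned upload form.
created = requests.post(
    f"{API}/files", json={"filename": "reads", "extension": "fastq"}
).json()
form = ast.literal_eval(created["upload"]["parts"][0]["form"])  # assumed response shape

# 2. Send the bytes straight to the bucket using that form.
with open("reads.fastq", "rb") as fh:
    requests.post(form["url"], data=form["fields"], files={"file": fh})

# 3. On success, the bucket hits the public post_success callback, which marks the
#    file as ready; it can then be fetched through GET /files/{id}/download.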
The callback and download features are two extra endpoints that we add by extending a ResourceController.
from biodm.routing import Route, PublicRoute


class S3Controller(ResourceController):
    """Controller for entities involving file management leveraging an S3Service."""
    def _infer_svc(self) -> Type[S3Service]:
        """Attach our new service type via _infer_svc method."""
        if not issubclass(self.table, S3File):
            raise ImplementionError(
                "S3Controller should be attached on a table inheriting"
                " from biodm.component.S3File"
            )
        return S3Service

    def routes(self, **_) -> List[Mount | Route] | List[Mount] | List[BaseRoute]:
        """Add an endpoint for successful file uploads and direct download."""
        # flake8: noqa: E501  pylint: disable=line-too-long
        prefix = f'{self.prefix}/{self.qp_id}/'
        file_routes = [
            Route(f'{prefix}download', self.download, methods=[HttpMethod.GET]),
            PublicRoute(f'{prefix}post_success', self.post_success, methods=[HttpMethod.GET]),
            ...
        ]
        # Set an extra attribute for later.
        self.post_upload_callback = Path(file_routes[1].path)

        return file_routes + super().routes()

    async def download(self, request: Request):
        """Returns boto3 presigned download URL with a redirect header.

        ---
        description: Returns a download presigned URL to retrieve file from s3 bucket.
        parameters:
          - in: path
            name: id
        responses:
          307:
            description: Download URL, with a redirect header.
        """
        return RedirectResponse(
            await self.svc.download(
                pk_val=self._extract_pk_val(request),
                user_info=await UserInfo(request),
            )
        )

    async def post_success(self, request: Request):
        """Used as a callback in the s3 presigned upload urls that are emitted.
        Upon receipt, update entity status in the DB.

        ---
        description: File upload callback - hit by s3 bucket on successful upload.
        parameters:
          - in: path
            name: id
        responses:
          201:
            description: Upload confirmation 'Uploaded.'
        """
        await self.svc.post_success(
            pk_val=self._extract_pk_val(request),
        )
        return json_response("Uploaded.", status_code=201)
Next, we implement the expected custom service, with the following requirements:

- populate a unique upload form upon creating a new /files resource: handled in the _insert and _insert_list methods, which is the postfix way
- implement post_success, which registers a successful upload
- implement download, in order to return a direct download URL to clients
A lot of that code has to do with retrieving attributes of async SQLAlchemy objects.
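For reference, the two recurring patterns, sketched here with SQLAlchemy's 2.0 async API, are refreshing specific columns and awaiting lazy-loaded relationships through awaitable_attrs:

from sqlalchemy.ext.asyncio import AsyncSession

async def load_parts(file, session: AsyncSession):
    # Re-fetch selected columns on an already-loaded instance.
    await session.refresh(file, ['filename', 'extension'])
    # Relationships are lazy in async mode: await them via awaitable_attrs
    # (provided by SQLAlchemy's AsyncAttrs mixin).
    return await file.upload.awaitable_attrs.parts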
class S3Service(CompositeEntityService):
    """Class for automatic management of S3 bucket transactions for file resources."""
    @classproperty
    def s3(cls) -> S3Manager:
        return cls.app.s3

    def post_callback(self, item) -> str:
        mapping = {  # Map primary key values to route elements.
            key: getattr(item, key)
            for key in self.table.pk
        }
        # Access controller via table.
        route = str(self.table.ctrl.post_upload_callback)
        for key, val in mapping.items():
            route = route.replace("{" + f"{key}" + "}", str(val))

        srv = self.app.server_endpoint.strip('/')
        return f"{srv}{route}"

    async def gen_key(self, item, session: AsyncSession):
        """Generate the unique bucket key from file elements."""
        # Fetch necessary fields from DB.
        await session.refresh(item, ['filename', 'extension'])
        version = ""
        if self.table.is_versioned:
            await session.refresh(item, ['version'])
            version = "_v" + str(item.version)

        # Custom key prefix mechanism.
        key_salt = await getattr(item.awaitable_attrs, 'key_salt')
        if iscoroutine(key_salt):
            item.__dict__['session'] = session
            key_salt = await item.key_salt

        return f"{key_salt}_{item.filename}{version}.{item.extension}"

    async def gen_upload_form(self, file: S3File, session: AsyncSession):
        """Populates an Upload for a newly created file. Handles simple post and multipart_upload
        cases.

        :param file: New file
        :type file: S3File
        :param session: current session
        :type session: AsyncSession
        """
        assert isinstance(file, S3File)  # mypy.

        # Use a proxy Upload table that also handles large files.
        file.upload = Upload()
        # Flushing is necessary to generate an id.
        session.add(file.upload)
        await session.flush()

        parts = await getattr(file.upload.awaitable_attrs, 'parts')
        key = await self.gen_key(file, session=session)
        parts.append(
            UploadPart(
                upload_id=file.upload.id,
                form=str(
                    self.s3.create_presigned_post(
                        object_name=key,
                        callback=self.post_callback(file)
                    )
                )
            )
        )

    @DatabaseManager.in_session
    async def post_success(self, pk_val: List[Any], session: AsyncSession):
        """Flag the file as ready and drop its upload form."""
        file = await self.read(pk_val, fields=['ready', 'upload'], session=session)
        file.validated_at = utcnow()
        file.ready = True
        file.upload_id, file.upload = None, None

    @DatabaseManager.in_session
    async def download(
        self, pk_val: List[Any], user_info: UserInfo | None, session: AsyncSession
    ) -> str:
        """Get File entry from DB, and return a direct download url.

        :param pk_val: key
        :type pk_val: List[Any]
        :param user_info: requesting user info
        :type user_info: UserInfo | None
        :param session: current db session
        :type session: AsyncSession
        :raises FileNotUploadedError: File entry exists but has not been validated yet
        :return: direct download url.
        :rtype: str
        """
        # File management fields.
        fields = ['filename', 'extension', 'dl_count', 'ready']
        # Also fetch foreign keys, as some may be necessary for permission check below.
        fields += list(c.name for c in self.table.__table__.columns if c.foreign_keys)
        # Shall raise an error if the given file doesn't exist.
        file = await self.read(pk_val, fields=fields, session=session)
        assert isinstance(file, S3File)  # mypy.

        await self._check_permissions("download", user_info, file.__dict__, session=session)

        if not file.ready:
            raise FileNotUploadedError("File exists but has not been uploaded yet.")

        url = self.s3.create_presigned_download_url(await self.gen_key(file, session=session))
        file.dl_count += 1
        return url

    @DatabaseManager.in_session
    async def _insert(
        self,
        stmt: Insert,
        user_info: UserInfo | None,
        session: AsyncSession
    ) -> (Any | None):
        """INSERT special case for file: populate url after getting entity id."""
        file = await super()._insert(stmt, user_info=user_info, session=session)
        await self.gen_upload_form(file, session=session)
        return file

    @DatabaseManager.in_session
    async def _insert_list(
        self,
        stmts: Sequence[Insert],
        user_info: UserInfo | None,
        session: AsyncSession
    ) -> Sequence[Base]:
        """INSERT many objects into the database, check token write permission before commit."""
        files = await super()._insert_list(stmts, user_info=user_info, session=session)
        for file in files:
            await self.gen_upload_form(file, session=session)
        return files

    @DatabaseManager.in_session
    async def release(
        self,
        pk_val: List[Any],
        fields: List[str],
        update: Dict[str, Any],
        session: AsyncSession,
        user_info: UserInfo | None = None,
    ) -> Base:
        """Important: Also override /files/{id}/release behaviour."""
        # Bumps version.
        file = await super().release(
            pk_val=pk_val,
            fields=fields,
            update=update,
            session=session,
            user_info=user_info
        )
        # Reset special fields.
        file.created_at = utcnow()
        file.validated_at = None
        file.ready = False
        file.dl_count = 0

        # Generate a new form.
        await self.gen_upload_form(file, session=session)
        return file
Routing and Auth
As shown in the S3Controller example above, BioDM provides two Route classes: PublicRoute and Route.
In case you are defining your own routes, you should use those instead of Starlette's Route.
Ultimately, this allows the config parameter REQUIRE_AUTH, which, when set to True, requires authentication on all endpoints routed with a plain Route while leaving endpoints marked with PublicRoute public.
This distinction can be important: in the example above, the S3 bucket is not authenticated when it notifies us of a successful file upload.
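For example, a custom controller mixing both kinds of routes could follow the same pattern as S3Controller above. This is only a sketch: the controller name, endpoint names and handlers are hypothetical, and HttpMethod is imported from biodm as in the earlier snippets.

from biodm.routing import PublicRoute, Route

class ReportController(ResourceController):  # hypothetical controller
    def routes(self, **_):
        return [
            # Requires a valid token whenever REQUIRE_AUTH is set to True.
            Route(f'{self.prefix}/stats', self.stats, methods=[HttpMethod.GET]),
            # Always accessible without authentication, e.g. for external callbacks.
            PublicRoute(f'{self.prefix}/ping', self.ping, methods=[HttpMethod.GET]),
        ] + super().routes()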