Advanced Use

This section describes how to implement a server that goes beyond the standard functionalities provided by BioDM.

Custom resource

As previously explained, a resource is a (Table, Schema, Controller) triplet, internally wired around a Service that ultimately communicates with one or more Managers.

Depending on the complexity of the custom feature you would like to implement, you will have to fine-tune anywhere from one to all of those components together.

The following sections assume you have written a valid Table and Schema, as explained in earlier parts of the documentation.

A bit about Internals

The standard pipeline for an incoming request looks like this:

Request -> Server -> Middlewares -> Controller -> Service(s) -> Manager(s)

A Controller is the BioDM component that manages a set of routes. To avoid most mistakes with Controllers, it is recommended to visit the `Starlette <https://www.starlette.io/>`_ documentation.

The bread and butter of resources are ResourceController and DatabaseService. You may add a feature by extending them.
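As an illustration, the delegation chain can be sketched with plain Python classes. All names and signatures below are simplified stand-ins for the purpose of the sketch, not BioDM's actual classes:

```python
class Manager:
    """Talks to an external system (database, Keycloak, S3, ...)."""
    def execute(self, payload: dict) -> dict:
        return {"stored": payload}


class DatabaseService:
    """Builds statements and delegates execution to its manager(s)."""
    def __init__(self, manager: Manager):
        self.manager = manager

    def write(self, data: dict) -> dict:
        return self.manager.execute(data)


class ResourceController:
    """Manages a set of routes and forwards request bodies to its service."""
    def __init__(self, svc: DatabaseService):
        self.svc = svc

    def create(self, body: dict) -> dict:
        return self.svc.write(body)


# Wiring mirrors the pipeline: Controller -> Service -> Manager.
ctrl = ResourceController(DatabaseService(Manager()))
result = ctrl.create({"name": "demo"})
```

Extending a resource then amounts to subclassing one of these layers and overriding the relevant hook, as the case studies below show with the real classes.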

Keycloak Groups: a case study

/users and /groups are internal examples of custom resources. What we want with them is to keep a synchronized copy of those resources from Keycloak. Additionally, since these resources may be created directly on Keycloak by administrators, it is desirable that passing a primary key is sufficient for the API instance to discover the full resource.

Basic operations and routes stay the same, so the controller does not change. The following code snippet manages group creation.

The basic idea is to query our Keycloak server before creating a new group, to ensure its existence and recover its Keycloak UUID, which is necessary for some operations. That UUID is then populated into the data dictionary ultimately sent down to DatabaseService (here the composite case), which handles insert/update statement building and execution.

kcservice.py
class KCService(CompositeEntityService):
    """Abstract class for local keycloak entities."""
    @classproperty
    def kc(cls) -> KeycloakManager:
        """Return KCManager instance."""
        return cls.app.kc

    @abstractmethod
    async def update(self, remote_id: str, data: Dict[str, Any]):
        raise NotImplementedError

    async def sync(self, remote: Dict[str, Any], data: Dict[str, Any]):
        """Sync Keycloak and input data."""
        inter = remote.keys() & (
            set(c.name for c in self.table.__table__.columns) - self.table.pk
        )
        fill = {
            key: remote[key] for key in inter if key not in data.keys()
        }
        update = {
            key: data[key] for key in inter
            if data.get(key, None) and data.get(key, None) != remote.get(key, None)
        }
        if update:
            await self.update(remote['id'], update)
        data.update(fill)

    @abstractmethod
    async def read_or_create(self, data: Dict[str, Any], /) -> None:
        """Query entity from keycloak, create it in case it does not exist, update in case it does.
        Populates data with resulting id and/or found information."""
        raise NotImplementedError


class KCGroupService(KCService):
    @staticmethod
    def kcpath(path) -> pathlib.Path:
        """Compute keycloak path from api path."""
        return pathlib.Path("/" + path.replace("__", "/"))

    async def update(self, remote_id: str, data: Dict[str, Any]):
        return await self.kc.update_group(group_id=remote_id, data=data)

    async def read_or_create(self, data: Dict[str, Any]) -> None:
        """READ group from keycloak, CREATE if missing, UPDATE if exists.

        :param data: Group data
        :type data: Dict[str, Any]
        """
        path = self.kcpath(data['path'])
        group = await self.kc.get_group_by_path(str(path))

        if group:
            await self.sync(group, data)
            return

        parent_id = None
        if path.parent.parts != ('/',):
            parent = await self.kc.get_group_by_path(str(path.parent))
            if not parent:
                raise DataError("Input path does not match any parent group.")
            parent_id = parent['id']

        data['id'] = await self.kc.create_group(path.name, parent_id)

    async def write(
        self,
        data: List[Dict[str, Any]] | Dict[str, Any],
        stmt_only: bool = False,
        user_info: UserInfo | None = None,
        **kwargs
    ):
        """Create entities on Keycloak Side before passing to parent class for DB."""
        # Check permissions beforehand.
        await self._check_permissions("write", user_info, data)

        # Create on keycloak side
        for group in to_it(data):
            # Group first.
            await self.read_or_create(group)
            # Then Users.
            for user in group.get("users", []):
                await User.svc.read_or_create(user, [group["path"]], [group["id"]],)

        # Send to DB
        # Not passing user_info, which gives unrestricted permissions as check happens above.
        return await super().write(data, stmt_only=stmt_only, **kwargs)
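The merge semantics of sync, and the path convention handled by kcpath, can be illustrated standalone. The functions below are simplified stand-ins mirroring the logic above, not the actual service methods:

```python
import pathlib


def kcpath(path: str) -> pathlib.Path:
    # The API flattens group hierarchy with "__"; Keycloak uses "/".
    return pathlib.Path("/" + path.replace("__", "/"))


def sync(remote: dict, data: dict, columns: set, pk: set) -> dict:
    """Stand-in for KCService.sync: fills missing local fields from the
    remote copy in place, and returns the fields to push back to Keycloak."""
    inter = remote.keys() & (columns - pk)
    fill = {k: remote[k] for k in inter if k not in data}
    update = {k: data[k] for k in inter if data.get(k) and data.get(k) != remote.get(k)}
    data.update(fill)
    return update


remote = {"id": "abc", "name": "genomics", "path": "/genomics"}

# Local input sets a differing value -> it is scheduled for a Keycloak update.
data = {"path": "/genomics", "name": "genomics-lab"}
update = sync(remote, data, columns={"name", "path"}, pk={"path"})

# Local input omits the field -> it is filled from the remote copy instead.
data2 = {"path": "/genomics"}
update2 = sync(remote, data2, columns={"name", "path"}, pk={"path"})
```

This is what makes passing a primary key sufficient: anything Keycloak already knows gets filled in, and anything the caller overrides gets pushed back.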

Extending: Prefix vs. Postfix

The above example describes a Prefix feature extension.

Meaning, modifications take place before data gets inserted into the DB. This is the way to go in case you do not need a handle on DB objects or the session.

A prefix feature extension shall make sure that the data dictionary sent down to DatabaseService respects table integrity.

On the other hand, a Postfix feature extension happens after data gets inserted. This is the way to go in case you need to access entity relationships, database generated ids, and so on.

Nothing prevents you from doing both at the same time.
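The two styles can be sketched with a toy service. The class and hook names below are illustrative only; in BioDM the actual hooks are write (prefix, as in KCGroupService) and _insert/_insert_list (postfix, as in the S3 case study below):

```python
import itertools

_ids = itertools.count(1)  # stands in for a DB autoincrement sequence


class ToyService:
    def write(self, data: dict) -> dict:
        data["id"] = next(_ids)  # "database-generated" id
        return data


class ExtendedToyService(ToyService):
    def write(self, data: dict) -> dict:
        # Prefix: runs before insertion. Only the input dict is available,
        # so it must keep the payload consistent with table integrity.
        data.setdefault("origin", "external-system")

        entity = super().write(data)

        # Postfix: runs after insertion. Generated ids (and, in the real
        # case, relationships) are now available on the entity.
        entity["callback"] = f"/files/{entity['id']}/post_success"
        return entity


entity = ExtendedToyService().write({"name": "f.txt"})
```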

S3 Files: a case study

For small files (i.e. <=100MB), an API instance will generate self-sufficient presigned POST URLs. Those contain a callback, which allows the S3 storage to inform us directly upon a successful upload. Evidently, the callback is unique to each file and thus needs its primary key, which is an autoincrement field in our example.

This callback and the download feature are two extra endpoints that we add by extending ResourceController.

s3controller.py
from biodm.routing import Route, PublicRoute

class S3Controller(ResourceController):
    """Controller for entities involving file management leveraging an S3Service."""
    def _infer_svc(self) -> Type[S3Service]:
        """Attach our new service type via _infer_svc method."""
        if not issubclass(self.table, S3File):
            raise ImplementionError(
                "S3Controller should be attached on a table inheriting"
                " from biodm.component.S3File"
            )
        return S3Service

    def routes(self, **_) -> List[Mount | Route] | List[Mount] | List[BaseRoute]:
        """Add an endpoint for successful file uploads and direct download."""
        # flake8: noqa: E501  pylint: disable=line-too-long
        prefix = f'{self.prefix}/{self.qp_id}/'
        file_routes = [
            Route(f'{prefix}download',           self.download,           methods=[HttpMethod.GET]),
            PublicRoute(f'{prefix}post_success', self.post_success,       methods=[HttpMethod.GET]),
            ...
        ]
        # Set an extra attribute for later.
        self.post_upload_callback = Path(file_routes[1].path)

        return file_routes + super().routes()

    async def download(self, request: Request):
        """Returns boto3 presigned download URL with a redirect header.

        ---

        description: Returns a download presigned URL to retrieve file from s3 bucket.
        parameters:
        - in: path
            name: id
        responses:
            307:
                description: Download URL, with a redirect header.
        """
        return RedirectResponse(
            await self.svc.download(
                pk_val=self._extract_pk_val(request),
                user_info=await UserInfo(request),
            )
        )

    async def post_success(self, request: Request):
        """Used as a callback in the S3 presigned upload URLs that are emitted.
            Upon receipt, update entity status in the DB.

        ---

        description: File upload callback - hit by s3 bucket on success upload.
        parameters:
        - in: path
            name: id
        responses:
            201:
                description: Upload confirmation 'Uploaded.'
        """
        await self.svc.post_success(
            pk_val=self._extract_pk_val(request),
        )

        return json_response("Uploaded.", status_code=201)

Next, we implement the matching custom service, with the following requirements:
  • populate a unique upload form upon creating a new /files resource

    • Handled in the _insert and _insert_list methods, which is the postfix way

  • implement post_success that registers a success of upload

  • implement download in order to return direct upload URL to clients

Much of this code deals with retrieving attributes of async SQLAlchemy objects.

s3service.py
class S3Service(CompositeEntityService):
    """Class for automatic management of S3 bucket transactions for file resources."""
    @classproperty
    def s3(cls) -> S3Manager:
        return cls.app.s3

    def post_callback(self, item) -> str:
        mapping = { # Map primary key values to route elements.
            key: getattr(item, key)
            for key in self.table.pk
        }

        # Access controller via table.
        route = str(self.table.ctrl.post_upload_callback)
        for key, val in mapping.items():
            route = route.replace("{" + f"{key}" + "}", str(val))

        srv = self.app.server_endpoint.strip('/')
        return f"{srv}{route}"

    async def gen_key(self, item, session: AsyncSession):
        """Generate the unique bucket key from file elements."""
        # Fetch necessary fields from DB.
        await session.refresh(item, ['filename', 'extension'])
        version = ""
        if self.table.is_versioned:
            await session.refresh(item, ['version'])
            version = "_v" + str(item.version)
        # Custom key prefix mechanism.
        key_salt = await getattr(item.awaitable_attrs, 'key_salt')
        if iscoroutine(key_salt):
            item.__dict__['session'] = session
            key_salt = await item.key_salt
        return f"{key_salt}_{item.filename}{version}.{item.extension}"

    async def gen_upload_form(self, file: S3File, session: AsyncSession):
        """Populates an Upload for a newly created file. Handling simple post and multipart_upload
        cases.

        :param file: New file
        :type file: S3File
        :param session: current session
        :type session: AsyncSession
        """
        assert isinstance(file, S3File) # mypy.

        # Use a proxy Upload table that also handles large files.
        file.upload = Upload()
        # Flushing is necessary to generate an id.
        session.add(file.upload)
        await session.flush()
        parts = await getattr(file.upload.awaitable_attrs, 'parts')

        key = await self.gen_key(file, session=session)
        parts.append(
            UploadPart(
                upload_id=file.upload.id,
                form=str(
                    self.s3.create_presigned_post(
                        object_name=key,
                        callback=self.post_callback(file)
                    )
                )
            )
        )

    @DatabaseManager.in_session
    async def post_success(self, pk_val: List[Any], session: AsyncSession):
        """Mark file as ready and discard its upload form."""
        file = await self.read(pk_val, fields=['ready', 'upload'], session=session)
        file.validated_at = utcnow()
        file.ready = True
        file.upload_id, file.upload = None, None

    @DatabaseManager.in_session
    async def download(
        self, pk_val: List[Any], user_info: UserInfo | None, session: AsyncSession
    ) -> str:
        """Get File entry from DB, and return a direct download url.

        :param pk_val: key
        :type pk_val: List[Any]
        :param user_info: requesting user info
        :type user_info: UserInfo | None
        :param session: current db session
        :type session: AsyncSession
        :raises FileNotUploadedError: File entry exists but has not been validated yet
        :return: direct download url.
        :rtype: str
        """
        # File management fields.
        fields = ['filename', 'extension', 'dl_count', 'ready']
        # Also fetch foreign keys, as some may be necessary for permission check below.
        fields += list(c.name for c in self.table.__table__.columns if c.foreign_keys)
        # Shall raise an error if the given file doesn't exist.
        file = await self.read(pk_val, fields=fields, session=session)

        assert isinstance(file, S3File) # mypy.

        await self._check_permissions("download", user_info, file.__dict__, session=session)

        if not file.ready:
            raise FileNotUploadedError("File exists but has not been uploaded yet.")

        url = self.s3.create_presigned_download_url(await self.gen_key(file, session=session))
        file.dl_count += 1
        return url

    @DatabaseManager.in_session
    async def _insert(
        self,
        stmt: Insert,
        user_info: UserInfo | None,
        session: AsyncSession
    ) -> (Any | None):
        """INSERT special case for file: populate url after getting entity id."""
        file = await super()._insert(stmt, user_info=user_info, session=session)
        await self.gen_upload_form(file, session=session)
        return file

    @DatabaseManager.in_session
    async def _insert_list(
        self,
        stmts: Sequence[Insert],
        user_info: UserInfo | None,
        session: AsyncSession
    ) -> Sequence[Base]:
        """INSERT many objects into the database, check token write permission before commit."""
        files = await super()._insert_list(stmts, user_info=user_info, session=session)
        for file in files:
            await self.gen_upload_form(file, session=session)
        return files

    @DatabaseManager.in_session
    async def release(
        self,
        pk_val: List[Any],
        fields: List[str],
        update: Dict[str, Any],
        session: AsyncSession,
        user_info: UserInfo | None = None,
    ) -> Base:
        """Important: Also override /files/{id}/release behaviour."""
        # Bumps version.
        file = await super().release(
            pk_val=pk_val,
            fields=fields,
            update=update,
            session=session,
            user_info=user_info
        )
        # Reset special fields.
        file.created_at = utcnow()
        file.validated_at = None
        file.ready = False
        file.dl_count = 0
        # Generate a new form.
        await self.gen_upload_form(file, session=session)
        return file
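Standalone, the URL templating done by post_callback boils down to substituting primary key values into a Starlette-style path template, and gen_key follows a fixed bucket key scheme. The helper below is a simplified sketch of that logic, not the actual methods:

```python
def render_callback(server_endpoint: str, route_path: str, pk_values: dict) -> str:
    """Substitute primary key values into a Starlette-style path template."""
    for key, val in pk_values.items():
        route_path = route_path.replace("{" + key + "}", str(val))
    return server_endpoint.rstrip("/") + route_path


url = render_callback(
    "https://api.example.org/",          # hypothetical server endpoint
    "/files/{id}/post_success",          # the route stored in post_upload_callback
    {"id": 42},
)

# Bucket key scheme mirrored from gen_key: {key_salt}_{filename}[_v{version}].{extension}
key = f"{'salt123'}_{'report'}{'_v2'}.{'csv'}"
```

Because the bucket hits this URL itself on upload success, the full callback must be resolvable before the presigned form is handed to the client, which is why the postfix hook (after ids are generated) is required here.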

Routing and Auth

As shown in the S3Controller example above, BioDM provides two Route classes: PublicRoute and Route.

In case you are defining your own routes, you should use these instead of Starlette's Route.

Ultimately, this enables the REQUIRE_AUTH config parameter: when set to True, it requires authentication on all endpoints routed with a plain Route, while leaving endpoints marked with PublicRoute public. This distinction can be important: in the example above, the S3 bucket is not authenticated when sending us notice of a successful file upload.
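The effect of REQUIRE_AUTH can be sketched with toy route classes. These are illustrative stand-ins only; BioDM's actual Route and PublicRoute wrap Starlette routes:

```python
class Route:
    """Requires authentication whenever REQUIRE_AUTH is set."""
    def __init__(self, path: str):
        self.path = path

    def requires_auth(self, require_auth: bool) -> bool:
        return require_auth


class PublicRoute(Route):
    """Stays public regardless of REQUIRE_AUTH, e.g. the S3 upload callback."""
    def requires_auth(self, require_auth: bool) -> bool:
        return False


routes = [
    Route("/files/{id}/download"),
    PublicRoute("/files/{id}/post_success"),
]

# With REQUIRE_AUTH=True, only the plain Route demands authentication.
flags = [r.requires_auth(True) for r in routes]
```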