retuve.app.utils

  1# Copyright 2024 Adam McArthur
  2#
  3# Licensed under the Apache License, Version 2.0 (the "License");
  4# you may not use this file except in compliance with the License.
  5# You may obtain a copy of the License at
  6#
  7#     http://www.apache.org/licenses/LICENSE-2.0
  8#
  9# Unless required by applicable law or agreed to in writing, software
 10# distributed under the License is distributed on an "AS IS" BASIS,
 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12# See the License for the specific language governing permissions and
 13# limitations under the License.
 14
 15import asyncio
 16import hashlib
 17import json
 18import os
 19import secrets
 20from datetime import datetime
 21
 22from fastapi import (
 23    APIRouter,
 24    Depends,
 25    Header,
 26    HTTPException,
 27    Request,
 28    Response,
 29)
 30from httpx import AsyncClient
 31
 32from retuve.funcs import retuve_run
 33
 34
 35def generate_token():
 36    return secrets.token_urlsafe(32)
 37
 38
 39def consistent_hash(value, mod):
 40    # Convert the input to a string and encode it
 41    encoded_value = str(value).encode()
 42    # Use a reproducible hash function (e.g., SHA-256)
 43    hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16)
 44    return hash_value % mod
 45
 46
# In-process session-token registry keyed by token string. validate_auth_token
# expects each value to be a mapping with an "expires" entry (compared against
# datetime.utcnow()) and a "username" entry.
TOKEN_STORE = {}
# Same entry layout as TOKEN_STORE, but consulted by validate_api_token.
# NOTE(review): nothing visible in this module populates either store —
# presumably other modules insert entries; confirm against callers.
API_TOKEN_STORE = {}
 49
 50
 51async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
 52    # Save the DICOM file in the appropriate directory
 53    dicom_path = f"{live_batchdir}/{instance_id}.dcm"
 54    if dicom_content is not None:
 55        # if path already exists, return
 56        if os.path.exists(dicom_path):
 57            return
 58        with open(dicom_path, "wb") as f:
 59            f.write(dicom_content)
 60
 61    # Process the DICOM
 62    result = await asyncio.to_thread(
 63        retuve_run,
 64        hip_mode=config.batch.hip_mode,
 65        config=config,
 66        modes_func=config.batch.mode_func,
 67        modes_func_kwargs_dict={},
 68        file=dicom_path,
 69    )
 70
 71    return result
 72
 73
 74async def save_results(instance_id, savedir, result=None, just_paths=False):
 75    """
 76    Saves DICOM content, video, image, and metrics results for a given instance ID.
 77
 78    :param instance_id: The unique ID of the DICOM instance.
 79    :param result: The result object returned by `retuve_run`.
 80    :param savedir: The base directory for saving the results.
 81    :param just_paths: Whether to return just the paths of the saved results.
 82    """
 83    result_dir = f"{savedir}/{instance_id}"
 84    os.makedirs(result_dir, exist_ok=True)
 85
 86    video_path = f"{result_dir}/video.mp4"
 87    img_path = f"{result_dir}/img.jpg"
 88    metrics_path = f"{result_dir}/metrics.json"
 89
 90    if just_paths:
 91        return video_path, img_path, metrics_path
 92
 93    # Save video result if available
 94    if result.video_clip:
 95        await asyncio.to_thread(result.video_clip.write_videofile, video_path)
 96
 97    # Save image result if available
 98    if result.image:
 99        await asyncio.to_thread(result.image.save, img_path)
100
101    # Save metrics if available
102    if result.metrics:
103        with open(metrics_path, "w") as f:
104            json.dump(result.metrics, f)
105
106    return video_path, img_path, metrics_path
107
108
async def get_sorted_dicom_images(
    orthanc_url, username=None, password=None, latest_time=None
):
    """
    Fetch and sort DICOM images from an Orthanc server based on acquisition time.

    Every patient, study, series and instance is walked through the Orthanc
    REST API. Only instances whose SOPClassUID is
    "1.2.840.10008.5.1.4.1.1.3.1" with more than one frame, and that carry
    both AcquisitionDate and AcquisitionTime, are considered. The files of all
    candidates acquired strictly after ``latest_time`` are downloaded; when
    none qualifies, the single most recent candidate is downloaded instead.

    :param orthanc_url: The Orthanc server URL.
    :param username: Username for authentication (optional).
    :param password: Password for authentication (optional). Credentials are
        only sent when both username and password are given.
    :param latest_time: Filter for acquisition times after this datetime.
        ``None`` means no lower bound.
    :return: A tuple of (list of (DICOM content, instance ID) sorted oldest
        first, latest acquisition datetime). The instance ID is a label of the
        form "<timestamp> Patient-<PatientID> ID-<hash>". The datetime is the
        newest acquisition time among the newly returned images, or the
        incoming cutoff unchanged when nothing was newer.
    """

    auth = (username, password) if username and password else None
    cutoff = latest_time or datetime.min
    candidates = []

    async with AsyncClient() as client:

        async def fetch_json(path):
            # NOTE(review): HTTP status codes are not checked here (nor were
            # they before); an error response surfaces as a JSON/key error.
            response = await client.get(f"{orthanc_url}/{path}", auth=auth)
            return response.json()

        for patient_id in await fetch_json("patients"):
            for study in await fetch_json(f"patients/{patient_id}/studies"):
                for series in await fetch_json(f"studies/{study['ID']}/series"):
                    for instance in await fetch_json(
                        f"series/{series['ID']}/instances"
                    ):
                        orthanc_id = instance["ID"]
                        metadata = await fetch_json(
                            f"instances/{orthanc_id}/simplified-tags"
                        )

                        # Remove files that are not multiframe US's
                        is_multiframe_us = (
                            metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1"
                            and int(metadata.get("NumberOfFrames", 0)) > 1
                        )
                        if not is_multiframe_us:
                            continue

                        acq_date = metadata.get("AcquisitionDate")
                        acq_time = metadata.get("AcquisitionTime")
                        if not (acq_date and acq_time):
                            continue

                        # Fractional seconds are dropped before parsing.
                        acq_datetime = datetime.strptime(
                            f"{acq_date}{acq_time.split('.')[0]}",
                            "%Y%m%d%H%M%S",
                        )
                        candidates.append((acq_datetime, orthanc_id, metadata))

        if not candidates:
            return [], cutoff

        # Sort BEFORE selecting so the returned list really is oldest-first.
        candidates.sort(key=lambda item: item[0])

        # Compare against the fixed cutoff rather than a running maximum: with
        # a running maximum, any new image visited after a newer one (or one
        # sharing its timestamp) was silently dropped.
        selected = [item for item in candidates if item[0] > cutoff]
        if selected:
            latest_acq_time = selected[-1][0]
        else:
            # Nothing new: fall back to the most recent image and leave the
            # cutoff untouched. Only this file is downloaded in that case.
            selected = [candidates[-1]]
            latest_acq_time = cutoff

        images = []
        for acq_datetime, orthanc_id, metadata in selected:
            file_response = await client.get(
                f"{orthanc_url}/instances/{orthanc_id}/file",
                auth=auth,
            )

            # hash the main instance_id to be smaller
            small_id = consistent_hash(orthanc_id, 10**8)
            patient_id_real = metadata.get("PatientID")
            label = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"

            images.append((file_response.content, label))

    return images, latest_acq_time
222
223
class UnauthorizedException(Exception):
    """Signals that the caller is not authorized and should be redirected."""
228
229
def validate_auth_token(auth_token: str):
    """
    Resolve a session token to its username.

    :param auth_token: Key to look up in ``TOKEN_STORE``.
    :return: The ``"username"`` stored for the token.
    :raises UnauthorizedException: If the token is unknown or its
        ``"expires"`` value is earlier than ``datetime.utcnow()``.
    """
    entry = TOKEN_STORE.get(auth_token)
    if entry and not (entry["expires"] < datetime.utcnow()):
        return entry["username"]
    raise UnauthorizedException
235
236
def validate_api_token(api_token: str):
    """
    Resolve an API token to its username.

    :param api_token: Key to look up in ``API_TOKEN_STORE``.
    :return: The ``"username"`` stored for the token.
    :raises HTTPException: With status 403 if the token is unknown or its
        ``"expires"`` value is earlier than ``datetime.utcnow()``.
    """
    entry = API_TOKEN_STORE.get(api_token)
    if entry and not (entry["expires"] < datetime.utcnow()):
        return entry["username"]
    raise HTTPException(status_code=403, detail="Invalid API token.")
def generate_token():
    """Return a new random, URL-safe token string derived from 32 random bytes."""
    token = secrets.token_urlsafe(nbytes=32)
    return token
def consistent_hash(value, mod):
    """
    Map ``value`` to a reproducible integer in ``range(mod)``.

    The value is stringified, UTF-8 encoded and hashed with SHA-256, so the
    result is identical across processes and runs (unlike the builtin ``hash``).

    :param value: Any object; its ``str()`` form is what gets hashed.
    :param mod: The modulus applied to the SHA-256 digest interpreted as an integer.
    :return: The digest value reduced modulo ``mod``.
    """
    digest = hashlib.sha256(str(value).encode()).digest()
    # Big-endian interpretation of the raw digest equals int(hexdigest, 16).
    return int.from_bytes(digest, "big") % mod
# In-process session-token registry keyed by token string. validate_auth_token
# expects each value to be a mapping with an "expires" entry (compared against
# datetime.utcnow()) and a "username" entry.
TOKEN_STORE = {}
# Same entry layout as TOKEN_STORE, but consulted by validate_api_token.
# NOTE(review): nothing visible in this module populates either store —
# presumably other modules insert entries; confirm against callers.
API_TOKEN_STORE = {}
async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
    """
    Persist a DICOM file (if content is given) and run ``retuve_run`` on it.

    :param live_batchdir: Directory in which ``<instance_id>.dcm`` is stored.
    :param instance_id: Identifier used as the file stem of the DICOM file.
    :param dicom_content: Raw bytes to write, or ``None`` to process the file
        already located at the target path without writing anything.
    :param config: Configuration object; ``config.batch.hip_mode`` and
        ``config.batch.mode_func`` are forwarded to ``retuve_run``.
    :return: The value returned by ``retuve_run``, or ``None`` when content was
        supplied but the target file already exists (nothing is processed).
    """
    dicom_path = f"{live_batchdir}/{instance_id}.dcm"
    if dicom_content is not None:
        try:
            # "xb" creates the file exclusively, so the existence check and the
            # creation are one atomic step; concurrent tasks handling the same
            # instance can no longer both pass a separate exists() check.
            with open(dicom_path, "xb") as f:
                f.write(dicom_content)
        except FileExistsError:
            # Already saved earlier: skip, exactly as the exists() check did.
            return None

    # retuve_run is blocking, so it runs in a worker thread to keep the event
    # loop responsive.
    result = await asyncio.to_thread(
        retuve_run,
        hip_mode=config.batch.hip_mode,
        config=config,
        modes_func=config.batch.mode_func,
        modes_func_kwargs_dict={},
        file=dicom_path,
    )

    return result
async def save_results(instance_id, savedir, result=None, just_paths=False):
    """
    Saves the video, image, and metrics results for a given instance ID.

    The directory ``<savedir>/<instance_id>`` is created if missing, even when
    only the paths are requested.

    :param instance_id: The unique ID of the DICOM instance.
    :param savedir: The base directory for saving the results.
    :param result: The result object returned by `retuve_run`. Its
        ``video_clip``, ``image`` and ``metrics`` attributes are each written
        only when truthy. ``None`` means there is nothing to save.
    :param just_paths: Whether to return just the paths of the saved results.
    :return: The tuple ``(video_path, img_path, metrics_path)``. A path is
        returned even when no file was written to it.
    """
    result_dir = f"{savedir}/{instance_id}"
    os.makedirs(result_dir, exist_ok=True)

    video_path = f"{result_dir}/video.mp4"
    img_path = f"{result_dir}/img.jpg"
    metrics_path = f"{result_dir}/metrics.json"
    paths = (video_path, img_path, metrics_path)

    # ``result`` defaults to None; without this guard the attribute accesses
    # below raised AttributeError whenever just_paths was left False.
    if just_paths or result is None:
        return paths

    # Video encoding and image saving are blocking, so they run in worker threads.
    if result.video_clip:
        await asyncio.to_thread(result.video_clip.write_videofile, video_path)

    if result.image:
        await asyncio.to_thread(result.image.save, img_path)

    if result.metrics:
        with open(metrics_path, "w") as f:
            json.dump(result.metrics, f)

    return paths

Saves the video, image, and metrics results for a given instance ID (the DICOM file itself is not written by this function).

Parameters
  • instance_id: The unique ID of the DICOM instance.
  • result: The result object returned by retuve_run.
  • savedir: The base directory for saving the results.
  • just_paths: Whether to return just the paths of the saved results.
async def get_sorted_dicom_images(
    orthanc_url, username=None, password=None, latest_time=None
):
    """
    Fetch and sort DICOM images from an Orthanc server based on acquisition time.

    Every patient, study, series and instance is walked through the Orthanc
    REST API. Only instances whose SOPClassUID is
    "1.2.840.10008.5.1.4.1.1.3.1" with more than one frame, and that carry
    both AcquisitionDate and AcquisitionTime, are considered. The files of all
    candidates acquired strictly after ``latest_time`` are downloaded; when
    none qualifies, the single most recent candidate is downloaded instead.

    :param orthanc_url: The Orthanc server URL.
    :param username: Username for authentication (optional).
    :param password: Password for authentication (optional). Credentials are
        only sent when both username and password are given.
    :param latest_time: Filter for acquisition times after this datetime.
        ``None`` means no lower bound.
    :return: A tuple of (list of (DICOM content, instance ID) sorted oldest
        first, latest acquisition datetime). The instance ID is a label of the
        form "<timestamp> Patient-<PatientID> ID-<hash>". The datetime is the
        newest acquisition time among the newly returned images, or the
        incoming cutoff unchanged when nothing was newer.
    """

    auth = (username, password) if username and password else None
    cutoff = latest_time or datetime.min
    candidates = []

    async with AsyncClient() as client:

        async def fetch_json(path):
            # NOTE(review): HTTP status codes are not checked here (nor were
            # they before); an error response surfaces as a JSON/key error.
            response = await client.get(f"{orthanc_url}/{path}", auth=auth)
            return response.json()

        for patient_id in await fetch_json("patients"):
            for study in await fetch_json(f"patients/{patient_id}/studies"):
                for series in await fetch_json(f"studies/{study['ID']}/series"):
                    for instance in await fetch_json(
                        f"series/{series['ID']}/instances"
                    ):
                        orthanc_id = instance["ID"]
                        metadata = await fetch_json(
                            f"instances/{orthanc_id}/simplified-tags"
                        )

                        # Remove files that are not multiframe US's
                        is_multiframe_us = (
                            metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1"
                            and int(metadata.get("NumberOfFrames", 0)) > 1
                        )
                        if not is_multiframe_us:
                            continue

                        acq_date = metadata.get("AcquisitionDate")
                        acq_time = metadata.get("AcquisitionTime")
                        if not (acq_date and acq_time):
                            continue

                        # Fractional seconds are dropped before parsing.
                        acq_datetime = datetime.strptime(
                            f"{acq_date}{acq_time.split('.')[0]}",
                            "%Y%m%d%H%M%S",
                        )
                        candidates.append((acq_datetime, orthanc_id, metadata))

        if not candidates:
            return [], cutoff

        # Sort BEFORE selecting so the returned list really is oldest-first.
        candidates.sort(key=lambda item: item[0])

        # Compare against the fixed cutoff rather than a running maximum: with
        # a running maximum, any new image visited after a newer one (or one
        # sharing its timestamp) was silently dropped.
        selected = [item for item in candidates if item[0] > cutoff]
        if selected:
            latest_acq_time = selected[-1][0]
        else:
            # Nothing new: fall back to the most recent image and leave the
            # cutoff untouched. Only this file is downloaded in that case.
            selected = [candidates[-1]]
            latest_acq_time = cutoff

        images = []
        for acq_datetime, orthanc_id, metadata in selected:
            file_response = await client.get(
                f"{orthanc_url}/instances/{orthanc_id}/file",
                auth=auth,
            )

            # hash the main instance_id to be smaller
            small_id = consistent_hash(orthanc_id, 10**8)
            patient_id_real = metadata.get("PatientID")
            label = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"

            images.append((file_response.content, label))

    return images, latest_acq_time

Fetch and sort DICOM images from an Orthanc server based on acquisition time.

Parameters
  • orthanc_url: The Orthanc server URL.
  • username: Username for authentication (optional).
  • password: Password for authentication (optional).
  • latest_time: Filter for acquisition times after this datetime.
Returns

A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).

class UnauthorizedException(Exception):
    """Signals that the caller is not authorized and should be redirected."""

Raised when the user is unauthorized and should be redirected.

def validate_auth_token(auth_token: str):
    """
    Resolve a session token to its username.

    :param auth_token: Key to look up in ``TOKEN_STORE``.
    :return: The ``"username"`` stored for the token.
    :raises UnauthorizedException: If the token is unknown or its
        ``"expires"`` value is earlier than ``datetime.utcnow()``.
    """
    entry = TOKEN_STORE.get(auth_token)
    if entry and not (entry["expires"] < datetime.utcnow()):
        return entry["username"]
    raise UnauthorizedException
def validate_api_token(api_token: str):
    """
    Resolve an API token to its username.

    :param api_token: Key to look up in ``API_TOKEN_STORE``.
    :return: The ``"username"`` stored for the token.
    :raises HTTPException: With status 403 if the token is unknown or its
        ``"expires"`` value is earlier than ``datetime.utcnow()``.
    """
    entry = API_TOKEN_STORE.get(api_token)
    if entry and not (entry["expires"] < datetime.utcnow()):
        return entry["username"]
    raise HTTPException(status_code=403, detail="Invalid API token.")