Source code for conjuror.plans.beam

from abc import ABC
from dataclasses import dataclass
from typing import Generic, Sequence, Iterable

import numpy as np
import plotly.io as pio
from plotly import graph_objects as go
from plotly import subplots
from pydicom import Sequence as DicomSequence, Dataset
from scipy.interpolate import make_interp_spline

from conjuror.images.simulators import Imager
from conjuror.plans.machine import TMachine, MachineSpecs, GantryDirection, FluenceMode
from conjuror.utils import wrap180


@dataclass
class _BeamLimitingDevice:
    """Helper class intended to facilitate access to BeamLimitingDevices."""

    number_of_leaf_pairs: int
    leaf_position_boundaries: list[float]
    # This maps an imager rows to a given leaf (e.g. rows 450-470 -> leaf #10)
    row_to_leaf_map: np.ndarray
    leaves_a: np.ndarray
    leaves_b: np.ndarray


class BeamVisualizationMixin:
    """This Mixin class adds functionality to visualize beams"""

    def generate_fluence(
        self: "Beam",
        imager: Imager,
        interpolation_factor: int = 100,
    ) -> np.ndarray:
        """Generate the fluence map from the RT Plan.

        Parameters
        ----------
        imager : Imager
            The imager to use to generate the images. This provides the
            size of the image and the pixel size.
        interpolation_factor : int
            Interpolation factor to increase control points resolution.

        Returns
        -------
        np.ndarray
            The fluence map. Will be the same shape as the imager.
        """
        x = imager.pixel_size * (np.arange(imager.shape[1]) - (imager.shape[1] - 1) / 2)
        y = imager.pixel_size * (np.arange(imager.shape[0]) - (imager.shape[0] - 1) / 2)

        # Store MLC data in a single dictionary
        bldseq = self.beam_limiting_device_sequence
        blds = dict[str, _BeamLimitingDevice]()
        for key, positions in self.beam_limiting_device_positions.items():
            if "MLC" not in key:
                continue
            bld = next(blds for blds in bldseq if blds.RTBeamLimitingDeviceType == key)
            blds[key] = _BeamLimitingDevice(
                bld.NumberOfLeafJawPairs,
                bld.LeafPositionBoundaries,
                np.argmax(np.array([bld.LeafPositionBoundaries]).T - y > 0, axis=0) - 1,
                positions[bld.NumberOfLeafJawPairs :, :],
                positions[: bld.NumberOfLeafJawPairs, :],
            )

        # Interpolate data
        num_cp = self.number_of_control_points  # before interpolation
        num_cp_ = interpolation_factor * (num_cp - 1) + 1  # after interpolation
        t = range(num_cp)  # abscissas for interpolation (used t since x is imager axis)
        t_ = np.linspace(0, num_cp - 1, num_cp_)  # evaluated abscissas
        metersets = make_interp_spline(t, self.metersets, k=1)(t_)
        for bld in blds.values():
            bld.leaves_a = make_interp_spline(t, bld.leaves_a, k=1, axis=1)(t_)
            bld.leaves_b = make_interp_spline(t, bld.leaves_b, k=1, axis=1)(t_)

        meterset_per_cp = np.diff(metersets, prepend=0)
        fluence = np.zeros(imager.shape)
        for cp_idx in range(1, num_cp_):
            stack_fluences = list()
            for bld in blds.values():
                leaves_b = bld.leaves_b[:, cp_idx : cp_idx + 1]
                leaves_a = bld.leaves_a[:, cp_idx : cp_idx + 1]
                mu = meterset_per_cp[cp_idx]
                # The mask contains the fluence boolean values, where the y-axis corresponds
                # to the leaves and the x-axis indicates whether a given pixel is irradiated.
                stack_compact = (x > leaves_b) & (x <= leaves_a)
                # This loop expands stack_compact into the full size image stack_fluence
                stack_fluence = np.zeros(imager.shape)
                for row in range(len(y)):
                    leaf = bld.row_to_leaf_map[row]
                    if leaf < 0:
                        continue
                    stack_fluence[row, stack_compact[leaf, :]] = mu
                stack_fluences.append(stack_fluence)
            cp_fluence = np.min(stack_fluences, axis=0)
            fluence += cp_fluence

        # Jaws
        blds = self.beam_limiting_device_positions
        jaws_x = next(val for key, val in blds.items() if key in ["ASYMX", "X"])
        jaws_y = next(val for key, val in blds.items() if key in ["ASYMY", "Y"])
        if np.any(np.diff(jaws_x, axis=1)) or np.any(np.diff(jaws_y, axis=1)):
            raise ValueError("The jaws must be static")
        fluence[:, (x < jaws_x[0, 0]) | (x > jaws_x[1, 0])] = 0
        fluence[(y < jaws_y[0, 0]) | (y > jaws_y[1, 0]), :] = 0

        return fluence

    def plot_fluence(
        self: "Beam",
        imager: Imager,
        interpolation_factor: int = 100,
        show: bool = True,
    ) -> go.Figure:
        """Plot the fluence map from the RT Beam.

        Parameters
        ----------
        imager : Imager
            The imager to use to generate the images. This provides the
            size of the image and the pixel size.
        interpolation_factor : int
            Interpolation factor to increase control points resolution.
        show : bool, optional
            Whether to show the plots. Default is True.
        """
        fluence = self.generate_fluence(imager, interpolation_factor)
        fig = go.Figure()
        fig.add_heatmap(
            z=fluence,
            colorscale="Viridis",
            colorbar=dict(title="MU"),
            showscale=True,
        )
        fig.update_layout(
            title=f"Fluence Map - {self.beam_name}",
        )
        fig.update_yaxes(scaleanchor="x", scaleratio=1)
        if show:
            fig.show()
        return fig

    def animate_mlc(self: "Beam", show: bool = True) -> go.Figure:
        """Plot the MLC positions as animation.

        Parameters
        ----------
        show : bool, optional
            Whether to show the plot. Default is True.
        """
        _leaf_length = 200
        jaw_line_width = 2
        mlc_line_width = 2
        blds = {
            bld.RTBeamLimitingDeviceType: bld
            for bld in self.beam_limiting_device_sequence
            if "MLC" in bld.RTBeamLimitingDeviceType
        }

        frames = []
        for cp_idx in range(self.number_of_control_points):
            mlc_shapes = []
            jaw_shapes = []
            for key, positions in self.beam_limiting_device_positions.items():
                if key in ["X", "ASYMX"]:
                    x1 = go.Scatter(
                        x=2 * [positions[0, cp_idx]],
                        y=[-1000, 1000],
                        mode="lines",
                        line=dict(width=jaw_line_width, color="orange"),
                    )
                    x2 = go.Scatter(
                        x=2 * [positions[1, cp_idx]],
                        y=[-1000, 1000],
                        mode="lines",
                        line=dict(width=jaw_line_width, color="orange"),
                    )
                    jaw_shapes.append(x1)
                    jaw_shapes.append(x2)
                if key in ["Y", "ASYMY"]:
                    y1 = go.Scatter(
                        x=[-1000, 1000],
                        y=2 * [positions[0, cp_idx]],
                        mode="lines",
                        line=dict(width=jaw_line_width, color="orange"),
                    )
                    y2 = go.Scatter(
                        x=[-1000, 1000],
                        y=2 * [positions[1, cp_idx]],
                        mode="lines",
                        line=dict(width=jaw_line_width, color="orange"),
                    )
                    jaw_shapes.append(y1)
                    jaw_shapes.append(y2)
                if "MLC" not in key:
                    continue

                # MLC
                num_leaf_pairs = blds[key].NumberOfLeafJawPairs
                for leaf in range(num_leaf_pairs):
                    y1 = blds[key].LeafPositionBoundaries[leaf]
                    y2 = blds[key].LeafPositionBoundaries[leaf + 1]
                    y = np.array([y1, y1, y2, y2, y1])

                    pos_b = positions[leaf, cp_idx]
                    x_b = pos_b + _leaf_length * np.array([-1, 0, 0, -1, -1])
                    rect_b = go.Scatter(
                        x=x_b,
                        y=y,
                        mode="lines",
                        line=dict(width=mlc_line_width, color="blue"),
                    )

                    pos_a = positions[leaf + num_leaf_pairs, cp_idx]
                    x_a = pos_a + _leaf_length * np.array([0, 1, 1, 0, 0])
                    rect_a = go.Scatter(
                        x=x_a,
                        y=y,
                        mode="lines",
                        line=dict(width=mlc_line_width, color="blue"),
                    )

                    mlc_shapes.append(rect_b)
                    mlc_shapes.append(rect_a)

            shapes = mlc_shapes + jaw_shapes
            frame = go.Frame(data=shapes, name=f"cp_{cp_idx}")
            frames.append(frame)
        data = frames[0].data
        layout = go.Layout(
            showlegend=False,
            title=f"Beam: {self.beam_name}",
            xaxis=dict(range=[-200, 200]),
            yaxis=dict(range=[-200, 200], scaleanchor="x", scaleratio=1),
            updatemenus=[
                {
                    "type": "buttons",
                    "buttons": [
                        {
                            "label": "Play",
                            "method": "animate",
                            "args": [
                                None,
                                {
                                    # "frame": {"duration": 50},
                                    # "transition": {"duration": 0},
                                    "fromcurrent": True,
                                },
                            ],
                        }
                    ],
                    "pad": {"r": 10, "t": 50},
                    "x": 0,
                    "y": 0,
                }
            ],
            sliders=[
                {
                    "currentvalue": {"prefix": "Control point: "},
                    "steps": [
                        {
                            "label": f"{i}",
                            "method": "animate",
                            "args": [
                                [f"cp_{i}"],
                                {
                                    "mode": "immediate",
                                    "transition": {"duration": 0},
                                },
                            ],
                        }
                        for i in range(len(frames))
                    ],
                    "pad": {"b": 10, "t": 50},
                }
            ],
        )
        fig = go.Figure(data=data, frames=frames, layout=layout)

        if show:
            fig.show()

        return fig

    def plot_control_points(
        self: "Beam", specs: MachineSpecs, show: bool = True
    ) -> go.Figure:
        """Plot the control points from dynamic beam
        Rows: Absolute position, relative motion, time to deliver, speed
        Cols: Dose, Gantry, MLC
        """
        # This is used mostly for visual inspection during development
        # Axis labeling could be improved

        self.compute_dynamics(specs)

        def _plot(
            as_line: bool,
            data: np.ndarray,
            reuse_axis: bool = False,
            title: str = "",
            y_label: str = "",
        ) -> None:
            """helper function for plotting"""
            if not reuse_axis:
                idx[0] += 1

            r, c = np.unravel_index(idx[0], (num_rows, num_cols))
            row, col = int(r) + 1, int(c) + 1  # Plotly subplots are 1-based
            x_data = self.metersets if as_line else self.metersets[:-1]
            shape = "linear" if as_line else "hv"
            color = colorway[1] if reuse_axis else colorway[0]
            trace = go.Scatter(
                x=x_data,
                y=data,
                mode="lines",
                line={"shape": shape, "color": color},
            )
            fig.add_trace(trace, row=row, col=col)

            if not reuse_axis:
                fig.layout.annotations[idx[0]].text = title
                fig.update_yaxes(title_text=y_label, row=row, col=col)

        colorway = pio.templates[pio.templates.default].layout.colorway
        idx = [-1]
        num_rows, num_cols = 4, 3
        spt = [" "] * (num_rows * num_cols)
        fig = subplots.make_subplots(rows=num_rows, cols=num_cols, subplot_titles=spt)
        fig.update_layout(showlegend=False)

        # Positions
        _plot(True, self.metersets, title="MU", y_label="Absolute")
        _plot(True, self.gantry_angles, title="Gantry")
        _plot(True, self.beam_limiting_device_positions["MLCX"][0, :], title="MLC")
        _plot(True, self.beam_limiting_device_positions["MLCX"][-1, :], reuse_axis=True)

        # Motions
        _plot(False, self.dose_motions, y_label="Motion")
        _plot(False, self.gantry_motions)
        _plot(False, self.mlc_motions[0, :])
        _plot(False, self.mlc_motions[-1, :], reuse_axis=True)

        # Time to deliver
        _plot(False, self.time_to_deliver, y_label="Delivery time")
        _plot(False, self.time_to_deliver)
        _plot(False, self.time_to_deliver)

        # Speeds
        _plot(False, self.dose_speeds * 60, y_label="Speed")
        _plot(False, self.gantry_speeds)
        _plot(False, self.mlc_speeds[0, :])
        _plot(False, self.mlc_speeds[-1, :], reuse_axis=True)

        if show:
            fig.show()

        return fig


class BeamDynamicsMixin:
    """This Mixin class adds functionalities to calculate dynamic parameters of the beam.

    Nomenclature:

    _motion: difference between consecutive control points
    _speed: speed = _motion/time
    """

    time_to_deliver: np.ndarray
    dose_motions: np.ndarray
    gantry_motions: np.ndarray
    mlc_motions: np.ndarray
    dose_speeds: np.ndarray
    gantry_speeds: np.ndarray
    mlc_speeds: np.ndarray

    def compute_dynamics(self: "Beam", specs: MachineSpecs) -> None:
        # motions
        # note: this is currently hardcoded for TrueBeam. Changes are necessary for Halcyon
        self.dose_motions = np.abs(np.diff(self.metersets))
        gantry_angle_var = (180 - self.gantry_angles) % 360
        self.gantry_motions = np.abs(np.diff(gantry_angle_var))
        mlc_positions = self.beam_limiting_device_positions["MLCX"]
        self.mlc_motions = np.diff(mlc_positions, axis=1)

        # ttd = time to deliver
        ttd_dose = self.dose_motions / (self.dose_rate / 60)
        ttd_gantry = self.gantry_motions / specs.max_gantry_speed
        ttd_mlc = self.mlc_motions / specs.max_mlc_speed
        times_to_deliver = np.vstack((ttd_dose, ttd_gantry, ttd_mlc))
        self.time_to_deliver = np.max(np.abs(times_to_deliver), axis=0)

        # speeds
        # dose_speed is the same as dose_rate but in MU/sec
        self.dose_speeds = self.dose_motions / self.time_to_deliver
        self.gantry_speeds = self.gantry_motions / self.time_to_deliver
        self.mlc_speeds = self.mlc_motions / self.time_to_deliver


[docs] class Beam(Generic[TMachine], BeamDynamicsMixin, BeamVisualizationMixin, ABC): """Represents a DICOM beam dataset. Has methods for creating the dataset and adding control points.""" ROUNDING_DECIMALS = 6 def __init__( self, beam_limiting_device_sequence: DicomSequence, beam_name: str, energy: float, fluence_mode: FluenceMode, dose_rate: int, metersets: Sequence[float], gantry_angles: float | Sequence[float], coll_angle: float, beam_limiting_device_positions: dict[str, list], couch_vrt: float, couch_lat: float, couch_lng: float, couch_rot: float, ): """ Parameters ---------- beam_limiting_device_sequence : DicomSequence The beam_limiting_device_sequence as defined in the template plan. beam_name : str The name of the beam. Must be less than 16 characters. energy : float The energy of the beam. fluence_mode : FluenceMode The fluence mode of the beam. dose_rate : int The dose rate of the beam. metersets : Sequence[float] The meter sets for each control point. gantry_angles : Union[float, Sequence[float]] The gantry angle(s) of the beam. If a single number, it's assumed to be a static beam. If multiple numbers, it's assumed to be a dynamic beam. coll_angle : float The collimator angle. beam_limiting_device_positions : dict[str, list] The positions of the beam_limiting_device_positions for each control point, where key is the type of beam limiting device (e.g. "MLCX") and the value contains the positions. couch_vrt : float The couch vertical position. couch_lat : float The couch lateral position. couch_lng : float The couch longitudinal position. couch_rot : float The couch rotation. """ if len(beam_name) > 16: raise ValueError("Beam name must be less than or equal to 16 characters") # Private attributes used for dicom creation only self._fluence_mode = fluence_mode self._energy = energy self._couch_vrt = couch_vrt self._couch_lat = couch_lat self._couch_lng = couch_lng # Public attributes (storing only) self.dose_rate = dose_rate self.coll_angle = coll_angle self.couch_rot = couch_rot # Public attributes (used outside dicom scope, e.g. for plotting) # For easier manipulation all variable are stored as np.ndarray of size num_cp, # if the axis are static they are replicated to fit the array. self.beam_name = beam_name self.beam_meterset = np.round(metersets[-1], self.ROUNDING_DECIMALS) self.number_of_control_points = len(metersets) self.beam_limiting_device_sequence = beam_limiting_device_sequence if not isinstance(gantry_angles, Iterable): gantry_angles = [gantry_angles] * self.number_of_control_points self.metersets = np.array(metersets) self.gantry_angles = np.array(gantry_angles) self.beam_limiting_device_positions = dict() for key, positions in beam_limiting_device_positions.items(): rep = self.number_of_control_points if len(positions) == 1 else 1 bld = np.array(rep * positions).T self.beam_limiting_device_positions[key] = bld @classmethod def from_dicom(cls, ds: Dataset, beam_idx: int): """Load a beam from an RT plan dataset Parameters ---------- ds : Dataset The dataset of the RT Plan. beam_idx : int The index of the beam to be loaded (zero indexed, i.e. beam #1 -> ind #0). """ if ds.Modality != "RTPLAN": raise ValueError("File is not an RTPLAN file") if beam_idx >= ds.FractionGroupSequence[0].NumberOfBeams: msg = "beam_idx is largen that the number of beams in the plan (note: use zero indexing)." raise ValueError(msg) mu = ds.FractionGroupSequence[0].ReferencedBeamSequence[beam_idx].BeamMeterset beam = ds.BeamSequence[beam_idx] bld = beam.BeamLimitingDeviceSequence name = beam.BeamName fluence_mode = FluenceMode.STANDARD pfms = beam.get("PrimaryFluenceModeSequence") if pfms and pfms[0].get("FluenceMode") == "NON_STANDARD": match pfms.FluenceModeID: case "FFF": fluence_mode = FluenceMode.FFF case "SRS": fluence_mode = FluenceMode.SRS case _: raise ValueError("FluenceModeID must be either FFF or SRS") cp0 = beam.ControlPointSequence[0] energy = cp0.NominalBeamEnergy dose_rate = cp0.DoseRateSet coll_angle = cp0.BeamLimitingDeviceAngle couch_vrt = cp0.TableTopVerticalPosition couch_lat = cp0.TableTopLateralPosition couch_lng = cp0.TableTopLongitudinalPosition couch_rot = cp0.TableTopEccentricAngle # Initial control point gantry_angles = [cp0.GantryAngle] cmws = [cp0.CumulativeMetersetWeight] bldp = { bld.RTBeamLimitingDeviceType: [bld.LeafJawPositions] for bld in cp0.BeamLimitingDevicePositionSequence } # for the next control points the concept is: append new if exists, # otherwise append a copy of the previous control point for idx in range(1, beam.NumberOfControlPoints): cp = beam.ControlPointSequence[idx] try: gantry_angles.append(cp.GantryAngle) except AttributeError: gantry_angles.append(gantry_angles[-1]) try: cmws.append(cp.CumulativeMetersetWeight) except AttributeError: cmws.append(cmws[-1]) bldps = getattr(cp, "BeamLimitingDevicePositionSequence", {}) for key in bldp.keys(): bld_types = [x.RTBeamLimitingDeviceType for x in bldps] try: idx = bld_types.index(key) bldp[key].append(bldps[idx].LeafJawPositions) except ValueError: bldp[key].append(bldp[key][-1]) beam_limiting_device_positions = bldp metersets = mu * np.array(cmws) return cls( bld, name, energy, fluence_mode, dose_rate, metersets, gantry_angles, coll_angle, beam_limiting_device_positions, couch_vrt, couch_lat, couch_lng, couch_rot, ) def to_dicom(self) -> Dataset: """Return the beam as a DICOM dataset that represents a BeamSequence item.""" # The Meterset at a given Control Point is equal to Beam Meterset (300A,0086) # specified in the Referenced Beam Sequence (300C,0004) of the RT Fraction Scheme Module, # multiplied by the Cumulative Meterset Weight (300A,0134) for the Control Point, # divided by the Final Cumulative Meterset Weight (300A,010E) # https://dicom.innolitics.com/ciods/rt-plan/rt-beams/300a00b0/300a0111/300a0134 metersets_weights = np.array(self.metersets) / self.metersets[-1] # Round all possible dynamic elements to avoid floating point comparisons. # E.g. to evaluate is an axis is static, all elements should be equal to the first # Note: using np.isclose does not solve the problem since the tolerance should be the same # as Eclipse/Machine, and we don't know which tolerance they use. # Here we assume that their tolerance is tighter than ROUNDING_DECIMALS metersets_weights = np.round(metersets_weights, self.ROUNDING_DECIMALS) metersets_weights = np.array(metersets_weights) # force array for lint gantry_angles = np.round(self.gantry_angles, self.ROUNDING_DECIMALS) bld_positions = { k: np.round(v, self.ROUNDING_DECIMALS) for k, v in self.beam_limiting_device_positions.items() } # Infer gantry rotation from the gantry angles # It assumes the gantry cannot rotate over 180, so there is only one possible direction to go from A to B. ga_wrap180 = wrap180(np.array(gantry_angles)) # This dictionary is used for mapping the sign of the difference with the GantryDirection enum. gantry_direction_map = { 0: GantryDirection.NONE, 1: GantryDirection.CLOCKWISE, -1: GantryDirection.COUNTER_CLOCKWISE, } gantry_direction = [ gantry_direction_map[s] for s in np.sign(np.diff(ga_wrap180)) ] # The last GantryRotationDirection should always be 'NONE' gantry_direction += [GantryDirection.NONE] # Infer if a beam is static or dynamic from the control points gantry_is_static = len(set(gantry_direction)) == 1 dict_bld_is_static = { k: np.all(pos == pos[:, 0:1]) for k, pos in bld_positions.items() } blds_are_static = np.all(list(dict_bld_is_static.values())) beam_is_static = gantry_is_static and blds_are_static beam_type = "STATIC" if beam_is_static else "DYNAMIC" # Create dataset with basic beam info dataset = self._create_basic_beam_info( self.beam_name, beam_type, self._fluence_mode, beam_limiting_device_sequence=self.beam_limiting_device_sequence, number_of_control_points=self.number_of_control_points, ) # Add initial control point cp0 = Dataset() cp0.ControlPointIndex = 0 cp0.NominalBeamEnergy = self._energy cp0.DoseRateSet = self.dose_rate beam_limiting_device_position_sequence = DicomSequence() for key, values in bld_positions.items(): beam_limiting_device_position = Dataset() beam_limiting_device_position.RTBeamLimitingDeviceType = key beam_limiting_device_position.LeafJawPositions = list(values[:, 0]) beam_limiting_device_position_sequence.append(beam_limiting_device_position) cp0.BeamLimitingDevicePositionSequence = beam_limiting_device_position_sequence cp0.GantryAngle = gantry_angles[0] cp0.GantryRotationDirection = gantry_direction[0].value cp0.BeamLimitingDeviceAngle = self.coll_angle cp0.BeamLimitingDeviceRotationDirection = "NONE" cp0.PatientSupportAngle = self.couch_rot cp0.PatientSupportRotationDirection = "NONE" cp0.TableTopEccentricAngle = 0.0 cp0.TableTopEccentricRotationDirection = "NONE" cp0.TableTopVerticalPosition = self._couch_vrt cp0.TableTopLongitudinalPosition = self._couch_lng cp0.TableTopLateralPosition = self._couch_lat cp0.IsocenterPosition = None cp0.CumulativeMetersetWeight = 0.0 dataset.ControlPointSequence.append(cp0) # Add rest of the control points for cp_idx in range(1, self.number_of_control_points): cp = Dataset() cp.ControlPointIndex = cp_idx cp.CumulativeMetersetWeight = metersets_weights[cp_idx] if not gantry_is_static: cp.GantryAngle = gantry_angles[cp_idx] cp.GantryRotationDirection = gantry_direction[cp_idx].value bld_position_sequence = DicomSequence() for bld, positions in bld_positions.items(): if not dict_bld_is_static[bld]: bld_position = Dataset() bld_position.RTBeamLimitingDeviceType = bld bld_position.LeafJawPositions = list(positions[:, cp_idx]) bld_position_sequence.append(bld_position) if len(bld_position_sequence) > 0: cp.BeamLimitingDevicePositionSequence = bld_position_sequence dataset.ControlPointSequence.append(cp) return dataset @staticmethod def _create_basic_beam_info( beam_name: str, beam_type: str, fluence_mode: FluenceMode, beam_limiting_device_sequence: DicomSequence, number_of_control_points: int, ) -> Dataset: beam = Dataset() beam.Manufacturer = "Conjuror" beam.ManufacturerModelName = "Conjuror" beam.PrimaryDosimeterUnit = "MU" beam.SourceAxisDistance = 1000.0 # Primary Fluence Mode Sequence primary_fluence_mode1 = Dataset() if fluence_mode == FluenceMode.STANDARD: primary_fluence_mode1.FluenceMode = "STANDARD" elif fluence_mode == FluenceMode.FFF: primary_fluence_mode1.FluenceMode = "NON_STANDARD" primary_fluence_mode1.FluenceModeID = "FFF" elif fluence_mode == FluenceMode.SRS: primary_fluence_mode1.FluenceMode = "NON_STANDARD" primary_fluence_mode1.FluenceModeID = "SRS" beam.PrimaryFluenceModeSequence = DicomSequence((primary_fluence_mode1,)) # Beam Limiting Device Sequence beam.BeamLimitingDeviceSequence = beam_limiting_device_sequence # beam numbers start at 0 and increment from there. beam.BeamName = beam_name beam.BeamType = beam_type beam.RadiationType = "PHOTON" beam.TreatmentDeliveryType = "TREATMENT" beam.NumberOfWedges = 0 beam.NumberOfCompensators = 0 beam.NumberOfBoli = 0 beam.NumberOfBlocks = 0 beam.FinalCumulativeMetersetWeight = 1.0 beam.NumberOfControlPoints = number_of_control_points # Control Point Sequence beam.ControlPointSequence = DicomSequence() return beam