How to Create a Driver with Dynamically Changing Channels
This guide should show you how to write an instrument driver where the number and type of available Channels is defined and modifiable at run time.
Tip
This is especially useful if you want to write drivers where you know that the individual instrument might have, for example, a varying number of channels, where every channel has the same functionality. Or for instruments with a very large number of (very similar) channels, where you programmatically want to create the available set and read channels at runtime.
Create the Core Structure
As always it is easiest to simply create the core structure and boilerplate code using the Driver Builder under Tools in the CAMELS GUI. As an example we will create a mock instrument called test_dynamic.
Modify the *.py
File
In the <driver_name>.py
file you must change the following things:
Set
ophyd_device=None
in thesuper.init()
ofsubclass
Set
ophyd_class_name="make_ophyd_instance"
in thesuper.init()
of the subclassAdd any instrument configuration settings. We will simply add a combobox (drop down menu) to vary the number of channels. This should demonstrate how the dynamic channel generation works. So we add
self.settings["channel_numbers"] = "3" # default number of channels should be 3
below the
__init__
ofsubclass
and then addcomboBoxes = {"channel_numbers": ["1", "2", "3", "4", "5"]}
to the
subclass_config
class after__init__
.You can define anything to be passed to the class (in
.py
file) and then define what should happen with the value you passed to the class to create read and set channels dynamically (inophyd.py
file).Add an import statement to the top where you import the
make_ophyd_class
defined in thedevice_name_ophyd.py
file like thisfrom .test_dynamic_ophyd import make_ophyd_class
The
make_ophyd_class
takes any arguments and then creates read and write channels depending on what you define in theophyd.py
file. How to modify thedevice_name_ophyd.py
file is explained below in more detail.We want to create the ophyd class with the number of channels we select from the drop down menu. Add the following two methods to the subclass:
def update_driver(self): if ( not "channel_numbers" in self.settings or not self.settings["channel_numbers"] ): return # make_ophyd_class is a function that returns a class with components that are generated at runtime # here we pass the channel_numbers to the make_ophyd_class which creates the class self.ophyd_class = make_ophyd_class(self.settings["channel_numbers"]) # now we create an instance of the class # name="test" prevents the instrument driver from actually trying to connect directly to the physical instrument self.ophyd_instance = self.ophyd_class( channel_numbers=self.settings["channel_numbers"], name="test" ) config, passive_config = get_configs_from_ophyd(self.ophyd_instance) for key, value in config.items(): if key not in self.config: self.config[key] = value for key, value in passive_config.items(): if key not in self.passive_config: self.passive_config[key] = value def get_channels(self): self.update_driver() return super().get_channels()
These methods update the driver and make sure all teh channels are available.
self.ophyd_class
andself.ophyd_instance
actually create the instrument instance, depending on theDefine this function at the end of the file. It does not belong to any class
def get_configs_from_ophyd(ophyd_instance): config = {} passive_config = {} for comp in ophyd_instance.walk_components(): name = comp.item.attr dev_class = comp.item.cls if name in ophyd_instance.configuration_attrs: if device_class.check_output(dev_class): config.update({f"{name}": 0}) else: passive_config.update({f"{name}": 0}) return config, passive_config
The final .py
looks like this:
from .test_dynamic_ophyd import Test_Dynamic
from .test_dynamic_ophyd import make_ophyd_class
from nomad_camels.main_classes import device_class
class subclass(device_class.Device):
def __init__(self, **kwargs):
super().__init__(
name="test_dynamic",
virtual=False,
tags=[],
directory="test_dynamic",
ophyd_device=None,
ophyd_class_name="make_ophyd_instance",
**kwargs,
)
self.settings["channel_numbers"] = "3" # default number of channels should be 3
def update_driver(self):
if (
not "channel_numbers" in self.settings
or not self.settings["channel_numbers"]
):
return
# make_ophyd_class is a function that returns a class with components that are generated at runtime
# here we pass the channel_numbers to the make_ophyd_class which creates the class
self.ophyd_class = make_ophyd_class(self.settings["channel_numbers"])
# now we create an instance of the class
# name="test" prevents the instrument driver from actually trying to connect directly to the physical instrument
self.ophyd_instance = self.ophyd_class(
channel_numbers=self.settings["channel_numbers"], name="test"
)
config, passive_config = get_configs_from_ophyd(self.ophyd_instance)
for key, value in config.items():
if key not in self.config:
self.config[key] = value
for key, value in passive_config.items():
if key not in self.passive_config:
self.passive_config[key] = value
def get_channels(self):
self.update_driver()
return super().get_channels()
class subclass_config(device_class.Simple_Config):
def __init__(
self,
parent=None,
data="",
settings_dict=None,
config_dict=None,
additional_info=None,
):
comboBoxes = {"channel_numbers": ["1", "2", "3", "4", "5"]}
super().__init__(
parent,
"test_dynamic",
data,
settings_dict,
config_dict,
additional_info,
comboBoxes=comboBoxes,
)
self.load_settings()
def get_configs_from_ophyd(ophyd_instance):
config = {}
passive_config = {}
for comp in ophyd_instance.walk_components():
name = comp.item.attr
dev_class = comp.item.cls
if name in ophyd_instance.configuration_attrs:
if device_class.check_output(dev_class):
config.update({f"{name}": 0})
else:
passive_config.update({f"{name}": 0})
return config, passive_config
Modify the *ophyd.py
File
In the *ophyd.py
file we must define the make_ophyd_class
and make_ophyd_instance
functions.
1. Define make_ophyd_class
We will start by adding the function that defines and creates the class first:
def make_ophyd_class(channel_number):
signal_dictionary = {}
for channel in range(1, int(channel_number) + 1):
# For each channel add read_power function
signal_dictionary[f"read_power_channel_{channel}"] = Cpt(
Custom_Function_SignalRO,
name=f"read_power_channel_{channel}",
metadata={"units": "", "description": ""},
read_function=read_function_generator(channel),
)
return type(
f"Test_Dynamic_total_channels_{channel_number}",
(Test_Dynamic,),
{**signal_dictionary},
)
2. Create Dynamic Read Channels : Define read_function_generator
As we want to create read and write channels for each instrument channel, we iterate over all the available channels. Here you would add your own code and add the desired components to the signal_dictionary
. We are using CustomFunctionSignalsRO
as we only want to be able to read these channels.
Attention
The read_function=read_function_generator(channel)
line is very important as this is where we define what exactly is done when we call the read channel in CAMELS.
As we want to dynamically create these read_functions as they most likely are slightly different for each channel (this depends on the exact instrument) we will use a closure to create many instances of slightly different functions.
For this add the definition of the read_function_generator
anywhere in the ophyd file. You can add it into the make_ophyd_class
if you like:
def make_ophyd_class(channel_number):
def read_function_generator(channel):
def read_function(_self_instance):
"""
This function returns a lambda function that reads the power of the specified channel.
the read_function is added to the signal as a read_function.
The _self_instance will later be resolved to the parent of the instance of the
Ibeam_smart class that the signal belongs to.
Parameters:
_self_instance (object): The parent instance.
Returns:
function: A lambda function that reads the power channel.
"""
return lambda: _self_instance.parent.read_power_channel(channel)
return read_function
signal_dictionary = {}
for channel in range(1, int(channel_number) + 1):
It makes sense to add the _self_instance
argument as this allows you to access all the methods of the parent class (so here the Test_Dynamic
class) later on. The signals can handel functions that have the _self_instance
argument and then pass the correct self
to the function.
3. Define make_ophyd_instance
Now we define the function that creates an instance of the class we just defined.
def make_ophyd_instance(
prefix="",
*args,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
# These are the arguments you want to pass to the ophyd class
# These are the settings you defined in the .py file
# We will pass the number of channels we selected in the drop down and are defined in the .py file
channel_numbers="",
**kwargs,
):
ophyd_class = make_ophyd_class(channel_numbers)
return ophyd_class(
prefix,
*args,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
# These are the arguments you want to pass to the ophyd class
# These are the settings you defined in the .py file
# We will pass the number of channels we selected in the drop down and are defined in the .py file
channel_numbers=channel_numbers,
**kwargs,
)
The instance we return here is the class we return with make_ophyd_class
.
...
return type(
f"Test_Dynamic_total_channels_{channel_number}",
(Test_Dynamic,), # This is the class that was automatically created by the driver_builder
{**signal_dictionary},
)
See above for more details.
4. Define the Device Class
The driver builder automatically created the default Device class depending on the name you gave in the driver builder. For us this class is called Test_Dynamic
.
class Test_Dynamic(Device):
channel_numbers = Cpt(
Custom_Function_Signal,
name="channel_numbers",
kind="config",
metadata={"units": "None", "description": "number of channels selected by the user in the GUI"},
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
# This is what you need to add:
channel_number="",
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
5. Adding Functions to the Parent Class
In the read_function_generator
we defined that there must be a method of the parent class called read_power_channel
as you can see here:
def read_function_generator(channel):
...
return lambda: _self_instance.parent.read_power_channel(channel)
...
We must now add this function to the parent class (the Test_Dynamic
class)
class Test_Dynamic(Device):
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
# These are the settings you defined in the ophyd class
channel_numbers="",
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.channel_numbers = channel_numbers
# if name is test then all the code after the return is skipped
# this is useful if you perform actual connections to instruments, that should not run when first defining the instrument in the GUI
if name == "test":
return
# This function is called by the read_function_generator
def read_power_channel(self, channel):
return f"Power of channel {channel} is {channel**2}"
Create Dynamic Set Channels
Above we created channels that can only be read. Now we want to create channels that can set values and are intended to write to instruments.
1. Add Set Channels to Device
For this we add the new set channels to the device by adding them to the signal_dictionary
in the make_ophyd_class
:
# For each channel add a set power function
signal_dictionary[f"put_power_channel_{channel}"] = Cpt(
Custom_Function_Signal,
name=f"put_power_channel_{channel}",
metadata={"units": "", "description": f"Sets the power of channel {channel} to the value provided in the GUI."},
put_function=put_function_generator(channel),
)
2. Define put_funtion_generator
Now we create a generator (closure) that generates a put (also called set) function for each channel.
...
def put_function_generator(channel):
def put_power_function(_self_instance, value):
"""
This function returns a lambda function that sets the power of the specified channel.
the put_function is added to the signal as a put_function.
The _self_instance will later be resolved to the parent of the instance of the
Ibeam_smart class that the signal belongs to.
Parameters:
_self_instance (object): The parent instance.
value (float): The power to set the channel to.
Returns:
function: A lambda function that sets the power channel.
"""
# It is important to pass the value to the lambda function!
return lambda: _self_instance.parent.put_power_channel(channel, value)
return put_power_function
...
3. Define the put_power_function
The last step is to define what happens when the put_power_function
is called. This is done in the device class. For us that is Test_Dynamic
:
...
def put_power_channel(self, channel, value):
return f"Power of channel {channel} is set to {value}"
The Final ophyd.py
File
The final ophyd.py
file looks like this
from ophyd import Component as Cpt
from nomad_camels.bluesky_handling.custom_function_signal import (
Custom_Function_Signal,
Custom_Function_SignalRO,
)
from ophyd import Device
def make_ophyd_instance(
prefix="",
*args,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
# These are the arguments you want to pass to the ophyd class
# These are the settings you defined in the .py file
# We will pass the number of channels we selected in the drop down and are defined in the .py file
channel_numbers="",
**kwargs,
):
ophyd_class = make_ophyd_class(channel_numbers)
return ophyd_class(
prefix,
*args,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
# These are the arguments you want to pass to the ophyd class
# These are the settings you defined in the .py file
# We will pass the number of channels we selected in the drop down and are defined in the .py file
channel_numbers=channel_numbers,
**kwargs,
)
def make_ophyd_class(channel_number):
def read_function_generator(channel):
def read_function(_self_instance):
"""
This function returns a lambda function that reads the power of the specified channel.
the read_function is added to the signal as a read_function.
The _self_instance will later be resolved to the parent of the instance of the
Ibeam_smart class that the signal belongs to.
Parameters:
_self_instance (object): The parent instance.
Returns:
function: A lambda function that reads the power channel.
"""
return lambda: _self_instance.parent.read_power_channel(channel)
return read_function
def put_function_generator(channel):
def put_power_function(_self_instance, value):
"""
This function returns a lambda function that sets the power of the specified channel.
the put_function is added to the signal as a put_function.
The _self_instance will later be resolved to the parent of the instance of the
Ibeam_smart class that the signal belongs to.
Parameters:
_self_instance (object): The parent instance.
value (float): The power to set the channel to.
Returns:
function: A lambda function that sets the power channel.
"""
# It is important to pass the value to the lambda function!
return lambda: _self_instance.parent.put_power_channel(channel, value)
return put_power_function
signal_dictionary = {}
for channel in range(1, int(channel_number) + 1):
# For each channel add read_power function
signal_dictionary[f"read_power_channel_{channel}"] = Cpt(
Custom_Function_SignalRO,
name=f"read_power_channel_{channel}",
metadata={"units": "", "description": f"Read power of channel {channel} which is the square of {channel}"},
read_function=read_function_generator(channel),
)
# For each channel add a set power function
signal_dictionary[f"put_power_channel_{channel}"] = Cpt(
Custom_Function_Signal,
name=f"put_power_channel_{channel}",
metadata={"units": "", "description": f"Sets the power of channel {channel} to the value provided in the GUI."},
put_function=put_function_generator(channel),
)
return type(
f"Test_Dynamic_total_channels_{channel_number}",
(Test_Dynamic,),
{**signal_dictionary},
)
class Test_Dynamic(Device):
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
# These are the settings you defined in the ophyd class
channel_numbers="",
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
self.channel_numbers = channel_numbers
# if name is test then all the code after the return is skipped
# this is useful if you perform actual connections to instruments, that should not run when first defining the instrument in the GUI
if name == "test":
return
# This function is called by the read_function_generator
def read_power_channel(self, channel):
return f"Power of channel {channel} is {channel**2}"
def put_power_channel(self, channel, value):
return f"Power of channel {channel} is set to {value}"