Control Sequences in Python
Write a control sequence for pressurizing a tank.
One of the most powerful tools Synnax offers is the ability to write control sequences in Python. These sequences, whether simple or complex, can be run from the command line. In this guide, we will walk you through writing a control sequence to pressurize and vent a pressure vessel.
Prerequisites
Before writing a control sequence, you should have done the following:
- Downloaded the Python client
- Downloaded and started the Synnax server
- Downloaded and started the Synnax Console
Running a Simulated Data Acquisition System
For the purpose of this guide, we will use a simulated data acquisition system (DAQ) to represent the hardware. You can view the code for this DAQ on Synnax’s GitHub repository. This DAQ simulates a pressure vessel that can be filled and vented using several valves. The DAQ has the following sensors and actuators:
Channel | Description |
---|---|
daq_time | The current timestamp of the DAQ. This is the index channel for the sensors on the DAQ. |
sensor_# | The channels representing pressure sensors on the pressure vessel. |
valve_command_# | The channels used to write data to (open/close) a valve. |
valve_response_# | The channels used to read data from a valve (see if a valve is open or closed). |
valve_command_time_# | The index channels used for commanding a valve. Valves need index channels separate from the sensor index because valves may be commanded at different times. |
We will be using the following sensors and valves in our control sequence:
Channel | Purpose |
---|---|
sensor_0 | The current pressure in the pressure vessel. |
valve_command_0 | The command channel for the valve used to pressurize the pressure vessel. |
valve_command_1 | The command channel for the valve used to vent the pressure vessel. |
Writing the Control Sequence
At the top of your script, make sure to import the Synnax library:
import synnax as sy
The next step is to connect to a Synnax cluster. This can be done by logging in through the CLI or by passing the credentials into your script.
After that, we will want to instantiate the Synnax client so we can interact with the cluster:
client = sy.Synnax()
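If you prefer to pass credentials into the script rather than log in through the CLI, one option is to read them from environment variables. The sketch below only builds the keyword arguments. The SYNNAX_* variable names and the connection_params helper are hypothetical, and the defaults mirror the local, insecure cluster used later in this guide.

```python
import os


def connection_params(env=None):
    """Build keyword arguments for the client from environment variables.

    The SYNNAX_* variable names are assumptions made for this sketch.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("SYNNAX_HOST", "localhost"),
        "port": int(env.get("SYNNAX_PORT", "9090")),
        "username": env.get("SYNNAX_USERNAME", "synnax"),
        "password": env.get("SYNNAX_PASSWORD", "seldon"),
        # Only the literal string "true" (any case) enables a secure connection here.
        "secure": env.get("SYNNAX_SECURE", "false").lower() == "true",
    }


params = connection_params()
# With the Python client installed, the dictionary can be unpacked into the
# constructor: client = sy.Synnax(**params)
```

Keeping the password out of the script this way also makes it easier to share the sequence file without sharing credentials.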
We are also going to create channel names to use in our control sequence. These names were created by the simulated DAQ when the respective channels were created.
PRESS_VALVE = "valve_command_0"
VENT_VALVE = "valve_command_1"
PRESSURE = "sensor_0"
Next, you'll need to open up a Controller. A Controller is used to control hardware by extending a framework for writing data to channels. Controllers have methods such as get, set, and wait_until that make it easy to interact with channels in a control sequence. We recommend using a context manager to ensure that the Controller is properly closed when the script is finished.
with client.control.acquire(
    name="Pressurization Sequence",
    write_authorities=[200],
    write=[PRESS_VALVE, VENT_VALVE],
    read=[PRESSURE],
) as controller:
Write authorities determine which writer is allowed to write to a given channel. Every writer in Synnax has an authority between 0 and 255. If two writers are open on the same channel, the writer with the higher authority will be able to write but the other one will not.
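To make the rule concrete, here is a toy model of authority arbitration in plain Python. It is not how Synnax implements control handoff: the Writer class, the active_writer function, and the "Manual operator" writer with authority 100 are hypothetical, and ties between equal authorities are deliberately not modeled because the rule above does not cover them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Writer:
    name: str
    authority: int  # 0 (lowest) to 255 (highest)

    def __post_init__(self):
        if not 0 <= self.authority <= 255:
            raise ValueError("authority must be between 0 and 255")


def active_writer(writers):
    """Return the writer allowed to write: the one with the highest authority."""
    return max(writers, key=lambda w: w.authority)


sequence = Writer("Pressurization Sequence", 200)
operator = Writer("Manual operator", 100)  # hypothetical second writer
winner = active_writer([sequence, operator])
print(winner.name)  # Pressurization Sequence
```

In this model, the sequence's authority of 200 leaves room both above and below it for other writers.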
Next, let's grab the start timestamp for the control sequence and set a target pressure:
start = sy.TimeStamp.now()
target_pressure = 20 # psi
We should also make sure the vent valve is closed by setting a False value for its channel on the controller:
controller[VENT_VALVE] = False
Finally, we can open the pressurization valve and wait to reach the target pressure before closing the valve. We will use the wait_until method on the controller to accomplish this. The wait_until method waits until the condition passed as its first argument is met or a certain time has elapsed. If we want to stop pressurizing the tank after a certain time has passed even if we haven't reached our target pressure, we can use the timeout argument:
controller[PRESS_VALVE] = True
controller.wait_until(
    lambda c: c[PRESSURE] > target_pressure,
    timeout=20 * sy.TimeSpan.Second,
)
controller[PRESS_VALVE] = False
Lambda functions are short, anonymous functions in Python. This lambda function is equivalent to writing a function like this:
def if_pressure_reached(controller):
    return controller[PRESSURE] > target_pressure
and then calling it in our code like this:
controller.wait_until(if_pressure_reached, timeout = 20 * sy.TimeSpan.Second)
For more information on lambda functions, you can visit our reference page on controllers.
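The equivalence can be checked without a cluster by using a plain dictionary as a stand-in for the controller, since both forms only index it by channel name. The dictionary and its readings are made up for illustration.

```python
PRESSURE = "sensor_0"
target_pressure = 20  # psi


def if_pressure_reached(controller):
    return controller[PRESSURE] > target_pressure


as_lambda = lambda c: c[PRESSURE] > target_pressure

# A dictionary stands in for the controller; the readings are invented.
for reading in (5.0, 20.0, 25.0):
    fake_controller = {PRESSURE: reading}
    assert if_pressure_reached(fake_controller) == as_lambda(fake_controller)
```

The named function is usually easier to reuse and to test, while the lambda keeps a one-off condition next to the call that uses it.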
We can then depressurize the tank by opening the vent valve:
controller[VENT_VALVE] = True
controller.wait_until(
    lambda c: c[PRESSURE] < 5,
    timeout=20 * sy.TimeSpan.Second,
)
controller[VENT_VALVE] = False
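Both waits follow the same pattern: check a condition repeatedly and give up once the timeout elapses. The function below is a simplified stand-in for that pattern, not the Synnax implementation. Its name, the polling interval, and the boolean return value are assumptions of this sketch.

```python
import time


def wait_until_sketch(condition, state, timeout_s, poll_s=0.01):
    """Poll condition(state) until it is true or timeout_s seconds pass.

    Returns True if the condition was met and False if the wait timed out.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition(state):
            return True
        time.sleep(poll_s)
    # One last check so a condition met right at the deadline still counts.
    return condition(state)


readings = {"sensor_0": 25.0}  # invented reading, in psi
reached = wait_until_sketch(lambda c: c["sensor_0"] > 20, readings, timeout_s=0.1)
timed_out = not wait_until_sketch(lambda c: c["sensor_0"] > 30, readings, timeout_s=0.1)
```

Distinguishing "condition met" from "timed out" matters in a sequence like this one, because closing the valve after a timeout does not mean the target pressure was reached.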
After this, we will create a range to represent the test. We can use this range to easily look up the test later:
client.ranges.create(
    name="Pressurization Test",
    time_range=sy.TimeRange(start=start, end=sy.TimeStamp.now()),
)
This range is now saved and can be referenced later to view this test.
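Conceptually, a range is a name attached to a start and an end time. The sketch below models that idea with plain integers holding nanoseconds since the Unix epoch. NamedRange and its contains method are hypothetical and are not part of the Synnax client, and treating the end as exclusive is a choice made for this sketch.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class NamedRange:
    name: str
    start_ns: int
    end_ns: int

    def contains(self, timestamp_ns):
        """True if the timestamp is inside the range (start inclusive, end exclusive)."""
        return self.start_ns <= timestamp_ns < self.end_ns


start_ns = time.time_ns()       # captured before the sequence runs
end_ns = start_ns + 45 * 10**9  # pretend the test took 45 seconds
test = NamedRange("Pressurization Test", start_ns, end_ns)

midpoint = start_ns + 20 * 10**9
print(test.contains(midpoint))  # True
print(test.contains(end_ns))    # False
```

This is why the start timestamp is captured at the beginning of the sequence: the range can only bracket the test if both endpoints are recorded.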
The full file can be viewed below:
control_sequence.py
import synnax as sy
import time

# Connect to the local cluster with explicit credentials.
client = sy.Synnax(
    host="localhost",
    port=9090,
    username="synnax",
    password="seldon",
    secure=False,
)

PRESS_VALVE = "valve_command_0"
VENT_VALVE = "valve_command_1"
PRESSURE = "sensor_0"

with client.control.acquire(
    name="Pressurization Sequence",
    write_authorities=[200],
    write=[PRESS_VALVE, VENT_VALVE],
    read=[PRESSURE],
) as controller:
    start = sy.TimeStamp.now()
    target_pressure = 20 # psi

    # Make sure the vent valve is closed, then pressurize.
    controller[VENT_VALVE] = False
    controller[PRESS_VALVE] = True
    controller.wait_until(
        lambda c: c[PRESSURE] > target_pressure,
        timeout=20 * sy.TimeSpan.Second,
    )
    controller[PRESS_VALVE] = False

    # Hold for 3 seconds before venting.
    time.sleep(3)

    # Vent the tank.
    controller[VENT_VALVE] = True
    controller.wait_until(
        lambda c: c[PRESSURE] < 5,
        timeout=20 * sy.TimeSpan.Second,
    )
    controller[VENT_VALVE] = False

    # Save a range covering the whole test.
    client.ranges.create(
        name="Pressurization Test",
        time_range=sy.TimeRange(start=start, end=sy.TimeStamp.now()),
    )
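Before pointing a sequence at the simulated DAQ or at real hardware, it can help to dry-run the logic against a stand-in. The sketch below is entirely hypothetical: FakeController imitates only the indexing, wait_until, and context-manager behavior used in this guide, its timeout_s parameter is a plain number of simulated seconds rather than the timeout argument shown above, and the tank model with fixed fill and vent rates is invented. It uses simulated time, so it finishes immediately.

```python
PRESS_VALVE = "valve_command_0"
VENT_VALVE = "valve_command_1"
PRESSURE = "sensor_0"


class FakeController:
    """Offline stand-in for a controller wired to a toy tank model."""

    STEP_S = 0.1           # simulated seconds per step
    FILL_PSI_PER_S = 5.0   # invented fill rate
    VENT_PSI_PER_S = 10.0  # invented vent rate

    def __init__(self):
        self.state = {PRESS_VALVE: False, VENT_VALVE: False, PRESSURE: 0.0}
        self.released = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Close both valves and release control, even if the sequence raised.
        self.state[PRESS_VALVE] = False
        self.state[VENT_VALVE] = False
        self.released = True
        return False

    def __getitem__(self, channel):
        return self.state[channel]

    def __setitem__(self, channel, value):
        self.state[channel] = value

    def _step(self):
        # Advance the toy tank model by one simulated step.
        pressure = self.state[PRESSURE]
        if self.state[PRESS_VALVE]:
            pressure += self.FILL_PSI_PER_S * self.STEP_S
        if self.state[VENT_VALVE]:
            pressure -= self.VENT_PSI_PER_S * self.STEP_S
        self.state[PRESSURE] = max(pressure, 0.0)

    def wait_until(self, condition, timeout_s):
        """Advance simulated time until condition(self) holds or the timeout passes."""
        elapsed = 0.0
        while elapsed < timeout_s:
            if condition(self):
                return True
            self._step()
            elapsed += self.STEP_S
        return condition(self)


with FakeController() as controller:
    controller[VENT_VALVE] = False
    controller[PRESS_VALVE] = True
    pressurized = controller.wait_until(lambda c: c[PRESSURE] > 20, timeout_s=20)
    controller[PRESS_VALVE] = False
    peak = controller[PRESSURE]

    controller[VENT_VALVE] = True
    vented = controller.wait_until(lambda c: c[PRESSURE] < 5, timeout_s=20)
    controller[VENT_VALVE] = False
    final = controller[PRESSURE]
```

The __exit__ method also illustrates why a context manager is recommended: the cleanup runs whether the body finishes normally or raises.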
Running the Control Sequence
The first step in running the control sequence is to start a local Synnax cluster. After installing the server, run the following command:
synnax start --insecure --memory
Next, you will want to run the simulated DAQ. You can do this by downloading the Python script from the Synnax GitHub repository. After downloading the script, run the following command:
python simulated_daq.py
Python can be tricky to get installed and running properly. If you are having trouble, please visit our Python troubleshooting guide.
Now it's time to open the Synnax Console and set up two visualizations of the tank. After opening the Console, you can connect to the cluster using the same connection parameters as in the Python script.
First, we will set up a schematic of our tank and valves. Make sure the schematic has the following components:
Symbol | Channels | Purpose |
---|---|---|
Value | Input Channel: sensor_0 | The current pressure in the tank. |
Valve | State Channel: valve_response_0 Command Channel: valve_command_0 | The valve used to pressurize the tank. |
Valve | State Channel: valve_response_1 Command Channel: valve_command_1 | The valve used to vent the tank. |
Once the schematic is set up, you can enter control mode to manually control the valves.
Next, set up a line plot to view the pressure in the tank.
Once you have finished setting up the line plot, your Console should show both the schematic and the line plot.
Now, it’s time to run your control sequence. You can do this by running the Python script from earlier:
python control_sequence.py
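On the Console, you should see the pressurization valve open and the pressure on sensor_0 rise toward the 20 psi target, followed by the vent valve opening and the pressure dropping below 5 psi.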
Conclusion
If you want to look at more examples of control sequences, visit our GitHub repository. There, you can also find more information about the functions used in our Python client.