from __future__ import absolute_import, division, print_function
import time
from builtins import * # @UnusedWildImport
from time import sleep
from ctypes import cast, POINTER, c_double, c_ushort, c_ulong
from mcculw import ul
from mcculw.enums import ScanOptions, FunctionType, Status
from mcculw.ul import ULError, a_input_mode
from mcculw.enums import InterfaceType
from mcculw.enums import CounterMode
from mcculw.enums import CounterDebounceMode
from mcculw.enums import CounterDebounceTime
from mcculw.enums import CounterEdgeDetection
from mcculw.enums import CounterTickSize
# When True, run_example() calls ul.ignore_instacal() and looks the board up
# in the USB device inventory at runtime; when False it uses board number 0
# without any discovery step.
use_device_detection = True
def _create_matching_devices(dev_list, product_name):
    """Register every inventory entry whose name equals *product_name*.

    Board numbers are handed out sequentially, starting at 0, in inventory
    order; each match is announced on stdout and registered with
    ``ul.create_daq_device``.

    Args:
        dev_list: Device descriptors returned by
            ``ul.get_daq_device_inventory``.
        product_name: Exact string that ``str(device)`` must equal.

    Returns:
        The list of board numbers that were created (empty when nothing
        matched). The caller is responsible for releasing each of them.
    """
    created = []
    for device in dev_list:
        if str(device) != product_name:
            continue
        board_num = len(created)
        print(f"Found {product_name} board number = {board_num}")
        print(f"Serial number: {device.unique_id}")
        print(f"Product type: {hex(device.product_id)}")
        ul.create_daq_device(board_num, device)
        created.append(board_num)
    return created


def _print_latest_reading(memhandle, scratch, first_point, count,
                          tick_size, distance_per_tick):
    """Copy one half of the scan buffer and print its first sample pair.

    The copy goes into *scratch*, a private array, so the live scan buffer
    behind *memhandle* is never written to by this code.

    Args:
        memhandle: Handle of the 32-bit scan buffer.
        scratch: ``c_ulong`` array with room for at least *count* values.
        first_point: Index of the first sample to copy out of the buffer.
        count: Number of samples to copy.
        tick_size: Seconds represented by one period-counter tick.
        distance_per_tick: Distance represented by one encoder count.
    """
    ul.win_buf_to_array_32(memhandle, scratch, first_point, count)
    # Samples are interleaved by channel: index 0 is the encoder channel,
    # index 1 the period channel (requires `first_point` to fall on a scan
    # boundary, which holds while points_per_channel is even).
    encoder_count = scratch[0]
    period_ticks = scratch[1]
    # A zero period means no complete pulse was measured yet; skip it to
    # avoid dividing by zero.
    if period_ticks > 0:
        # assumes one pulse per revolution
        period = period_ticks * tick_size
        frequency = 1 / period
        rpm = frequency * 60
        distance = encoder_count * distance_per_tick
        print('{:.3f} inches\t\t{:.3f} RPM'.format(distance, rpm))


def _run_counter_scan(board_num):
    """Run the continuous encoder/period counter scan on *board_num*.

    Configures counter 0 as a 32-bit encoder and counter 1 as a 32-bit
    period counter, starts a continuous background scan and prints one
    distance/RPM line each time a half of the buffer has been filled,
    until the scan goes idle or 5000 samples have been transferred.
    ``ULError`` raised by the library is reported on stdout, not propagated.
    """
    rate = 200
    points_per_channel = 40
    low_chan = 0
    high_chan = 1
    num_chans = high_chan - low_chan + 1
    total_count = points_per_channel * num_chans
    half_count = total_count // 2

    # tick_size is the amount of time assigned to a single count
    # (matches CounterTickSize.TICK20PT83ns below).
    tick_size = 0.00000002083
    # distance_per_tick is the distance assigned to a single count
    # (arbitrary value for demonstration).
    distance_per_tick = 0.001

    # CTR32BIT makes the scan return 32-bit counter values; CONTINUOUS and
    # BACKGROUND let the scan keep refilling the buffer while we poll it.
    scan_options = (ScanOptions.CONTINUOUS
                    | ScanOptions.BACKGROUND
                    | ScanOptions.CTR32BIT)

    memhandle = ul.win_buf_alloc_32(total_count)
    if not memhandle:
        print("Failed to allocate memory.")
        return

    # Private destination for win_buf_to_array_32 so that copying a half
    # never overwrites the buffer the hardware is still filling.
    scratch = (c_ulong * half_count)()
    scan_running = False
    try:
        ul.c_config_scan(
            board_num,
            low_chan,
            CounterMode.ENCODER | CounterMode.ENCODER_MODE_BIT_32,
            CounterDebounceTime.DEBOUNCE500ns,
            CounterDebounceMode.TRIGGER_AFTER_STABLE,
            CounterEdgeDetection.RISING_EDGE,
            CounterTickSize.TICK20PT83ns,
            low_chan)
        ul.c_config_scan(
            board_num,
            high_chan,
            CounterMode.PERIOD | CounterMode.PERIOD_MODE_BIT_32,
            CounterDebounceTime.DEBOUNCE500ns,
            CounterDebounceMode.TRIGGER_AFTER_STABLE,
            CounterEdgeDetection.RISING_EDGE,
            CounterTickSize.TICK20PT83ns,
            high_chan)

        # Start reading the inputs
        ul.c_in_scan(
            board_num, low_chan, high_chan, total_count,
            rate, memhandle, scan_options)
        scan_running = True

        # Print one 8-character-wide column header per scanned channel.
        row_format = "{:8}" * num_chans
        labels = ["CH" + str(ch_num) + "\t"
                  for ch_num in range(low_chan, high_chan + 1)]
        print(row_format.format(*labels))

        # Alternate between the two halves of the buffer: a half is read
        # only once the scan has moved on into the other half.
        read_lower = True
        status, curr_count, curr_index = ul.get_status(
            board_num, FunctionType.CTRFUNCTION)
        while status != Status.IDLE and curr_count < 5000:
            # Make sure a data point is available for display.
            if curr_count > 0:
                if curr_index > half_count and read_lower:
                    # Scan is writing the upper half: lower half is complete.
                    _print_latest_reading(
                        memhandle, scratch, 0, half_count,
                        tick_size, distance_per_tick)
                    read_lower = False
                elif curr_index < half_count and not read_lower:
                    # Scan wrapped around: upper half is complete.
                    _print_latest_reading(
                        memhandle, scratch, half_count, half_count,
                        tick_size, distance_per_tick)
                    read_lower = True
            sleep(0.1)
            status, curr_count, curr_index = ul.get_status(
                board_num, FunctionType.CTRFUNCTION)

        # Stop the background operation (this is required even if the
        # scan completes successfully). The function type must match the
        # counter scan that was started above.
        ul.stop_background(board_num, FunctionType.CTRFUNCTION)
        scan_running = False
        print("Scan completed successfully.")
    except ULError as e:
        print("UL error\n", e)
    finally:
        # If the loop was left early (error or Ctrl-C) the scan is still
        # writing into the buffer; stop it before the buffer is freed.
        if scan_running:
            try:
                ul.stop_background(board_num, FunctionType.CTRFUNCTION)
            except ULError as e:
                print("UL error\n", e)
        # Free the buffer in a finally block to prevent errors from causing
        # a memory leak.
        ul.win_buf_free(memhandle)


def run_example():
    """Locate a USB-QUAD08 and display encoder distance and RPM readings.

    When ``use_device_detection`` is true, InstaCal configuration is
    ignored and the USB inventory is searched for a device whose name is
    exactly ``"USB-QUAD08"``; every match is registered and the last one is
    used for the scan. Otherwise board number 0 is used as-is. Every device
    registered here is released again before returning, including when the
    scan fails or the buffer cannot be allocated.
    """
    board_num = 0
    find_device = "USB-QUAD08"
    created_boards = []

    if use_device_detection:
        ul.ignore_instacal()
        dev_list = ul.get_daq_device_inventory(InterfaceType.USB)
        if not dev_list:
            print("No devices detected")
            return
        created_boards = _create_matching_devices(dev_list, find_device)
        if not created_boards:
            print(f"Device {find_device} not found")
            return
        board_num = created_boards[-1]
    # **********End of Discovery************

    try:
        _run_counter_scan(board_num)
    finally:
        for created in created_boards:
            ul.release_daq_device(created)
# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    run_example()