"""
File: daq_in_scan_usb_1800.py
Library Call Demonstrated: mcculw.ul.daq_in_scan()
Purpose: Synchronously scans 8 thermocouple channels
Demonstration: how to convert raw data to temperature
Other Library Calls: mcculw.ul.daq_in_scan()
mcculw.ul.get_tc_values()
mcculw.ul.win_buf_free()
mcculw.ul.release_daq_device()
Special Requirements: This example filters on the USB-1808 Series.
"""
from __future__ import absolute_import, division, print_function
from builtins import * # @UnusedWildImport
from time import sleep
# 1616HS and 2500 series do not have ScaleData support
# from ctypes import cast, POINTER, c_double
from ctypes import cast, POINTER, c_ushort, c_float
from mcculw import ul
from mcculw.enums import ScanOptions, ChannelType, ULRange, TempScale, DigitalPortType
from mcculw.device_info import DaqDeviceInfo
from mcculw.ul import ULError
from mcculw.enums import FunctionType, Status
try:
from console_examples_util import config_first_detected_device
except ImportError:
from .console_examples_util import config_first_detected_device
# (CJC channel, TC channel) pairs in scan order. Every TC entry is immediately
# preceded by the CJC entry listed with it; some CJC channels appear twice.
# NOTE(review): the pairing presumably mirrors the device's CJC-sensor-to-TC
# input assignment -- confirm against the hardware user's guide before editing.
_CJC_TC_PAIRS = (
    (0, 0),
    (1, 1),
    (1, 2),
    (2, 3),
    (3, 4),
    (4, 5),
    (4, 6),
    (5, 7),
)


def _build_scan_lists(cjc_tc_pairs):
    """Expand (CJC, TC) channel pairs into the three daq_in_scan lists.

    Returns:
        A ``(chan_list, chan_type_list, gain_list)`` tuple of equal-length
        lists, with each pair contributing a CJC entry followed by a TC entry.
    """
    chan_list = []
    chan_type_list = []
    gain_list = []
    for cjc_chan, tc_chan in cjc_tc_pairs:
        for chan, chan_type in ((cjc_chan, ChannelType.CJC),
                                (tc_chan, ChannelType.TC)):
            chan_list.append(chan)
            chan_type_list.append(chan_type)
            gain_list.append(ULRange.BIP10VOLTS)
    return chan_list, chan_type_list, gain_list


def _convert_and_print_half(board_num, chan_list, chan_type_list, memhandle,
                            raw_snapshot, first_scan, scan_count):
    """Convert one half of the scan buffer to Celsius and print it.

    Args:
        board_num: Board number the scan is running on.
        chan_list: Channel numbers, in scan order.
        chan_type_list: Channel types matching ``chan_list``.
        memhandle: Handle of the buffer the scan writes into.
        raw_snapshot: Separate ctypes array that receives a copy of the raw
            samples of this half (at least ``scan_count * len(chan_list)``
            elements).
        first_scan: Index of the first scan of the half to convert.
        scan_count: Number of scans in the half.
    """
    num_chans = len(chan_list)
    tc_per_scan = chan_type_list.count(ChannelType.TC)

    # use if you also have voltage or digital inputs
    # The destination must be separate storage: a pointer cast from memhandle
    # would alias the scan buffer, so the "copy" would overwrite live data.
    ul.win_buf_to_array(memhandle, raw_snapshot,
                        first_scan * num_chans, scan_count * num_chans)

    _err_code, temps = ul.get_tc_values(
        board_num, chan_list, chan_type_list, num_chans, memhandle,
        first_scan, scan_count, TempScale.CELSIUS)

    # One printed row per scan: a newline, then each TC value tab-separated.
    for j in range(scan_count * tc_per_scan):
        if j % tc_per_scan == 0:
            print()
        print('{:2.2f}\t'.format(temps[j]), end='')


def run_example():
    """Scan eight CJC/TC channel pairs in the background and print temperatures.

    Uses board 0 as configured in InstaCal. A continuous background scan
    fills a buffer of ``points_per_channel`` scans; each time the driver's
    current index crosses into the other half, the half just completed is
    converted to degrees Celsius and printed, one scan per row. The loop ends
    once more than ten buffers' worth of samples have been counted, or when
    the scan status becomes idle.

    ULError is caught and printed. The background scan is stopped and the
    buffer freed on every exit path.
    """
    use_device_detection = False
    board_num = 0
    rate = 100
    points_per_channel = 64
    memhandle = None
    scan_running = False

    try:
        # use board zero from InstaCal
        daq_dev_info = DaqDeviceInfo(board_num)
        print('\nActive DAQ device: ', daq_dev_info.product_name, ' (',
              daq_dev_info.unique_id, ')\n', sep='')

        # Create the daq_in_scan channel configuration lists
        chan_list, chan_type_list, gain_list = _build_scan_lists(_CJC_TC_PAIRS)
        num_chans = len(chan_list)

        total_count = num_chans * points_per_channel
        # Integer math keeps the half boundary on a whole-scan boundary and
        # avoids float indices under "from __future__ import division".
        scans_per_half = points_per_channel // 2
        half_count = scans_per_half * num_chans

        # Allocate memory for the scan.
        # NOTE(review): SCALEDATA is not requested, so the buffer presumably
        # holds raw counts and this scaled allocation looks larger than
        # needed -- confirm the device's sample width before changing the
        # allocator.
        memhandle = ul.scaled_win_buf_alloc(total_count)
        # Check if the buffer was successfully allocated
        if not memhandle:
            raise Exception('Error: Failed to allocate memory')

        # Private storage for raw copies of one buffer half.
        raw_snapshot = (c_ushort * half_count)()

        scan_options = ScanOptions.CONTINUOUS | ScanOptions.BACKGROUND
        # Start the scan
        ul.daq_in_scan(
            board_num, chan_list, chan_type_list, gain_list, num_chans,
            rate, 0, total_count, memhandle, scan_options)
        scan_running = True

        status, curr_count, curr_index = ul.get_status(
            board_num, FunctionType.DAQIFUNCTION)

        # boolean flag used to toggle reading upper and lower buffer
        read_lower = True
        # Stop after ten buffers' worth of samples
        # (points_per_channel * 10 scans of num_chans samples each).
        acquire = points_per_channel * num_chans * 10

        while status != Status.IDLE:
            status, curr_count, curr_index = ul.get_status(
                board_num, FunctionType.DAQIFUNCTION)

            if curr_index > half_count and read_lower:
                # The driver is writing the upper half; the lower half is
                # complete.
                _convert_and_print_half(
                    board_num, chan_list, chan_type_list, memhandle,
                    raw_snapshot, 0, scans_per_half)
                read_lower = False
            elif curr_index < half_count and not read_lower:
                # The driver wrapped to the lower half; the upper half is
                # complete. Flip the flag back so the halves alternate.
                _convert_and_print_half(
                    board_num, chan_list, chan_type_list, memhandle,
                    raw_snapshot, scans_per_half, scans_per_half)
                read_lower = True

            if curr_count > acquire:
                print('\nAcquired number of scans = {:d}'.format(
                    curr_count // num_chans))
                break

        # Clear the flag first so a failure here is reported once by the
        # except clause and not retried in the finally block.
        scan_running = False
        ul.stop_background(board_num, FunctionType.DAQIFUNCTION)
        print("\nscan completed successfully.")
    except ULError as e:
        print('\n', e)
    finally:
        if scan_running:
            # An error or interrupt left the scan active; stop it before the
            # buffer it writes into is released.
            try:
                ul.stop_background(board_num, FunctionType.DAQIFUNCTION)
            except ULError as stop_error:
                print('\n', stop_error)
        if memhandle:
            # Free the buffer in a finally block to prevent a memory leak.
            ul.win_buf_free(memhandle)
        if use_device_detection:
            ul.release_daq_device(board_num)
# Run the example only when this file is executed as a script, not on import.
if __name__ == '__main__':
    run_example()