Measurement Computing   Easy to Use | Easy to Integrate | Easy to Support catalog banner

Python for Windows example collects thermocouple data at 100 Hz...

Expand / Collapse
 

Python for Windows example collects thermocouple data at 100 Hz


The code listing below uses the daq_in_scan function to read thermocouple inputs (module attached at the bottom of this article). This function is typically used to simultaneously read the device subsystems (analog, digital) on devices such as the USB-1616HS and USB-2523. However, if you need to scan multiple thermocouples at a faster rate, it must also be used so as to pair up the channel with its CJC sensor. In this example, we use it to read eight thermocouple inputs (and the associated CJC channels) at 100 samples per second. The binary data returned in the buffer contains both CJC and channel data. It is arranged as CJC, Channel0, CJC Channel1, etc. Care must be taken to position the correct CJC before each channel. The GetTCValues function is used to convert the CJC and channel data to a temperature value. The key to using the function is to enter the position of the first thermocouple/CJC and the number of samples to convert. 

Attached below is the 32-bit code file. To run the example, you must have InstaCal and mcculw Python for Windows installed. Use the attached module as it not recommended to copy/paste Python code due to tab indentations issues.

Disclaimer:
The attached Code or Example is provided As Is.  It has not been tested or validated as a product, for use in a deployed application or system, or for use in hazardous environments.  You assume all risks for use of the Code or Example.

"""
File:                       daq_in_scan_usb_1800.py

Library Call Demonstrated:  mcculw.ul.daq_in_scan()

Purpose:                    Synchronously scans 8 thermocouple channels

Demonstration:              how to covert raw data to temperature

Other Library Calls:        mcculw.ul.daq_in_scan()
                            mcculw.ul.get_tc_values()
                            mcculw.ul.win_buf_free()
                            mcculw.ul.release_daq_device()

Special Requirements:       This examples filters on the USB-1808 Series.
"""
from __future__ import absolute_import, division, print_function
from builtins import *  # @UnusedWildImport
from time import sleep
# 1616HS and 2500 series do not have ScaleData support
# from ctypes import cast, POINTER, c_double
from ctypes import cast, POINTER, c_ushort, c_float

from mcculw import ul
from mcculw.enums import ScanOptions, ChannelType, ULRange, TempScale, DigitalPortType
from mcculw.device_info import DaqDeviceInfo
from mcculw.ul import ULError
from mcculw.enums import FunctionType, Status


try:
    from console_examples_util import config_first_detected_device
except ImportError:
    from .console_examples_util import config_first_detected_device


def run_example():

    use_device_detection = False
    board_num = 0
    # Supported PIDs for the USB-1808 Series
    rate = 100
    points_per_channel = 64
    memhandle = None

    try:

        # use board zero from InstaCal
        daq_dev_info = DaqDeviceInfo(board_num)
        print('\nActive DAQ device: ', daq_dev_info.product_name, ' (',
              daq_dev_info.unique_id, ')\n', sep='')

        scan_options = ScanOptions.FOREGROUND

        # Create the daq_in_scan channel configuration lists
        chan_list = []
        chan_type_list = []
        gain_list = []

        # Analog channels must be first in the list
        chan_list.append(0)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(0)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(1)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(1)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(1)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(2)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(2)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(3)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(3)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(4)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(4)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(5)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(4)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(6)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(5)
        chan_type_list.append(ChannelType.CJC)
        gain_list.append(ULRange.BIP10VOLTS)

        chan_list.append(7)
        chan_type_list.append(ChannelType.TC)
        gain_list.append(ULRange.BIP10VOLTS)
        num_chans = len(chan_list)

        total_count = num_chans * points_per_channel
        half_count = total_count / 2

        # Allocate memory for the scan and cast it to a ctypes array pointer
        memhandle = ul.scaled_win_buf_alloc(total_count)

        # 1616HS and 2500 series do not have ScaleData support
        # ctypes_array = cast(memhandle, POINTER(c_double))
        ctypes_array = cast(memhandle, POINTER(c_ushort))

        # Check if the buffer was successfully allocated
        if not memhandle:
            raise Exception('Error: Failed to allocate memory')

        scan_options = ScanOptions.CONTINUOUS | ScanOptions.BACKGROUND

        # Start the scan
        actRate, preCount, postCount = ul.daq_in_scan(
            board_num, chan_list, chan_type_list, gain_list, num_chans,
            rate, 0, total_count, memhandle, scan_options)

        status, curr_count, curr_index = ul.get_status(
            board_num, FunctionType.DAQIFUNCTION)

        # boolean flag used to toggle reading upper and lower buffer
        read_lower = True

        # acquire 640 samples
        acquire = points_per_channel * num_chans * 10
        while status != Status.IDLE:

            status, curr_count, curr_index = ul.get_status(
                board_num, FunctionType.DAQIFUNCTION)

            if (curr_index > half_count) and (read_lower == True):
                # use if you also have voltage or digital inputs
                ul.win_buf_to_array(memhandle, ctypes_array, 0, int(half_count))

                # Points per channel is 64 so there are 32 to convert which starts at the zero
                e, temps = ul.get_tc_values(
                    board_num, chan_list, chan_type_list,
                    num_chans, memhandle, 0, int(points_per_channel/2), TempScale.CELSIUS)

                for j in range(256):
                    if j % 8 == 0:
                        print()
                    print('{:2.2f}\t'.format(temps[j]), end ='')

                read_lower = False

            elif (curr_index < half_count) and (read_lower == False):
                # use if you also have voltage or digital inputs
                ul.win_buf_to_array(memhandle, ctypes_array, int(half_count), int(half_count))

                # Points per channel is 64 so there are 32 to convert which starts at the halfway point
                e, temps = ul.get_tc_values(
                    board_num, chan_list, chan_type_list,
                    num_chans, memhandle, int(points_per_channel/2), int(points_per_channel/2), TempScale.CELSIUS)

                for j in range(256):
                    if j % 8 == 0:
                        print()
                    print('{:2.2f}\t'.format(temps[j]), end='')

                read_lower = False

            if curr_count > acquire:
                print('\nAcquired number of scans = {:d}'.format(int(curr_count / num_chans)))
                break

        ul.stop_background(board_num, FunctionType.DAQIFUNCTION)
        print("\nscan completed successfully.")
    except ULError as e:
        print('\n', e)
    finally:
        if memhandle:
            # Free the buffer in a finally block to prevent a memory leak.
            ul.win_buf_free(memhandle)
        if use_device_detection:
            ul.release_daq_device(board_num)


if __name__ == '__main__':
    run_example()


Rate this Article:

Attachments


daq_in_scan_tc_get_values.py daq_in_scan_tc_get_values.py (7.20 KB, 14 views)

Add Your Comments


For comments email [email protected].

Details
Article ID: 50863

Last Modified:8/31/2022 8:29:10 AM

Article has been viewed 152 times.

Options