# -*- coding: UTF-8 -*-
# Wrapper for Test-Equipment-Plus's "SignalHound" series of USB spectrum analysers.
#
# Written By Connor Wolf <wolf@imaginaryindustries.com>
#
# * ----------------------------------------------------------------------------
# * "THE BEER-WARE LICENSE":
# * Connor Wolf <wolf@imaginaryindustries.com> wrote this file. As long as you retain
# * this notice you can do whatever you want with this stuff. If we meet some day,
# * and you think this stuff is worth it, you can buy me a beer in return.
# * (Only I don't drink, so a soda will do). Connor
# * Also, support the Signal-Hound devs. Their hardware is pretty damn awesome.
# * ----------------------------------------------------------------------------
#
# TODO: Respin lots of the messy if: elif: else: statements into a dictionary lookup
import ctypes as ct
import ctypes.util as ctu
import bb_api_h as hf
import sys
if sys.platform == "win32":
from ctypes import wintypes as wt
else:
raise ValueError("Only windows is currently supported by the SignalHound hardware.")
import logging
import numpy as np
from numpy.core.multiarray import int_asbuffer
import os.path
[docs]class SignalHound(object):
#: C Call return value -> integer return code mapping
bbStatus = {
"bbInvalidModeErr" : -112,
"bbReferenceLevelErr" : -111,
"bbInvalidVideoUnitsErr" : -110,
"bbInvalidWindowErr" : -109,
"bbInvalidBandwidthTypeErr" : -108,
"bbInvalidSweepTimeErr" : -107,
"bbBandwidthErr" : -106,
"bbInvalidGainErr" : -105,
"bbAttenuationErr" : -104,
"bbFrequencyRangeErr" : -103,
"bbInvalidSpanErr" : -102,
"bbInvalidScaleErr" : -101,
"bbInvalidDetectorErr" : -100,
# General Errors
"bbDeviceConnectionErr" : -14,
"bbPacketFramingErr" : -13,
"bbGPSErr" : -12,
"bbGainNotSetErr" : -11,
"bbDeviceNotIdleErr" : -10,
"bbDeviceInvalidErr" : -9,
"bbBufferTooSmallErr" : -8,
"bbNullPtrErr" : -7,
"bbAllocationLimitErr" : -6,
"bbDeviceAlreadyStreamingErr" : -5,
"bbInvalidParameterErr" : -4,
"bbDeviceNotConfiguredErr" : -3,
"bbDeviceNotStreamingErr" : -2,
"bbDeviceNotOpenErr" : -1,
# No Error
"bbNoError" : 0,
# Warnings/Messages
"bbAdjustedParameter" : 1,
"bbADCOverflow" : 2,
"bbNoTriggerFound" : 3
}
#: C Array size for raw sweep requests
_rawDataArrSize = 299008
#: Raw sweep trigger C array size
_rawSweepTriggerArraySize = 68
__devType = None
def __init__(self):
self.log = logging.getLogger("Main.DeviceInt")
self.devOpen = False
if sys.platform == "win32":
self.log.info("Opening DLL")
libPath = ctu.find_library("bb_api.dll")
if not libPath:
if os.path.exists("bb_api.dll"): # This is a messy hack, but it makes imports work with my scripts. I should
# Really put the signal hound DLL on my $PATH, but whatever
libPath = "bb_api.dll"
elif os.path.exists("../bb_api.dll"):
libPath = "../bb_api.dll"
# So, apparently despite the fact that the setup.py script drops the signal hound
# dll in the python/DLLs directory, and the fact that that directory is in sys.path,
# find_library somehow doesn't find it anyways.
# As such, manually check the DLLs directory for the signalHound dll
elif os.path.exists(os.path.join(sys.exec_prefix, "DLLs", "bb_api.dll")):
libPath = os.path.join(sys.exec_prefix, "DLLs", "bb_api.dll")
else:
self.log.error("Could not locate signal hound DLL.")
raise EnvironmentError("Required DLL not available on system PATH")
self.log.info("Found dll located at %s", libPath)
self.dll = ct.CDLL (libPath)
# This is horrible ctypes DLL hackery
# You need to access the internal DLL handle to properly force windows to close the dll handle, which
# is the only way to COMPLETELY close the device interface.
# It's needed if you ever want to completely close the device, to re-initialize the device interface.
# ctypes doesn't make manually deallocating a dll easy.
self.dllHandle = wt.HMODULE(self.dll._handle)
elif sys.platform == "linux" or sys.platform == "linux2":
self.log.error("Linux Not supported for API Verson 2.x!")
raise NotImplementedError("Linux Not supported for API Verson 2.x!")
self.cRawSweepCallbackFunc = None
self.openDevice()
self.acq_conf = {}
self.sequentialADCErrors = 0
def __del__(self):
self.log.info("Deleting SignalHound Interface Class")
self.forceClose()
[docs] def forceClose(self):
self.log.info("Force Closing.")
if self.devOpen:
self.closeDevice()
if self.cRawSweepCallbackFunc:
del(self.cRawSweepCallbackFunc)
# Note: This is *probably* not needed. I was having some issues with dangling handles when doing
# multople-process data-logging, and it was part of the debugging effort from that.
# At this point, it's probably uneeded (it wasn't the root of the issue), but it's
# harmless, and I figure it's better to explicitly clean-up the DLL handle then
# rely on it happening automatically
self.log.info("Forcing DLL handle closed")
if sys.platform == "win32":
try:
ct.windll.kernel32.FreeLibrary(self.dllHandle)
except ct.ArgumentError as e:
self.log.warning("Argument error in forcing DLL closed")
self.log.warning("%s", e)
[docs] def openDevice(self):
self.log.info("Opening Device")
self.deviceHandle = ct.c_int(0)
deviceHandlePnt = ct.pointer(self.deviceHandle)
ret = self.dll.bbOpenDevice(deviceHandlePnt)
if ret != hf.bbNoError:
if ret == hf.bbNullPtrErr:
raise ValueError("Could not open device due to null-pointer error!")
elif ret == hf.bbDeviceNotOpenErr:
raise ValueError("Could not open device!")
else:
raise ValueError("Could not open device due to unknown reason!")
self.devOpen = True
self._devType = self.getDeviceType()
self.log.info("Opened Device with handle num: %s", self.deviceHandle.value)
[docs] def closeDevice(self):
self.log.info("Closing Device with handle num: %s", self.deviceHandle.value)
try:
self.dll.bbAbort(self.deviceHandle)
self.log.info("Running acquistion aborted.")
except Exception as e:
self.log.info("Could not abort acquisition: %s", e)
ret = self.dll.bbCloseDevice(self.deviceHandle)
if ret != hf.bbNoError:
raise ValueError("Error closing device!")
self.log.info("Closed Device with handle num: %s", self.deviceHandle.value)
self.devOpen = False
[docs] def queryDeviceDiagnostics(self):
raise DeprecationWarning("This function is no longer supported in the 2.0 SignalHound API")
[docs] def getDeviceDiagnostics(self):
'''
Query signal-hound's physical state and hardware status.
Args:
No Args
Returns:
dictionary containing current temperature, USB Voltage, and current:
| {
| "temperature": <Internal temperature of the SignalHound in Degrees Celcius.>,
| "voltageUSB": <USB operating voltage, in volts. Acceptable ranges are 4.40 to 5.25 V.>,
| "currentUSB": <USB current draw, in mA. Acceptable ranges are 800 - 1000 mA>,
| }
The device temperature is updated in the API after each sweep is retrieved. The temperature is returned
in Celsius and has a resolution of 1/8 th of a degree. A temperature above 70 ° C or below 0 ° C indicates
your device is operating outside of its normal operating temperature, and may cause readings to be out
of spec, and may damage the device.
A USB voltage of below 4.4V may cause readings to be out of spec. Check your cable for damage and
USB connectors for damage or oxidation.
Will raise ``EnvironmentError`` for temperatures or voltages outside the allowable range.
Raw call ``BB_API bbStatus bbGetDeviceDiagnostics(int device, float *temperature, float *voltage1_8, float *voltage1_2, float *voltageUSB, float *currentUSB);``
'''
# self.log.info("Querying device diagnostics.")
temperature = ct.c_float(0)
voltageUSB = ct.c_float(0)
currentUSB = ct.c_float(0)
temperaturePnt = ct.pointer(temperature)
voltageUSBPnt = ct.pointer(voltageUSB)
currentUSBPnt = ct.pointer(currentUSB)
err = self.dll.bbGetDeviceDiagnostics(self.deviceHandle, temperaturePnt, voltageUSBPnt, currentUSBPnt)
if err == self.bbStatus["bbNoError"]:
pass
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
else:
raise IOError("Unknown error!")
ret = {
"temperature" : temperature.value,
"voltageUSB" : voltageUSB.value,
"currentUSB" : currentUSB.value
}
if ret["currentUSB"] < 4.4:
raise EnvironmentError("USB Supply voltage below specified minimum of 4.4V. Please check hardware. Read supply voltage = %f" % ret["currentUSB"])
if ret["temperature"] > 70 or ret["temperature"] < 0:
raise EnvironmentError("Hardware temperature outside of normal operating bounds.")
# self.log.info("Diagnostics queried. Values = \n%s", "\n".join([" {key}, {value}".format(key=key, value=value) for key, value in ret.iteritems()]))
return ret
[docs] def queryStreamInfo(self):
'''
Args:
No Args
Returns:
dictionary containing status information on the IQ data stream:
| {
| "return_len": <The number of IQ samples pairs which will be returned by calling ``bbFetchRaw()``.>,
| "samples_per_sec": <The number of IQ pairs to expect per second.>,
| "bandwidth": <The bandpass filter bandwidth, width in Hz. Width is specified by the 3dB rolloff points.>,
| }
Use this function to characterize the IQ data stream.
Will raise ``IOError`` If the device is not open, not streaming, or if an unknown error is encountered..
Raw call ``BB_API bbStatus bbQueryStreamInfo(int device, int *return_len, double *bandwidth, int *samples_per_sec);``
'''
return_len = ct.c_int(0)
bandwidth = ct.c_double(0)
samples_per_sec = ct.c_int(0)
return_lenPnt = ct.pointer(return_len)
bandwidthPnt = ct.pointer(bandwidth)
samples_per_secPnt = ct.pointer(samples_per_sec)
err = self.dll.bbQueryStreamInfo(self.deviceHandle, return_lenPnt, bandwidthPnt, samples_per_secPnt)
if err == self.bbStatus["bbNoError"]:
pass
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("The device specified is not currently streaming!")
else:
raise IOError("Unknown error!")
# The raw data array returned by fetchRaw when in streaming mode is the value of return_len * 2 (since each value is two floats)
self._rawDataArrSize = return_len.value * 2
values = {
"return_len" : return_len.value,
"samples_per_sec" : bandwidth.value,
"bandwidth" : samples_per_sec.value
}
return values
[docs] def initiate(self, mode, flag, gps_timestamp=False):
'''
Args:
mode: The possible values for mode are BB_SWEEPING, BB_REAL_TIME,
BB_ZERO_SPAN, BB_TIME_GATE, BB_RAW_SWEEP,
BB_RAW_SWEEP_LOOP and BB_AUDIO_DEMOD.
flag: The default value is zero.
If mode equals BB_ZERO_SPAN, flag can be used to denote the type of
modulation performed on the incoming signal. BB_DEMOD_AM and
BB_DEMOD_FM are the two options.
Flag can be used to inform the API to time
stamp data using an external GPS reciever. Mask the bandwidth flag (‘|’
in C) with BB_TIME_STAMP to achieve this. See Appendix:Using a GPS
Receiver to Time-Stamp Data for information on how to set this up.
gps_timestamp (bool): Timestamp using GPS
bbInitiate configures the device into a state determined by the mode parameter. For more information
regarding operating states, refer to the Theory of Operation and Modes of Operation sections. This
function calls bbAbort before attempting to reconfigure. It should be noted, if an error is returned, any
past operating state will no longer be active.
Pay special attention to the bbInvalidParameterErr description below
Raw Call: ``BB_API bbStatus bbInitiate(int device, unsigned int mode, unsigned int flag);``
'''
self.acq_conf["acq_mode"] = mode
self.acq_conf["acq_flag"] = flag
modeOpts = {
"sweeping" : hf.BB_SWEEPING,
"streaming" : hf.BB_STREAMING,
"real-time" : hf.BB_REAL_TIME,
"zero-span" : hf.BB_ZERO_SPAN,
"time-gate" : hf.BB_TIME_GATE,
"raw-sweep" : hf.BB_RAW_SWEEP,
"raw-sweep-loop" : hf.BB_RAW_SWEEP_LOOP,
"audio-demod" : hf.BB_AUDIO_DEMOD
}
zeroSpanOpts = {
"demod-am" : hf.BB_DEMOD_AM,
"demod-fm" : hf.BB_DEMOD_FM
}
if mode in modeOpts:
mode = modeOpts[mode]
else:
raise ValueError("Mode must be one of %s. Passed value was %s." % (modeOpts, mode))
if mode == hf.BB_ZERO_SPAN or mode == hf.BB_TIME_GATE:
raise NotImplementedError("Zero span and time-gate modes are not functional yet in the BB 2.0 API Version. Please contact signalhound for more information.")
if mode == hf.BB_ZERO_SPAN:
if flag in zeroSpanOpts:
flag = zeroSpanOpts[flag]
else:
raise ValueError("Available flag settings for mode \"zero-span\" are \"demod-am\" and \"demod-fm\". Passed value was %s." % flag)
# Checking for raw-pipe mode is messy, since it uses the same configuration value as the streaming mode.
elif self.acq_conf["acq_mode"] == "raw-pipe":
raise ValueError("Raw pipe mode is depreciated, and has been removed.")
else:
flag = 0
if mode == hf.BB_REAL_TIME:
if not "span_freq" in self.acq_conf:
raise ValueError("You must call configureCenterSpan() before initiate()!")
elif (self.acq_conf["span_freq"] > hf.BB60C_MAX_RT_SPAN or
self.acq_conf["span_freq"] < hf.BB_MIN_RT_SPAN ):
if not self._devType:
raise ValueError("Device type not detected? How did this even occur!")
elif self._devType == "BB60C":
if self.acq_conf["span_freq"] > hf.BB60C_MAX_RT_SPAN:
raise ValueError("Real-time mode maximum span frequency is 27 Mhz for the BB60C. Specified span frequency = %f" % self.acq_conf["span_freq"])
elif self._devType == "BB60A":
if self.acq_conf["span_freq"] > hf.BB60A_MAX_RT_SPAN:
raise ValueError("Real-time mode maximum span frequency is 20 Mhz for the BB60A. Specified span frequency = %f" % self.acq_conf["span_freq"])
if not "rbw" in self.acq_conf:
raise ValueError("You must call configureSweepCoupling() before initiate()!")
elif (self.acq_conf["rbw"] > hf.BB_MAX_RT_RBW or
self.acq_conf["rbw"] < hf.BB_MIN_RT_RBW):
raise ValueError("Invalid RBW for Real-time mode. Minimum RBW is %f, maximum RBW is %f. Specified RBW = %f" % (hf.BB_MIN_RT_RBW, hf.BB_MAX_RT_RBW, self.acq_conf["rbw"]))
if gps_timestamp:
self.log.info("Timestamping returned data with GPS time")
flag |= hf.BB_TIME_STAMP
mode = ct.c_uint(mode)
flag = ct.c_uint(flag)
err = self.dll.bbInitiate(self.deviceHandle, mode, flag)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to initiate succeeded.")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbInvalidParameterErr"]:
self.log.error("bbInvalidParameterErr!")
self.log.error('''In real-time mode, this value may be returned if the span limits defined in the API header are broken. Also in real-time mode, this error will be
returned if the resolution bandwidth is outside the limits defined in the API header.''')
self.log.error('''In time-gate analysis mode this error will be returned if span limits defined in the API header are broken. Also in time gate analysis, this
error is returned if the bandwidth provided require more samples for processing than is allowed in the gate length. To fix this, increase rbw/vbw.''')
raise IOError("The value for mode did not match any known value.")
elif err == self.bbStatus["bbAllocationLimitError"]:
self.log.error('''This value is returned in extreme circumstances. The API currently limits the amount of RAM usage to 1GB. When exceptional parameters are
provided, such as very low bandwidths, or long sweep times, this error may be returned. At this point you have reached the boundaries of the
device. The processing algorithms are optimized for speed at the expense of space, which is the reason this can occur.''')
raise IOError("Could not allocate sufficent RAM!")
elif err == self.bbStatus["bbBandwidthErr"]:
raise IOError("RBW is larger than your span. (Sweep Mode)!")
else:
raise IOError("Unknown error setting initiate! Error = %s" % err)
[docs] def fetchTrace(self):
'''
Args:
None
Returns:
dictionary containing the ``min`` and ``max`` arrays with eponymous keys.
Returns a minimum and maximum array of values relating to the current mode of operation. If the
detectorType provided in bbConfigureAcquisition is BB_AVERAGE, the arrays will contain identical
values. Element zero of each array corresponds to the startFreq returned from bbQueryTraceInfo.
Raw Call: ``BB_API bbStatus bbFetchTrace(int device, int arraySize, double *min, double *max);``
'''
try:
arraySize = self.traceLen
except AttributeError:
self.log.error("You must call queryTraceInfo atleast once before fetchTrace")
raise
maxArr = (ct.c_double * arraySize)()
minArr = (ct.c_double * arraySize)()
maxPtr = ct.pointer(maxArr)
minPtr = ct.pointer(minArr)
err = self.dll.bbFetchTrace(self.deviceHandle, arraySize, minPtr, maxPtr)
if err == self.bbStatus["bbNoError"]:
# self.log.info("Call to fetchTrace succeeded.") # Commented out because it was NOISY
self.sequentialADCErrors = 0 # There was no clipping, so reset the clipping integrator
pass
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
elif err == self.bbStatus["bbBufferTooSmallErr"]:
raise IOError("The 'arraySize' parameter passed is less than the trace size returned from 'bbQueryTraceInfo'.")
elif err == self.bbStatus["bbADCOverflow"]:
self.log.warning("Clipping is common on the first acquitition cycle, presumably due to the IF stages settling.")
self.log.warning("This error is only a problem if it occurs more then once and not at the immediate start of an acquisition, or immediately following a recalibration.")
self.sequentialADCErrors += 1
# Only throw an actual error if we've been clipping for a while.
# This way, transients won't break things (as fast, in any event).
if self.sequentialADCErrors > 10:
raise IOError("The ADC has detected clipping of the input signal for more then 10 sequential samples!")
elif err == self.bbStatus["bbNoTriggerFound"]:
raise IOError('''In time-gated analysis, if the spectrum returned is not representative of
the gate specified, this warning is returned.
In zero-span analysis, if the device is configured to anticipate a video or
external trigger, this warning is returned when the trigger condition has
not been met for this trace.''')
elif err == self.bbStatus["bbPacketFramingErr"]:
raise IOError("Data loss or miscommunication has occurred between the device and the API!")
elif err == self.bbStatus["bbDeviceConnectionErr"]:
raise IOError("Device connection issues were present in the acquisition of this sweep!")
else:
raise IOError("Unknown error setting fetchTrace! Error = %s" % err)
maxData = SignalHound.fastDecodeArray(maxArr, arraySize, np.double)
minData = SignalHound.fastDecodeArray(minArr, arraySize, np.double)
ret = {
"max" : maxData,
"min" : minData
}
return ret
[docs] def fetchAudio(self):
'''
Returns:
Numpy array of 4096 32-bit floating point values
If the device is initiated and running in the audio demodulation mode, the function is a blocking call
which returns the next 4096 audio samples. The approximate blocking time for this function is 128 ms if
called again immediately after returning. There is no internal buffering of audio, meaning the audio will
be overwritten if this function is not called in a timely fashion. The audio values are typically -1.0 to 1.0,
representing full-scale audio. In FM mode, the audio values will scale with a change in IF bandwidth.
Raw Call: ``BB_API bbStatus bbFetchAudio(int device, float *audio);``
'''
arraySize = 4096
audioArr = (ct.c_float * arraySize)()
audioArrPtr = ct.pointer(audioArr)
err = self.dll.bbFetchAudio(self.deviceHandle, audioArrPtr)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to fetchAudio succeeded.")
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
elif err == self.bbStatus["bbDeviceConnectionErr"]:
raise IOError("Device connection issues were present in the acquisition of this sweep!")
else:
raise IOError("Unknown error setting fetchAudio! Error = %s" % err)
arr = np.ctypeslib.as_array(audioArr) # Map numpy array onto the same memory location as audioArr
#Note: Copy is *probably* not needed, as the memory location is owned by the python code, rather then the SignalHound API.
return arr.copy() # Copy, and return location.
[docs] def fetchRawCorrections(self):
'''
Returns:
| ``dict`` containing:
| - ``corrections`` 32-bit float array of length 2048. Correction values are decibel.
| - ``index`` Index into the corrections array where the correction data begins.
| - ``startFreq`` Frequency associated with the correction at index.
When this function returns successfully, the correction array will contain the frequency domain
correction constants for the given bandwidth chosen. The corrections are modified based on
temperature, gain, attenuation, and frequency. If any of these change, a new correction array should be
requested. The correction array will only be generated again on a new bbInitiate().
The correction arrays and returned values differ slightly depending on the 7 or 20 MHz bandwidth
chosen. Each one is described in depth below.
The correction array represents 40 MHz of bandwidth where frequencies outside the requested 20 MHz
are zeroed out. The first non-zero sample begins at corrections[index]. The frequency at this index is
startFreq. The bin size of each index is implied through 40 MHz divided by the length of the array,
(40.0e6 / 2048) = 19531.25 Hz. If an Fourier transform is applied on the IF data, the correction values
will line up with the usable 20 MHz bandwdith.
7MHz
The correction array represents 10 Mhz of bandwdith where the usable 7 MHz is centered and all values
outside the usable 7 MHz is zeroed. The index returned is the first non zero sample in the array. The
startFreq returned is the frequency of the first sample in the array, corrections[0]. Every other sample’s
frequency can be determined with the bin size. The bin size for this array is (10.0e6 / 2048) = 4882.8125
Hz. If a complex Fourier Transform is applied to the IQ data, the correction values will line up with the
usable 7 MHz bandwidth.
##Tips
Time domain corrections of the signal’s amplitude require two steps. First, an inverse Fourier Transform
must be performed on the entire correction array (including zero’ed portions). This results in a 4096
sample kernel. Second, the kernel is used in convolution with the time domain data. If a larger/smaller
kernel is desired, interpolate/extrapolate the correction array while it is in the frequency domain to the
desired length. Lengths which are powers of two are suggested.
Frequency domain correction of the signal’s amplitude requires you to first transform the raw data into
the frequency domain. Performing an Fourier transform on the incoming data will yeild a frequency
domain array that will align with the correction array. You can index the Transform results using the
index returned from this function if you wish or apply the whole array. Remember that the corrections
are in dB. If larger Transform sizes are desired, you can interpolate the correction array to the desired
size. (Be aware! This will change the index of the first non-zero correction, but the results of the FFT will
still align the with usable 20 MHz)
Raw Call: ``BB_API bbStatus bbFetchRawCorrections(int device, float *corrections, int *index, double *startFreq);``
'''
arraySize = 2048
corrArr = (ct.c_float * arraySize)()
corrArrPtr = ct.pointer(corrArr)
index = ct.c_int(0)
startFreq = ct.c_double(0)
indexPtr = ct.pointer(index)
startFreqPtr = ct.pointer(startFreq)
err = self.dll.bbFetchRawCorrections(self.deviceHandle, corrArrPtr, indexPtr, startFreqPtr)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to fetchRawCorrections succeeded.")
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
else:
raise IOError("Unknown error setting fetchRawCorrections! Error = %s" % err)
ret = {
"data" : np.ctypeslib.as_array(corrArr),
"index" : index.value,
"startFreq" : startFreq.value
}
return ret
@classmethod
[docs] def getRawSweep_size(cls):
return ct.c_float, cls._rawDataArrSize
@classmethod
[docs] def getRawSweep_s_size(cls):
return ct.c_short, cls._rawDataArrSize
@classmethod
[docs] def getRawSweepTrig_size(cls):
return ct.c_int, cls._rawSweepTriggerArraySize
[docs] def fetchRawSweep(self):
'''
returns:
Numpy array of signed short integers
This function is used to collect a single sweep for a device configured in raw sweep mode. The length of
the buffer provided is determined by the settings used to configure the device for raw sweep mode. This
length can be determined using the equation.
Buffer-Length = 18688 * ppf * steps
If the function returns successfully the array will contain a full sweep. The shorts will
Raw Call: ``BB_API bbStatus bbFetchRawSweep(int device, short *buffer);``
'''
try:
bufLen = 18688 * self.acq_conf["ppf"] * self.acq_conf["steps"]
except AttributeError:
raise ValueError("You must call configureRawSweep before fetchRawSweep")
rawBuf = (ct.c_short * bufLen)()
rawBufPtr = ct.pointer(rawBuf)
err = self.dll.bbFetchRawSweep(self.deviceHandle, rawBufPtr)
if err == self.bbStatus["bbNoError"]:
pass # No print statements here. Too noisy
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
elif err == self.bbStatus["bbADCOverflow"]:
raise IOError("The ADC has detected clipping of the input signal!")
elif err == self.bbStatus["bbPacketFramingErr"]:
raise IOError("Data loss or miscommunication has occurred between the device and the API!")
elif err == self.bbStatus["bbDeviceConnectionErr"]:
raise IOError("Device connection issues were present in the acquisition of this sweep!")
else:
raise IOError("Unknown error setting fetchRawSweep! Error = %s" % err)
data = SignalHound.fastDecodeArray(rawBuf, bufLen, np.short)
return data
[docs] def startRawSweepLoop(self, callbackFunc):
'''
Args:
callbackFunc: Python function. Used as a callback to notify the user of
completed sweeps.
This function can be called after being configured and initiated in RAW_SWEEP_LOOP mode. The device
begins sweeping on the first call to this function after the device has been initiated. It is possible to call
this function multiple times per initiate to change the function call back used.
If this function returns successfully, the device begins sweeping immediately. The function provided is
set as the callback function used when a sweep is completed. sweep_callback is called once per sweep
completion. The function is passed two parameters, a pointer to the buffer of data for the sweep, and the
length of the buffer, both ``ctypes`` variables: ``(bufPtr, bufLen)``.
To properly decode the passed parameters, you should use the ``SignalHound.decodeRawSweep`` staticmethod.
This takes the two ctypes arguments in the order they are passed to the callback function, and returns
a python numpy array.
The data buffer will not be overwritten when in the function body of sweep_callback. The API will
maintain a circular list of buffers to store sweeps in. The API will store up to ¼ to ½ seconds worth of
sweeps depending on parameters. If the function body of sweep_callback exceeds this amount of time,
it is possible for the API to need to move ahead and skip over the buffer the user is still accessing. This
will cause a loss of data. It is recommended the function body of sweep_callback is short, preferably
simply copying the data from buffer into your own data structure. This ensures you receive every sweep
and make your own decisions on when to drop/ignore sweeps.
The sweep_callback function is not called in the main thread of execution. It is called once per sweep,
which can result in the function being called anywhere from 3-250 milliseconds. It is the responsibility of
the user to not index the buffer out of range. The buffer contents can be modified by the user only
during the function body of sweep_callback, once the function returns, the API is free to overwrite the
contents. Modifying the contents of the buffer not in the function body of sweep_callback is undefined.
The user should not attempt to manage any of the memory provided through the buffer pointers.
The device sweeps indefinitely until bbAbort or bbCloseDevice is called. When operation is suspended
via bbAbort, the device must be reconfigured and initiated again before calling this function.
Raw Call: ``BB_API bbStatus bbStartRawSweepLoop(int device, void(*sweep_callback)(short *buffer, int len));``
'''
if not callable(callbackFunc):
raise ValueError("You must pass a callable variable for the callback!")
callBackFactory = ct.WINFUNCTYPE(None, ct.POINTER(ct.c_short), ct.c_int)
self.cRawSweepCallbackFunc = callBackFactory(callbackFunc)
err = self.dll.bbStartRawSweepLoop(self.deviceHandle, self.cRawSweepCallbackFunc)
if err == self.bbStatus["bbNoError"]:
self.log.info("Started raw sweep loop.")
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
else:
raise IOError("Unknown error in startRawSweepLoop!")
[docs] def queryTraceInfo(self):
'''
Returns:
| ``dict`` containing:
| "arr-size": The size of arrays returned by bbFetchTrace.
| "arr-bin-size": The frequency difference between two
| sequential bins in a returned sweep. In Zero-Span mode, binSize refers
| to the difference between sequential samples in seconds.
| "ret-start-freq": The frequency of the first bin in a
| returned sweep. In Zero-Span mode, start represents the exact center
| frequency used by the API.
This function should be called to determine sweep characteristics after a device has been configured
and initiated. For zero-span mode, startFreq and binSize will refer to the time domain values. In zero-
span mode startFreq will always be zero, and binSize will be equal to sweepTime/traceSize.
Note: Calling while in BB_RAW_PIPE mode will produce a bbDeviceNotConfiguredErr
Raw Call: ``BB_API bbStatus bbQueryTraceInfo(int device, unsigned int *traceLen, double *binSize, double *start);``
'''
# self.log.info("Querying device for trace information.")
traceLen = ct.c_uint(0)
traceLenPnt = ct.pointer(traceLen)
binSize = ct.c_double(0)
binSizePnt = ct.pointer(binSize)
start = ct.c_double(0)
startPnt = ct.pointer(start)
err = self.dll.bbQueryTraceInfo(self.deviceHandle, traceLenPnt, binSizePnt, startPnt)
if err == self.bbStatus["bbNoError"]:
# self.log.info("returned queryTraceInfo: %d, %f, %f" % (traceLen.value, binSize.value, start.value))
pass
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured, or in \"raw-pipe\" mode!")
else:
raise IOError("Unknown error in queryTraceInfo!")
self.traceLen = traceLen.value
return {"arr-size" : traceLen.value, "arr-bin-size" : binSize.value, "ret-start-freq" : start.value}
[docs] def queryStreamingCenter(self):
'''
Returns:
A Double containing the absolute center frequency of
the streaming device.
The function retrieves the center frequency of the 20 MHz IF bandwidth of a device currently initialized
in raw pipe mode. The center returned is representative of ¼ of the IF sample rate. The 20 MHz of usable
bandwidth is centered on this frequency.
Raw Call: ``BB_API bbStatus bbQueryStreamingCenter(int device, double *center);``
'''
self.log.info("Querying device for streaming center-freqency.")
center = ct.c_double(0)
centerPnt = ct.pointer(center)
err = self.dll.bbQueryStreamingCenter(self.deviceHandle, centerPnt)
if err == self.bbStatus["bbNoError"]:
self.log.info("returned streaming center-frequency: %f" % (center.value))
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
else:
raise IOError("Unknown error in queryStreamingCenter!")
return center.value
[docs] def queryTimestamp(self):
'''
Returns:
| Two-Tuple containing (in order):
| seconds Integer Seconds since midnight (00:00:00), January 1, 1970, coordinated
| universal time(UTC).
| nanoseconds Integer nanoseconds between seconds and seconds + 1
This function is used in conjunction with bbSyncCPUtoGPS and a GPS device to retrieve an absolute time
for a data packet in raw pipe mode. This function returns an absolute time for the last packet retrieved
from bbFetchRaw. See the Appendix:Code Examples for information on how to setup and interpret the
time information.
Raw Call: ``BB_API bbStatus bbQueryTimestamp(int device, unsigned int *seconds, unsigned int *nanoseconds);``
'''
self.log.info("Querying device for timestamp.")
seconds = ct.c_uint(0)
nanoseconds = ct.c_uint(0)
secondsPnt = ct.pointer(seconds)
nanosecondsPnt = ct.pointer(nanoseconds)
err = self.dll.bbQueryTimestamp(self.deviceHandle, secondsPnt, nanosecondsPnt)
if err == self.bbStatus["bbNoError"]:
self.log.info("returned timestamp values: Seconds - %d, nanoseconds - %d" % (seconds.value, nanoseconds.value))
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device not Configured!")
else:
raise IOError("Unknown error in queryTimestamp!")
return (seconds.value, nanoseconds.value)
[docs] def abort(self):
'''
Stops the device operation and places the device into an idle state.
Raw Call: ``BB_API bbStatus bbAbort(int device);``
'''
# cleanup state variables used in various modes.
self.acq_conf = {}
self.log.info("Stopping acquisition")
err = self.dll.bbAbort(self.deviceHandle)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to abort succeeded.")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbDeviceNotConfiguredErr"]:
raise IOError("Device was already idle! Did you call abort without ever calling initiate()?")
else:
raise IOError("Unknown error setting abort! Error = %s" % err)
[docs] def preset(self):
'''
This function exists to invoke a hard reset of the device. This will function similarly to a power
cycle(unplug/re-plug the device). This might be useful if the device has entered an undesirable or
unrecoverable state. Often the device might become unrecoverable if a program closed unexpectedly,
not allowing the device to close properly. This function might allow the software to perform the reset
rather than ask the user perform a power cycle.
Viewing the traces returned is often the best way to determine if the device is operating normally. To
utilize this function, the device must be open. Calling this function will trigger a reset which happens
after 2 seconds. Within this time you must call bbCloseDevice to free any remaining resources and
release the device serial number from the open device list. From the time of the bbPreset call, we
suggest 3 to more seconds of wait time before attempting to re-open the device.
Raw Call: ``BB_API bbStatus bbPreset(int device);``
'''
self.log.warning("Performing hardware-reset of device!")
self.log.warning("Please ensure you close the device handle within two seconds of this call!")
err = self.dll.bbPreset(self.deviceHandle)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to preset succeeded.")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
else:
raise IOError("Unknown error calling preset! Error = %s" % err)
[docs] def selfCal(self):
'''
This function causes the device to recalibrate itself to adjust for internal device temperature changes,
generating an amplitude correction array as a function of IF frequency. This function will explicitly call
bbAbort() to suspend all device operations before performing the calibration, and will return the device
in an idle state and configured as if it was just opened. The state of the device should not be assumed,
and should be fully reconfigured after a self-calibration.
Temperature changes of 2 degrees Celsius or more have been shown to measurably alter the
shape/amplitude of the IF. We suggest using bbQueryDiagnostics to monitor the device’s temperature
and perform self-calibrations when needed. Amplitude measurements are not guaranteed to be
accurate otherwise, and large temperature changes (10 ° C or more) may result in adding a dB or more of
error.
Because this is a streaming device, we have decided to leave the programmer in full control of when the
device in calibrated. The device is calibrated once upon opening the device through bbOpenDevice and is
the responsibility of the programmer after that.
Note:
After calling this function, the device returns to the default state. Currently the API does not retain state
prior to the calling of bbSelfCal(). Fully reconfiguring the device will be necessary.
Raw Call: ``BB_API bbStatus bbSelfCal(int device);``
'''
self.log.info("Performing self-calibration of device.")
err = self.dll.bbSelfCal(self.deviceHandle)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to selfCal succeeded.")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
else:
raise IOError("Unknown error calling selfCal! Error = %s" % err)
[docs] def syncCPUtoGPS(self, comPort, baudRate):
'''
Args:
comPort (integer): Com port number for the NMEA data output from the GPS reciever.
baudRate (integer): Baud Rate of the Com port
The connection to the COM port is only established for the duration of this function. It is closed when
the function returns. Call this function once before using a GPS PPS signal to time-stamp RF data. The
synchronization will remain valid until the CPU clock drifts more than ¼ second, typically several hours,
and will re-synchronize continually while streaming data using a PPS trigger input.
This function calculates the offset between your CPU clock time and the GPS clock time to within a few
milliseconds, and stores this value for time-stamping RF data using the GPS PPS trigger. This function
ignores time zone, limiting the calculated offset to +/- 30 minutes. It was tested using an FTS 500 from
Connor Winfield at 38.4 kbaud. It uses the “$GPRMC” string, so you must set up your GPS to output this
string.
Raw Call: ``BB_API bbStatus bbSyncCPUtoGPS(int comPort, int baudRate);``
'''
self.log.warning("GPS Synchronization not yet verified.")
self.log.warning("e-mail Connor at connorw@imaginaryindustries.com if you have issues or comments")
self.log.info("Attempting to synchronize CPU with GPS timebase.")
comPort = ct.c_int(comPort)
baudRate = ct.c_int(baudRate)
err = self.dll.bbSyncCPUtoGPS(comPort, baudRate)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to syncCPUtoGPS succeeded.")
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbGPSErr"]:
raise IOError("Could not connect to GPS!")
else:
raise IOError("Unknown error synchronizing with GPS! Error = %s" % err)
[docs] def getDeviceType(self):
'''
Returns:
| Ascii string containing device type:
| - "No device"
| - "BB60A"
| - "BB60C"
| - "BB124"
This function may be called only after the device has been opened. If the device successfully opened,
type will contain the model type of the device pointed to by handle.
Raw Call: ``BB_API bbStatus bbGetDeviceType(int device, int *type);``
'''
self.log.info("Querying device for model information")
devType = ct.c_uint(0)
devTypePnt = ct.pointer(devType)
err = self.dll.bbGetDeviceType(self.deviceHandle, devTypePnt)
if err == self.bbStatus["bbNoError"]:
pass
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
else:
raise IOError("Unknown error setting getDeviceType! Error = %s" % err)
if devType.value == hf.BB_DEVICE_NONE:
dev = "No device"
elif devType.value == hf.BB_DEVICE_BB60A:
dev = "BB60A"
elif devType.value == hf.BB_DEVICE_BB60C:
dev = "BB60C"
elif devType.value == hf.BB_DEVICE_BB124A:
dev = "BB124"
else:
raise ValueError("Unknown device type!")
self.log.info("Call to getDeviceType succeeded. Type = %s" % dev)
return dev
[docs] def getSerialNumber(self):
'''
Returns: Device serial number as a integer.
This function may be called only after the device has been opened. The serial number returned should
match the number on the case.
Raw Call: ``BB_API bbStatus bbGetSerialNumber(int device, unsigned int *sid);``
'''
self.log.info("Querying device for serial number.")
serialNo = ct.c_uint(0)
serialNoPnt = ct.pointer(serialNo)
err = self.dll.bbGetSerialNumber(self.deviceHandle, serialNoPnt)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to getSerialNumber succeeded. Value = %s" % serialNo.value)
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
else:
raise IOError("Unknown error setting getSerialNumber! Error = %s" % err)
return serialNo.value
[docs] def getFirmwareVersion(self):
'''
Returns: Device firmware rev as a integer.
Use this function to determine which version of firmware is associated with the specified device.
Raw Call: ``BB_API bbStatus bbGetFirmwareVersion(int device, int *version);``
'''
self.log.info("Querying device for firmware version.")
firmwareRev = ct.c_uint(0)
firmwareRevPnt = ct.pointer(firmwareRev)
err = self.dll.bbGetFirmwareVersion(self.deviceHandle, firmwareRevPnt)
if err == self.bbStatus["bbNoError"]:
self.log.info("Call to getFirmwareVersion succeeded. Value = %s" % firmwareRev.value)
elif err == self.bbStatus["bbDeviceNotOpenErr"]:
raise IOError("Device not open!")
elif err == self.bbStatus["bbNullPtrErr"]:
raise IOError("Null pointer error!")
else:
raise IOError("Unknown error setting getFirmwareVersion! Error = %s" % err)
return firmwareRev.value
[docs] def getAPIVersion(self):
'''
Returns: Device API version as an ascii string.
The returned string is of the form
major.minor.revision
Ascii periods (“.”) separate positive integers. Major/Minor/Revision are
not gauranteed to be a single decimal digit. The string is null
terminated. An example string is below ..
``[ ‘1’ | ‘.’ | ‘2’ | ‘.’ | ‘1’ | ‘1’ | ‘\0’ ] = “1.2.11”``
Raw Call: ``BB_API const char* bbGetAPIVersion();``
'''
self.log.info("Querying API for revision information.")
self.dll.bbGetAPIVersion.restype = ct.c_char_p # Tell ctypes this function returns a pointer to a string
apiRevStr = self.dll.bbGetAPIVersion(self.deviceHandle)
ret = ct.c_char_p(apiRevStr).value # Dereference pointer, extract string
self.log.info("Device firmware rev = %s" % ret)
return ret
[docs] def getErrorString(self, errCode):
'''
Args:
errCode (integer): Error code value
Returns: Ascii string containing human-readable version of the error code.
Produce an ascii string representation of a given status code. Useful for debugging.
Probably not really needed, since I'm doing error decoding locally in each function.
This /should/ be of type bbStatus. bbStatus is an enum with hard-coded values, so I'm being lazy, and just using
an int. It works well enough.
Raw Call: ``BB_API const char* bbGetErrorString(bbStatus status);``
'''
serialNo = ct.c_int(errCode)
self.dll.bbGetAPIVersion.restype = ct.c_char_p # Tell ctypes this function returns a pointer to a string
apiRevStr = self.dll.bbGetAPIVersion(self.deviceHandle, serialNo)
return ct.c_char_p(apiRevStr).value # Dereference pointer, extract string, return it.
[docs] def getCurrentAcquisitionSettings(self):
'''
Return a dictionary containing the return values from ``queryTraceInfo()``, ``getDeviceDiagnostics()`` and the current ``acq_conf``
If there is no running acquisition, ``queryTraceInfo()`` will be defaulted to ``{}``
'''
try:
tmp = self.queryTraceInfo()
except IOError:
tmp = {}
tmp.update(self.getDeviceDiagnostics())
tmp.update(self.acq_conf)
return tmp
# staticmethod, because it's only usefull for dealing with the SignalHound stuff, and yet it should be accessible easily for stuff like callbacks where you don't have.
# easy access to the instantiated class pointer
@staticmethod
[docs] def decodeRawSweep(bufPtr, bufLen):
'''
Args:
bufPtr (pointer to buffer): Pointer to a C buffer containing a sweep dataset
buflen (integer buffer size): Size of the data in ``bufPtr``
Decode a C array into a numpy-array using buffer casts. Assumes the values in the buffer are of datatype ``np.short``
Assumed array size is ``sizeof(np.short) * buflen`` bytes, or effectively 2 * bufLen.
Returns:
Numpy array containing contents of buffer
Note: This function copies the data from the array, so it is valid even if the memory underlying the ``bufPtr`` is subsequently
deallocated. This is intended for handling contexts like the callback, where once the callback returns, the SignalHound memory
management may reuse or free the underlying buffer. Since the copied array will be managed by the python memory manager, it
is safe to preserve beyond the scope of a calling function.
'''
bufAdr = ct.addressof(bufPtr.contents)
arr = np.frombuffer(int_asbuffer(bufAdr, bufLen * np.short().nbytes), dtype=np.short) # Map array memory as a numpy array.
arr = arr.copy() # Then copy it, so our array won't get modified when the circular buffer overwrites itself.
# We have to copy() since the call normally just returns a array that is overlaid onto the pre-existing data
return arr
@staticmethod
[docs] def fastDecodeArray(ctBuff, buffLen, dtype):
'''
Args:
bufPtr (pointer to buffer): Pointer to a C buffer containing a sweep dataset
buflen (integer buffer size): Size of the data in ``bufPtr``
dtype (numpy data-type): Datatype of values in array.
Decode a C array into a numpy-array using buffer casts.
Assumed array size is ``sizeof(dtype) * buflen`` bytes.
Returns:
Numpy array containing contents of buffer
Note: This function copies the data from the array, so it is valid even if the memory underlying the ``bufPtr`` is subsequently
deallocated. This is intended for handling contexts like the callback, where once the callback returns, the SignalHound memory
management may reuse or free the underlying buffer. Since the copied array will be managed by the python memory manager, it
is safe to preserve beyond the scope of a calling function.
'''
bufAdr = ct.addressof(ctBuff)
arr = np.frombuffer(int_asbuffer(bufAdr, buffLen * dtype().nbytes), dtype=dtype) # Map array memory as a numpy array.
arr = arr.copy() # Then copy it, so our array won't get modified when the circular buffer overwrites itself.
# We have to copy() since the call normally just returns a array that is overlaid onto the pre-existing data
return arr