"""
spectrum_frame.py
Spectrum Analysis frame class for PyGPSClient application.
This handles a frame containing a spectrum analysis chart from
a MON-SPAN message.
Created on 23 Dec 2022
:author: semuadmin (Steve Smith)
:copyright: 2020 semuadmin
:license: BSD 3-Clause
"""
# pylint: disable=no-member, unused-argument
import logging
from tkinter import ALL, CENTER, EW, NSEW, NW, Checkbutton, Frame, IntVar, N, S, W
from types import NoneType
from pyubx2 import UBXMessage
from pygpsclient.canvas_subclasses import (
TAG_DATA,
TAG_GRID,
TAG_WAIT,
TAG_XLABEL,
TAG_YLABEL,
CanvasGraph,
)
from pygpsclient.globals import (
BGCOL,
FGCOL,
GNSS_LIST,
MAXWAIT,
PLOTCOLS,
PNTCOL,
SPECTRUMVIEW,
WIDGETU2,
)
from pygpsclient.helpers import setubxrate
from pygpsclient.strings import DLGNOMONSPAN, DLGWAITMONSPAN
# Graph dimensions
OL_WID = 1
MIN_DB = 0
MAX_DB = 200
MIN_HZ = 1.1e9
MAX_HZ = 1.70e9
RF_FREQS = {
# "INM": 1536000000,
"B3": 1268520000,
"B2I": 1207140000,
"B2a": 1176450000,
"B1C": 1575420000,
"B1I": 1561098000,
"E6": 1278750000,
"E5b": 1207140000,
"E5a": 1176450000,
"E1": 1575420000,
"G3": 1207140000,
"G2": 1246000000,
"G1": 1602000000,
"L6": 1278750000,
"L5": 1176450000,
"L2": 1227600000,
"L1": 1575420000,
}
RF_LIST = {
0: PLOTCOLS[0],
1: PLOTCOLS[1],
2: PLOTCOLS[2],
3: PLOTCOLS[3],
4: "#1E90FF",
5: "#FF8C00",
}
RF_LIST_SNAPSHOT = {
0: "#BFBFBF",
1: "#999999",
2: "#A6A6A6",
3: "#7F7F7F",
4: "#8C8C8C",
5: "#666666",
}
ACTIVE = ""
MODEINIT = "init"
MODELIVE = "live"
MODESNAP = "snap"
GHZ = 1e9
FONTSCALE = 35
[docs]
class SpectrumviewFrame(Frame):
"""
Spectrumview frame class.
"""
[docs]
def __init__(self, app: Frame, parent: Frame, *args, **kwargs):
"""
Constructor.
:param Frame app: reference to main tkinter application
:param Frame parent: reference to parent frame
:param args: optional args to pass to Frame parent class
:param kwargs: optional kwargs to pass to Frame parent class
"""
self.__app = app # Reference to main application class
self.__master = self.__app.appmaster # Reference to root class (Tk)
self.logger = logging.getLogger(__name__)
super().__init__(parent, *args, **kwargs)
def_w, def_h = WIDGETU2
self.width = kwargs.get("width", def_w)
self.height = kwargs.get("height", def_h)
self._mindb = MIN_DB
self._maxdb = MAX_DB
self._minhz = MIN_HZ
self._maxhz = MAX_HZ
self._pending_confs = {}
self._showrf = True
self._chartpos = None
self._spectrum_snapshot = []
self._pgaoffset = IntVar()
self._waits = 0
self._redraw = True
self._waiting = True
self._body()
self._attach_events()
self.enable_messages(True)
def _body(self):
"""
Set up frame and widgets.
"""
self.grid_columnconfigure(0, weight=1)
self.grid_rowconfigure(0, weight=1)
self.grid_rowconfigure(1, weight=0)
self._canvas = CanvasGraph(
self.__app, self, width=self.width, height=self.height, bg=BGCOL
)
self.chk_pgaoffset = Checkbutton(
self,
text="PGA Offset",
fg=PNTCOL,
bg=BGCOL,
variable=self._pgaoffset,
anchor=W,
)
self._canvas.grid(column=0, row=0, columnspan=3, sticky=NSEW)
self.chk_pgaoffset.grid(column=0, row=1, sticky=EW)
def _attach_events(self):
"""
Bind events to frame.
"""
self.bind("<Configure>", self._on_resize)
self._canvas.bind("<Button-1>", self._on_click)
self._canvas.bind("<Double-Button-1>", self._on_toggle_rf)
self._canvas.bind("<Button-2>", self._on_snapshot)
self._canvas.bind("<Button-3>", self._on_snapshot)
self._canvas.bind("<Double-Button-2>", self._on_clear_snapshot)
self._canvas.bind("<Double-Button-3>", self._on_clear_snapshot)
self._pgaoffset.trace_add(("write", "unset"), self._on_update_pga)
[docs]
def reset(self):
"""
Reset spectrumview frame.
"""
self.__app.gnss_status.spectrum_data = []
self._chartpos = None
self._pgaoffset.set(0)
self._canvas.delete(ALL)
self.update_frame()
def _on_update_pga(self, var, index, mode):
"""
Action on updating pga flag.
"""
if self._pgaoffset.get():
self._maxdb = MAX_DB + 50
else:
self._maxdb = MAX_DB
self._redraw = True
[docs]
def enable_messages(self, status: bool):
"""
Enable/disable UBX MON-SPAN message.
:param bool status: 0 = off, 1 = on
"""
setubxrate(self.__app, "MON-SPAN", status)
for msgid in ("ACK-ACK", "ACK-NAK"):
self._set_pending(msgid, SPECTRUMVIEW)
def _set_pending(self, msgid: int, ubxfrm: int):
"""
Set pending confirmation flag for Spectrumview frame to
signify that it's waiting for a confirmation message.
:param int msgid: UBX message identity
:param int ubxfrm: integer representing UBX configuration frame (0-6)
"""
self._pending_confs[msgid] = ubxfrm
[docs]
def update_pending(self, msg: UBXMessage):
"""
Receives polled confirmation message from the ubx_handler and
updates spectrumview canvas.
:param UBXMessage msg: UBX config message
"""
pending = self._pending_confs.get(msg.identity, False)
if pending and msg.identity == "ACK-NAK":
self.reset()
w, h = self.width, self.height
self._canvas.create_text(
w / 2,
h / 2,
text=DLGNOMONSPAN,
fill="orange",
anchor=S,
)
self._pending_confs.pop("ACK-NAK")
if self._pending_confs.get("ACK-ACK", False):
self._pending_confs.pop("ACK-ACK")
[docs]
def update_frame(self):
"""
Plot MON-SPAN spectrum analysis.
spectrum_data is list of tuples (spec, spn, res, ctr, pga),
one item per RF block.
"""
rfblocks = self.__app.gnss_status.spectrum_data
if len(rfblocks) == 0:
if self._waits >= MAXWAIT:
self._canvas.create_alert(DLGNOMONSPAN, tags=TAG_WAIT)
else:
self._waits += 1
return
self._waits = 0
self._waiting = False
self._update_plot(rfblocks)
if self._spectrum_snapshot != []:
self._update_plot(self._spectrum_snapshot, MODESNAP, RF_LIST_SNAPSHOT)
[docs]
def init_frame(self):
"""
Initialise spectrum chart.
"""
# only redraw the tags that have changed
tags = (TAG_GRID, TAG_XLABEL, TAG_YLABEL, TAG_WAIT) if self._redraw else ()
# draw graph axes and labels
self._canvas.create_graph(
xdatamax=self._maxhz / GHZ,
xdatamin=self._minhz / GHZ,
ydatamax=(self._maxdb,),
ydatamin=(self._mindb,),
xtickmaj=10,
ytickmaj=10,
xdp=2,
ydp=(0,),
xlegend="GHz",
xcol=FGCOL,
ylegend=("dB",),
ycol=(FGCOL,),
xlabels=True,
ylabels=True,
fontscale=FONTSCALE,
tags=tags,
)
self._redraw = False
def _update_plot(
self, rfblocks: list, mode: str = MODELIVE, colors: dict | NoneType = None
):
"""
Update spectrum plot with live or snapshot rf block data.
:param list rfblocks: array of spectrum rf blocks
:param dict colors: dictionary of color for each rf block
:param str mode: plot mode ("live" or "snap"shot)
"""
if colors is None:
colors = RF_LIST
specxy, self._minhz, self._maxhz = self._get_limits(rfblocks)
if mode == MODESNAP:
self._canvas.delete(MODESNAP)
else:
self.init_frame()
# plot frequency bands
if self._showrf:
self._plot_RF_FREQS(mode)
# for each RF block in MON-SPAN message
for i, rfblock in enumerate(specxy):
rf = len(specxy) - i - 1
col = colors[rf % len(colors)]
# draw legend for this RF block
self._plot_rf_legend(col, mode, rf, i)
# plot spectrum for this RF block
hz2 = self._minhz / GHZ
db2 = self._mindb
for n, (hz, db) in enumerate(rfblock):
hz1, db1 = hz2, db2
hz2, db2 = hz / GHZ, db
if n:
self._canvas.create_gline(
hz1,
db1,
hz2,
db2,
fill=col,
width=OL_WID,
tags=(mode, TAG_DATA),
)
self.update_idletasks()
# display any marked db/hz coordinate
if self._chartpos is not None:
self._plot_marker(mode)
def _plot_rf_legend(self, col: str, mode: str, rf: int, index: int):
"""
Draw RF block legend(s)
:param str col: color
:param str mode: plot or snapshot
:param int rf: RF block
:param int i: RF block index
"""
rfw = self.width / 10
x1 = (
self.width
- self._canvas.xoffr
- rfw * (index + 1)
- (index * self._canvas.fnth)
)
y = 3 # self._canvas.yofft * 2
x2 = x1 + rfw
if mode == MODESNAP:
y += self._canvas.fnth
else:
self._canvas.create_text(
(x1 + x2) / 2,
y,
text=f"RF {rf + 1}",
fill=FGCOL,
font=self._canvas.font,
anchor=N,
tags=(mode, TAG_XLABEL),
)
self._canvas.create_line(
x1,
y + self._canvas.fnth,
x2,
y + self._canvas.fnth,
fill=col,
width=OL_WID,
tags=(mode, TAG_XLABEL),
)
def _plot_RF_FREQS(self, mode: str):
"""
Plot RF frequency markers
:param int mode: plot or snapshot
"""
for nam, frq in RF_FREQS.items():
if self._minhz < frq < self._maxhz:
yoff, col = {
"L": (self._canvas.fnth, GNSS_LIST[0][1]), # GPS
"G": (self._canvas.fnth * 2, GNSS_LIST[6][1]), # GLONASS
"E": (self._canvas.fnth * 3, GNSS_LIST[2][1]), # Galileo
"B": (self._canvas.fnth * 4, GNSS_LIST[3][1]), # Beidou
"I": (self._canvas.fnth * 5, "#FF83FA"), # INMARSAT
}[nam[0:1]]
if nam not in (
"E5b",
"E1",
"B2I",
"B1C",
): # same freq as other bands
self._canvas.create_gline(
frq / GHZ,
self._mindb,
frq / GHZ,
self._maxdb,
fill=col,
dash=(5, 2),
width=OL_WID,
tags=TAG_XLABEL,
)
x, y = self._canvas.d2xy(frq / GHZ, self._mindb)
self._canvas.create_text(
x + 2,
y - yoff - 1,
text=nam,
fill=col,
anchor=NW,
font=self._canvas.font,
tags=TAG_XLABEL,
)
def _plot_marker(self, mode):
"""
Plot saved db/hz coordinate marker.
:param str mode: plot or snapshot
"""
hz, db = self._chartpos
x, y = self._canvas.d2xy(hz, db)
self._canvas.create_text(
x,
y,
text=f"{hz:.3f} GHz\n{db:.1f} dB",
fill=FGCOL,
font=self._canvas.font,
anchor=CENTER,
tags=(TAG_XLABEL, mode),
)
def _get_limits(self, rfblocks: list) -> tuple:
"""
Get axis limits for all RF blocks and convert
spectrum arrays to (x,y) arrays.
Frequencies expressed as GHz.
:param list rfblocks: RF Blocks
:return: tuple of points and axis limits
:rtype: tuple
"""
minhz = 999 * 1e9
maxhz = 0
specxy = []
# for each RF block in MON-SPAN message
for i, rfblock in enumerate(rfblocks):
spec, spn, res, ctr, pga = rfblock
minhz = int(min(minhz, ctr - res * (spn / res) / 2))
maxhz = int(max(maxhz, ctr + res * (spn / res) / 2))
spanhz = []
for i, db in enumerate(spec):
if self._pgaoffset.get():
db += pga # compensate for programmable gain
hz = int(ctr - (spn / 2) + (res * i))
spanhz.append((hz, db))
specxy.append(spanhz)
return (
specxy,
int(min(minhz, MIN_HZ)),
int(max(maxhz, MAX_HZ)),
)
def _on_click(self, event):
"""
Save flagged chart position.
"""
self._chartpos = self._canvas.xy2d(event.x, event.y)
self._redraw = True
def _on_toggle_rf(self, event): # pylint: disable=unused-argument
"""
Toggle RF band markers on/off.
"""
self._showrf = not self._showrf
self._chartpos = None
self._redraw = True
def _on_snapshot(self, event): # pylint: disable=unused-argument
"""
Capture snapshot of current spectrum.
"""
self._spectrum_snapshot = self.__app.gnss_status.spectrum_data
self._redraw = True
def _on_clear_snapshot(self, event): # pylint: disable=unused-argument
"""
Clear snapshot of current spectrum.
"""
self._spectrum_snapshot = []
self._canvas.delete("snap")
self._redraw = True
def _on_resize(self, event): # pylint: disable=unused-argument
"""
Resize frame.
:param event event: resize event
"""
self.width, self.height = self.get_size()
self._chartpos = None
self._redraw = True
self._on_waiting()
def _on_waiting(self):
"""
Display 'waiting for data' alert.
"""
if self._waiting:
txt = DLGNOMONSPAN if self._waits >= MAXWAIT else DLGWAITMONSPAN
self._canvas.create_alert(txt, tags=TAG_WAIT)
[docs]
def get_size(self):
"""
Get current canvas size.
:return: window size (width, height)
:rtype: tuple
"""
self.update_idletasks() # Make sure we know about any resizing
return self._canvas.winfo_width(), self._canvas.winfo_height()