#!/usr/bin/python
# tz_pulse_server.py
# --copyright-- Copyright 2011 (C) Tranzoa, Co. All rights reserved. Warranty: You're free and on your own here. This code is not necessarily up-to-date or of public quality.
# --url-- http://www.tranzoa.net/tzpython/
# --email-- pycode is the name to send to. tranzoa.com is the place to send to.
# --bodstamps--
# October 4, 2011 bar
# October 8, 2011 bar put the log in the current dir
# October 10, 2011 bar finger samples
# print latest clients
# spoofer
# get rid of the .py in the log file name
# October 11, 2011 bar we only need so many spoof samples. Don't overdo it.
# restart writing to a new file every hour
# November 7, 2011 bar close log in main
# November 10, 2011 bar log 4-bit Y mismatches
# --flush
# November 11, 2011 bar pass verbose to the server
# November 13, 2011 bar show_samples
# November 14, 2011 bar typo
# November 28, 2011 bar client .when is not elapsed_time()
# November 29, 2011 bar pyflake cleanup
# January 10, 2012 bar don't fuss about an unopenable port if it's given in argv
# March 13, 2012 bar dance around a serial numbered cardiox cp2102 serial to USB converter
# May 27, 2012 bar doxygen namespace
# February 18, 2015 bar print the time when the finger goes in and out
# January 18, 2016 bar except syntax change
# June 8, 2017 bar gray the background of the graph when finger is out
# June 9, 2017 bar put saturation and bpm on the page in color
# September 30, 2017 bar put tick marks on the graph
# darker no-finger background for the phone, roam
# November 27, 2017 bar remove goof error msg
# wrongly use table for layout (a middle column can't be stretched)
# goof up spoofed bpm and oxi values
# November 29, 2017 bar another try at instantaneous bpm
# December 2, 2017 bar vert line at bpm in instantaneous bpm
# and ticks and legend hint in ibpm
# bring the instant bpm bottom limit down to 0 and top to 180
# --hist_size --hist_range
# December 3, 2017 bar sample when's
# December 7, 2017 bar don't paint the sample_when when he's holding the display still
# January 9, 2018 bar clear the histogram when finger in/out
# lighten up on our cpu usage when no one is connected
# January 25, 2018 bar just set the on-screen time/date for real-time samples to the current time/date
# March 20, 2018 bar paint the BPM near the current BPM on the histogram
# November 5, 2019 bar send down the peak-peak amplitude and paint it, sorta
# histogram explanation
# November 13, 2019 bar put a little new-tab icon on the help link
# December 16, 2019 bar spoofing: add new files as needed to the end of the samples and use them. don't start at the top, again
# N key
# simplify the spoof file reading logic
# December 17, 2019 bar there was a reason why we lopped too many samples off the end, rather than from the beginning of a file
# --write_graph
# February 25, 2023 bar remove pyflakes fuss
# March 3, 2023 bar python3
# March 4, 2023 bar cmd line opt for log file name - like --how_long, for tz_lib_test.py
# --eodstamps--
## \file
# \namespace tzpython.tz_pulse_server
#
#
# This is a web server for streaming real time pulse data for graphing.
#
#
from __future__ import print_function
import json
import os
import random
import re
import sys
import tzlib
import tz_cms50
import tz_http_server
import tz_stream_graph_server
LOG_FILE_NAME = os.path.splitext(os.path.basename(__file__))[0] + ".log"
NO_FINGER_COLOR = "#e0e0e0"
FINGER_COLOR = "#ffffff"
BG_COLOR = "#ffffe0"
BPM_COLOR = "blue"
BPM_BG_COLOR = BG_COLOR
OXI_COLOR = "yellow"
OXI_BG_COLOR = "#800080"
def set_main_page(me, title = None, mn_ibpm = None, mx_ibpm = None) :
title = title or "Pulse"
if mn_ibpm == mx_ibpm :
mn_ibpm = 'Math.min.apply(null, kys)'
mx_ibpm = 'Math.max.apply(null, kys)'
else :
mn_ibpm = str(mn_ibpm)
mx_ibpm = str(mx_ibpm)
# protocol = "http://"
url = '' # "%s%s:%d" % ( protocol, tz_http_server.get_local_ip_address(), me.http_port )
tz_stream_graph_server.MAIN_PAGE = tz_stream_graph_server.MAIN_PAGE.replace('
', """
"""
)
me.main_file = tz_stream_graph_server.MAIN_PAGE % ( url, title, title, url, 'pulse' )
me.main_file = re.sub(r"", "" % BG_COLOR, me.main_file)
me.main_file = re.sub(r"(?s).*?
", """
Well, I do seem to be having a hard time starting up. I blame global bugs.
|
|
lkj%% |
bpm |
|
|
|
|
|
|
|
|
|
""" % ( BPM_COLOR, BPM_BG_COLOR, OXI_COLOR, OXI_BG_COLOR, mn_ibpm, mx_ibpm, OXI_COLOR, OXI_COLOR, ), me.main_file)
class a_spoofer(object) :
def __init__(me, file_names = []) :
me.org_fns = {}
me.file_names = {}
me.add_files(file_names)
me.samples = []
me.si = 0
me.start_now()
def add_files(me, file_names = []) :
fns = tzlib.make_dictionary(file_names or []) # this adds a value of True keyed by each name
me.org_fns.update(fns)
me.file_names.update(fns)
def start_now(me) :
me.when = tzlib.elapsed_time()
def get_new_samples(me) :
me.si = len(me.samples) # this forces the issue
def get_sample(me) :
t = tzlib.elapsed_time()
if t - me.when >= 1.0 / tz_cms50.SAMPLE_RATE :
me.when += 1.0 / tz_cms50.SAMPLE_RATE # time to feed the caller a sample
if me.si >= len(me.samples) : # all out of samples we've got queued up?
me.samples = []
me.si = 0
plen = -1
while len(me.samples) < tz_cms50.SAMPLE_RATE * 1823 :
if not len(me.file_names) :
if len(me.samples) <= plen :
break # kick out if we can't load any samples from any files we know
plen = len(me.samples)
me.file_names.update(me.org_fns) # start over again if we've used all the files
if not len(me.file_names) :
break # give up - we know no files
pass
fn = random.choice(list(me.file_names.keys()))
del(me.file_names[fn]) # don't choose this file again
r = tz_cms50.a_recording.parse_file(fn)
if r :
sa = [ s for s in r.samples if hasattr(s, 'y') ]
if len(sa) > tz_cms50.SAMPLE_RATE * 100 : # ignore files under a 100 seconds
me.samples += sa
print("Spoofing", fn, len(sa), len(me.samples))
break
pass
pass
me.samples = me.samples[ : tz_cms50.SAMPLE_RATE * 4000 ] # don't show too many samples from a file - but do show the 1st ones so the user sees the samples from the top of the file
if me.si < len(me.samples) :
s = me.samples[me.si]
me.si += 1
return(s)
pass
return(None)
# a_spoofer
def add_extra_to_finger_stream(st, visibility = None, color = None, oxi = None, bpm = None, title = None, hist = None, when = None) :
"""
Add extra information to the stream headed to the client.
In particular, we set the graph's color, and the BPM and saturation values.
"""
ws = (when and ('
' + ' ' + time.asctime(time.localtime(when)) + '
')) or ""
if False and hist :
mn = min([ min(h) for h in hist.values() ])
mx = max([ max(h) for h in hist.values() ])
print("@@@@", mn, mx, mx - mn)
if visibility and oxi :
ovs = """document.getElementById('bpmcont').style.visibility = '%s';\n""" % visibility
else :
ovs = ""
if visibility and bpm :
rvs = """document.getElementById('satcont').style.visibility = '%s'\n;""" % visibility
else :
rvs = ""
if color :
color = """document.getElementById('graph').style.backgroundColor = '%s';\n""" % color
else :
color = ""
if oxi :
oxi = '%u' % oxi
else :
oxi = ''
if bpm :
bpm = '%u' % bpm
else :
bpm = ''
if title :
title = """
document.getElementById('title' ).innerHTML = "%s";
document.getElementById('title_txt').innerHTML = "%s";
""" % ( title, title, )
else :
title = ""
st.extra_js("""%s%s%s%s
document.getElementById('oxi').innerHTML = "%s";
document.getElementById('bpm').innerHTML = "%s";
paint_hist(%s);
if (stream_grapher.set_painting())
{
document.getElementById('sample_when').innerHTML = "%s";
}
""" % ( color, ovs, rvs, title, oxi, bpm, json.dumps(hist), ws, )
)
pass
class a_hister(object) :
def __init__(me, size = None) :
me.size = size or 120
me.clear()
def append(me, v) :
r = me.pkr.append(v)
if r[0] != None :
me.peaks.append(r)
if len(me.peaks) >= 3 :
# Queue up the information that our javascript will read in to 'hist' on the client side.
# The 1st value in the array is to allow the display to age the item in the histogram (to make 'em go dim as they age out).
# The 2nd is to allow paint_hist() to paint the items of varying lengths depending on amplitude of the heart beat.
d = me.peaks[-1][1] - me.peaks[-3][1] # duration
a = abs(me.peaks[-1][0] - me.peaks[-2][0]) # aplitude
me.fifo.append(3600.0 / d, [ me.fifo.total_item_count(), a ])
while len(me.peaks) > 3 :
del(me.peaks[0])
pass
return(((me.fifo.count() >= 12) and me.fifo.picture()) or None)
def clear(me) :
me.pkr = tzlib.a_peak_finder()
me.peaks = []
me.fifo = tzlib.a_fifo_histogram(me.size)
# a_hister
help_str = """
python %s (options) (output_file_name)
Options:
--hist_range low high Set the histogram range.
Default: %u %u
0 0 auto-fits.
--hist_size size Set the BPM histogram size/length.
Default: %u
--title Set web page title.
--live_title Set the web page title when the CMS-50 is reading.
--http_port port_number Set the server port number.
--port port_number Set the COM port.
--port_list List possible ports (twice, list all available ports)
--spoof file_name Add files matching the ambiguous name to the list of spoof files.
Spoof files are used for data when the 'finger' is out.
--write_graph Write star-graphs of the waveforms to output .plsoxi files.
--flush Flush the output file every sample.
--verbose Raise the verbosity level.
Web server for pulse information streaming to a browser.
"""
if __name__ == '__main__' :
import time
import serial
import TZKeyReady
import tz_server_logger
import TZCommandLineAtFile
import tz_timer
import tz_usb
program_name = sys.argv.pop(0)
TZCommandLineAtFile.expand_at_sign_command_line_files(sys.argv)
http_port = tz_stream_graph_server.HTTP_PORT
port = 0 # my COM port, not yours
port_list = 0
log_file = None
verbose = 0
do_flush = 0
title = None
live_title = None
hist_size = 120
mn_ibpm = 0
mx_ibpm = 150
write_graph = False
spoofer = a_spoofer()
how_long = float(sys.maxsize)
if tzlib.array_find(sys.argv, [ "--help", "-h", "-?", "/?", "-?" ] ) >= 0 :
print(help_str % ( os.path.basename(program_name), mn_ibpm, mx_ibpm, hist_size, ))
sys.exit(254)
while True :
oi = tzlib.array_find(sys.argv, [ "--http_port", "--httpport", "--http-port", ])
if oi < 0 : break
del sys.argv[oi]
http_port = int(sys.argv.pop(oi))
while True :
oi = tzlib.array_find(sys.argv, [ "--port", "-p" ] )
if oi < 0 : break
del sys.argv[oi]
if (oi >= len(sys.argv)) or not len(sys.argv[oi]) :
print("No COM port given!")
sys.exit(102)
port = sys.argv.pop(oi)
while True :
oi = tzlib.array_find(sys.argv, [ "--port_list", "--port-list", "--portlist", "--pl", ] )
if oi < 0 : break
del sys.argv[oi]
port_list += 1
while True :
oi = tzlib.array_find(sys.argv, [ "--verbose", "-v", ] )
if oi < 0 : break
del sys.argv[oi]
verbose += 1
while True :
oi = tzlib.array_find(sys.argv, [ "--flush", "-f", ] )
if oi < 0 : break
del sys.argv[oi]
do_flush += 1
while True :
oi = tzlib.array_find(sys.argv, [ "--write_graph", "--write-graph", "--writegraph", "--wg", ] )
if oi < 0 : break
del sys.argv[oi]
write_graph = True
while True :
oi = tzlib.array_find(sys.argv, [ "--title", "-t", ] )
if oi < 0 : break
del sys.argv[oi]
title = sys.argv.pop(oi)
while True :
oi = tzlib.array_find(sys.argv, [ "--live_title", "--live-title", "--livetitle", "--lt", ] )
if oi < 0 : break
del sys.argv[oi]
live_title = sys.argv.pop(oi)
while True :
oi = tzlib.array_find(sys.argv, [ "--spoof", ] )
if oi < 0 : break
del sys.argv[oi]
afn = sys.argv.pop(oi)
fns = tzlib.ambiguous_file_list(afn)
if not fns :
afn += tz_cms50.a_recording.FILE_EXT
fns = tzlib.ambiguous_file_list(afn)
if not fns :
print("No spoof files matching: [%s]!" % afn)
sys.exit(105)
spoofer.add_files(fns)
while True :
oi = tzlib.array_find(sys.argv, [ "--hist_range", "--hist-range", "--histrange", ] )
if oi < 0 : break
del sys.argv[oi]
mn_ibpm = max(0, int(sys.argv.pop(oi)))
mx_ibpm = max(0, int(sys.argv.pop(oi)))
if mn_ibpm > mx_ibpm :
mn_ibpm, mx_ibpm = mx_ibpm, mn_ibpm
if mx_ibpm - mn_ibpm > 250 :
mx_ibpm = mn_ibpm + 250
pass
while True :
oi = tzlib.array_find(sys.argv, [ "--hist_size", "--hist-size", "--histsize", ] )
if oi < 0 : break
del sys.argv[oi]
hist_size = max(10, int(sys.argv.pop(oi)))
while True :
oi = tzlib.array_find(sys.argv, [ "--log_file", "--log-file", "--logfile", "--log_file_name", "--log-file-name", "--logfilename", "-l", ] )
if oi < 0 : break
del sys.argv[oi]
log_file = sys.argv.pop(oi)
while True :
oi = tzlib.array_find(sys.argv, [ "--how_long", "--how-long", "--howlong", ] )
if oi < 0 : break
del sys.argv[oi]
how_long = float(sys.argv.pop(oi))
ofile_name = None
if len(sys.argv) >= 1 :
ofile_name = sys.argv.pop(0)
if ofile_name.startswith('-') :
print("Put the whole path or a dot/slash before the output file name. Dashes are confusing: [%s]" % ofile_name)
sys.exit(104)
pass
if len(sys.argv) :
print("I don't understand %s (--help for options)!" % ( sys.argv ))
sys.exit(104)
if port_list :
print(tz_usb.find_likely_COM_ports(vendor_id = tz_cms50.USB_VENDOR_ID, product_id = tz_cms50.USB_PRODUCT_ID, list_level = port_list))
gport = port
if not port :
nports = tz_usb.find_likely_COM_ports(vendor_id = tz_cms50.USB_VENDOR_ID, product_id = tz_cms50.USB_PRODUCT_ID, serial_number = "ca8d") # kludge to avoid cardiox driver board in one incarnation
ports = [ p for p in tz_usb.find_likely_COM_ports(vendor_id = tz_cms50.USB_VENDOR_ID, product_id = tz_cms50.USB_PRODUCT_ID) if p not in nports ]
if len(ports) :
port = ports[0]
print( "Using port", port,)
if len(ports) > 1 :
print("Found ports", ports, end = ' ')
print('')
pass
if not port :
print("Please tell me a COM port to use with the --port option (e.g. --port 2 )!")
sys.exit(103)
try :
port = int(port)
cport = port - 1
except ValueError :
cport = port
io = None
try :
io = serial.Serial(port = cport, baudrate = 19200, parity = "O", timeout = 0.001) # note: PC program sets 8O1. serial.Serial() multiplies timeout by 1000 before passing to windows (This 1 mill is minimum for windows. I don't know about other OS's.)
except serial.SerialException :
if not gport :
print("Port %s [%s] cannot be opened!" % ( str(port), str(cport) ))
sys.exit(111)
pass
cm = tz_cms50.a_comm(io)
logger = tz_server_logger.a_logger( http_port = http_port, file_name = log_file)
me = tz_stream_graph_server.a_server(http_port = http_port, logger = logger, show_exceptions = True, verbose = verbose)
set_main_page(me, title, mn_ibpm, mx_ibpm)
ipadr = tz_http_server.get_local_ip_address()
print("Connect to http://localhost:%u/ or http://%s:%u/" % ( me.http_port, ipadr, me.http_port ))
me.start()
samples = tz_cms50.a_recording(write_graph = write_graph)
if ofile_name :
fn = tz_cms50.get_output_file_name(ofile_name, program_name = program_name)
print("Outputting to: ", samples.open_write_file(fn))
prg = tz_cms50.a_progress_rtn()
stopped = 1000
finger = False
ts = tzlib.elapsed_time()
rx_when = ts
cm.start_usb() # in case he's not turned it on (though we'll do this every half second of silence from the device, anyway
st = tz_stream_graph_server.a_stream('pulse')
st.num_rate = tz_cms50.SAMPLE_RATE # put tick marks on the graph
me.add_stream(st)
add_extra_to_finger_stream(st, 'hidden', NO_FINGER_COLOR)
show_samples = False
curr_title = title
hist = a_hister(size = hist_size)
if how_long < 0 :
how_long = sys.maxsize
start_time = tz_timer.tick()
while tz_timer.tick() - start_time < how_long :
no_finger = False
s = None
try :
def mismatch(s, b, ay) :
logger.log("; Mismatch b=%d ay=%d sample=%s" % ( b, ay, str(s) ) )
s = cm.read_sample(progress_rtn = prg.show_progress, verbose = verbose, mismatch_callback = mismatch)
if s :
st.append(s.y)
if not finger :
samples.append(tz_cms50.a_finger_sample(True))
samples.append(s)
if do_flush :
samples.flush_file()
avya = [ ox for ox in samples.samples[-5 * tz_cms50.SAMPLE_RATE : ] if hasattr(ox, 'y') ]
avy = sum([ ox.y for ox in avya ]) / float(max(1, len(avya)))
if not finger :
finger = True
samples.flush_file()
logger.flush()
print("Finger", time.asctime())
curr_title = live_title or title
hist.clear()
add_extra_to_finger_stream(st, 'visible', FINGER_COLOR, bpm = s.hr, oxi = s.ox, title = curr_title, hist = hist.append(s.y), when = time.time()) # getattr(s, 'when', None)) # send a real sample from the device out to the client
stopped = max(stopped - 10, 0)
ys = (' ' * int((100.0 * (s.y - 0)) / max(1.0, (128 - 0)))) + '*' # (100.0 * s.y) / (s.y + (s.bc & 0xf)), ys )
if show_samples :
print("%s %s" % ( s.print_str(), ys ))
if len(samples.samples) > 3600 * tz_cms50.SAMPLE_RATE * 2 : # note: it's an accident of code that the 1st file will be 2 hours of samples and each subsequent file will be 1 hour
samples.flush_file()
samples.forget_old_samples(3600 * tz_cms50.SAMPLE_RATE) # keep some around so we can update clients with them (though we only need a few seconds worth
if ofile_name :
fn = tz_cms50.get_output_file_name(ofile_name, program_name = program_name)
print("Outputting to: ", samples.open_write_file(fn, only_new_samples = True))
pass
rx_when = tzlib.elapsed_time()
t = tzlib.elapsed_time()
if t - ts > 59 :
ts = t
samples.flush_file()
logger.flush()
pass
except tz_cms50.a_cms50_no_finger_exception :
no_finger = True
except tz_cms50.a_cms50_data_exception as msg :
no_finger = True
samples.flush_file()
print(msg) # those bits are not, apparently, dupes of each other
if not stopped :
sys.exit(199)
stopped -= 1
except ( tz_cms50.a_cms50_exception, tz_cms50.a_cms50_timeout_exception, ) :
t = tzlib.elapsed_time()
if t - rx_when > 0.5 :
rx_when = t
samples.flush_file()
cm.start_usb()
no_finger = True
pass
if no_finger : # this sample, did we detect no finger?
if finger : # and last sample was from a finger?
finger = False
hist.clear()
add_extra_to_finger_stream(st, 'hidden', NO_FINGER_COLOR, bpm = None, oxi = None, title = curr_title) # tell the client the finger is out
samples.append(tz_cms50.a_finger_sample(False))
spoofer.start_now()
print("No finger", time.asctime())
samples.flush_file()
logger.flush()
curr_title = title
pass
if (not s) and (not finger) :
if tzlib.elapsed_time() - me.get_when() > 11 : # if there's no one out there, let's slow down the action
spoofer.start_now()
time.sleep(0.23)
s = spoofer.get_sample()
if s :
st.append(s.y)
add_extra_to_finger_stream(st, 'visible', NO_FINGER_COLOR, bpm = s.hr, oxi = s.ox, title = curr_title, hist = hist.append(s.y), when = getattr(s, 'when', None)) # send a spoofed sample to the client
if show_samples :
ys = (' ' * int((100.0 * (s.y - 0)) / max(1.0, (128 - 0)))) + '*'
print("%6u: %s %s" % ( spoofer.si - 1, s.print_str(), ys ))
pass
pass
if len(cm.data) :
samples.flush_file()
while len(cm.data) :
dt = cm.data.pop(0)
if len(dt.samples) :
fn = tz_cms50.get_output_file_name(ofile_name, program_name = program_name, ext = ".dat")
dfn = dt.write_file(fn)
if not dfn :
print("Probably no data to write, so file not written.")
else :
print("Wrote driven upload to", fn, "and", dfn, len(dt.samples))
pass
pass
stopped = 1000
k = TZKeyReady.key_ready()
if k :
print('')
samples.flush_file()
logger.flush()
if k.lower() in [ 'q', '\033', ] :
break
if k.lower() in [ '?', ] :
ca = list(me.clients.values())
ca.sort(key = lambda a : a.when)
ca.reverse()
lsa = [ "%s : %s:%s %s %s" % ( c.ip_adr, str(c.ci), c.name or "", time.asctime(time.localtime(time.time() - (tzlib.elapsed_time() - c.when))), c.host or '', ) for c in ca[-20:] ]
for ls in lsa :
logger.log(ls, flush = True)
print("Go to new spoofed samples (key: N)")
print("Show samples %s (key: V to toggle)" % show_samples)
if k.lower() in [ 'v', ] :
show_samples = not show_samples
print("Show samples %s (key: V to toggle)" % show_samples)
if k.lower() in [ 'n', ] :
print("Spoof new samples")
spoofer.get_new_samples()
pass
pass
me.stop()
samples.close_write_file()
cm.blind_close()
logger.close()
#
#
#
# eof