393 lines
10 KiB
Python
393 lines
10 KiB
Python
#!/usr/bin/python3
|
|
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
|
|
#
|
|
# Copyright 2021 Sebastian Lederer
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import sys
|
|
import os
|
|
import serial
|
|
import time
|
|
import random
|
|
import platform
|
|
import argparse
|
|
|
|
# Payload bytes carried by one data block of the transfer protocol.
blocksize = 32

# ASCII control codes used as protocol markers.
SOH = 0x01  # sent before the size header
STX = 0x02  # sent before every data block
EOT = 0x04  # sent after the last data block
ENQ = 0x05  # sent to probe whether a size header is wanted
ACK = 0x06  # positive acknowledgement
BEL = 0x07  # sent by the download routine after the 'D' command
NAK = 0x15  # negative acknowledgement

# Checksum arithmetic is truncated to 32 bits.
wordmask = (1 << 32) - 1
# Constant XORed into the running block checksum.
pattern = 0xAFFECAFE
|
|
|
|
|
|
def get_default_device():
    """Return the default serial device name for the host platform.

    Windows hosts get 'COM4:'; every other platform gets '/dev/ttyUSB1'.
    """
    on_windows = platform.system() == 'Windows'
    return 'COM4:' if on_windows else '/dev/ttyUSB1'
|
|
|
|
|
|
def checksum(databytes):
    """Compute the 32-bit block checksum over `databytes`.

    The input is consumed as big-endian 32-bit words. For each word the
    running sum is updated as ``((sum + word) ^ pattern) << 1`` and then
    truncated with ``wordmask``. An empty input yields 0. The length is
    expected to be a multiple of four; a trailing partial word raises
    IndexError.
    """
    acc = 0
    for off in range(0, len(databytes), 4):
        # Assemble one big-endian word from four consecutive bytes.
        word = (databytes[off] << 24
                | databytes[off + 1] << 16
                | databytes[off + 2] << 8
                | databytes[off + 3])
        acc = (((acc + word) ^ pattern) << 1) & wordmask
    return acc
|
|
|
|
|
|
def sendchar(char, ser):
    """Write the integer `char` (0..255) to `ser` as a single byte."""
    single_byte = char.to_bytes(1, 'big')
    ser.write(single_byte)
|
|
|
|
|
|
def sendcommand(ser, cmd=b'L', verbose=False):
    """Write `cmd` to the serial port and return the reply.

    Parameters:
        ser: serial port object providing write() and read_until().
        cmd: bytes to send; defaults to b'L'.
        verbose: when true, print the command and the raw reply.

    Returns whatever ser.read_until() delivers; callers treat an empty
    result as a timeout.
    """
    # The original forced verbose to True here (a debugging leftover),
    # which made the parameter meaningless; it is honoured now.
    ser.write(cmd)
    resp = ser.read_until()
    if verbose:
        print(cmd, "sent, response:", str(resp))
    return resp
|
|
|
|
|
|
# send command and wait for echo
def commandwait(ser, cmd):
    """Send `cmd` and require the remote side to echo it back.

    The reply must be exactly `cmd` followed by CR LF. Returns the reply
    on success; prints a diagnostic and returns None on an empty reply
    (timeout) or on any other response.
    """
    reply = sendcommand(ser, cmd, verbose=False)
    if not reply:
        print("timeout sending '{}' command".format(cmd))
        return None

    expected_echo = bytearray(cmd + b"\r\n")
    if reply == expected_echo:
        return reply

    print("invalid response to '{}' command".format(cmd))
    return None
|
|
|
|
|
|
def send_size_header(ser, filesize):
    """Offer the file size to the remote side before the data blocks.

    An ENQ byte is sent first. If the single-byte reply is NAK the remote
    side does not take a size header and True is returned right away.
    For any other reply (including none) the header is sent: SOH, the
    size as a 4-byte big-endian word, then its bitwise complement as a
    4-byte big-endian word.

    Returns True when the header was acknowledged with ACK or was not
    needed, False on timeout or any reply other than ACK.
    """
    ser.write(b'\x05')  # ENQ
    probe_reply = ser.read(1)

    if probe_reply == b'\x15':  # NAK
        print("Not using size header.")
        return True

    ser.write(b'\x01')  # SOH

    size_word = filesize.to_bytes(4, 'big')
    inverted = ~filesize & 0xFFFFFFFF  # complement serves as the check word

    ser.write(size_word)
    ser.write(inverted.to_bytes(4, 'big'))

    answer = ser.read(1)
    if len(answer) == 0:
        print("timeout waiting for ACK on size header")
        return False

    return answer[0] == ACK
|
|
|
|
|
|
def serload_bin(datafile, ser):
    """Upload `datafile` to the remote side in checksummed blocks.

    Parameters:
        datafile: path of the file to send. A name ending in '.mem' is
            read as text where each line holds 32 binary digits (four
            bytes, most significant first); anything else is read as raw
            binary.
        ser: serial port object.

    Protocol as implemented here: send the 'L' command, optionally a size
    header (see send_size_header), then for each block STX + payload +
    4-byte big-endian checksum. ACK advances to the next block, NAK
    resends the same block, a timeout or any other byte aborts. EOT is
    sent at the end in every case. The payload is zero-padded up to a
    multiple of `blocksize`; the size header carries the unpadded length.

    Returns None; progress and errors are printed.
    """
    resp = sendcommand(ser)
    if len(resp) == 0:
        print("timeout sending 'L' command")
        return

    if datafile.endswith('.mem'):
        data = bytearray()
        with open(datafile) as f:
            for line in f:
                if not line.strip():
                    # A blank line would make int('', 2) raise.
                    continue
                data.extend(int(line[k:k + 8], 2) for k in range(0, 32, 8))
    else:
        with open(datafile, 'rb') as f:
            data = bytearray(f.read())

    filesize = len(data)

    if filesize % blocksize > 0:
        pad = blocksize - (filesize % blocksize)
        print("padding {} bytes with {} to {}".format(
            filesize, pad, filesize + pad))
        data += bytearray(pad)

    print("{} total blocks".format((len(data) + blocksize - 1) // blocksize))

    if not send_size_header(ser, filesize):
        print("Error sending size header.")
        return

    sentblocks = 0
    resend = 0
    offset = 0  # start of the block currently being sent

    # Walk the payload by offset instead of re-slicing it after every ACK,
    # and use `blocksize` throughout rather than a literal 32.
    while offset < len(data):
        databytes = data[offset:offset + blocksize]

        sendchar(STX, ser)
        chksum = checksum(databytes)
        ser.write(databytes)
        ser.write(chksum.to_bytes(4, 'big'))

        resp = ser.read(1)
        if len(resp) == 0:
            print("timeout waiting for ACK")
            break
        char = resp[0]

        if char == ACK:
            # ack received, send next block
            sentblocks += 1
            offset += blocksize
        elif char == NAK:
            # nak received, send same block
            resend += 1
        else:
            # anything else, give up
            print("garbage received: ", char)
            print(ser.read(80))
            break

        print("{} blocks sent, {} retries".format(sentblocks, resend), end='\r')

    sendchar(EOT, ser)
    print()
|
|
|
|
|
|
def word_from_bytes(b):
    """Combine the first four items of `b` into a big-endian 32-bit word.

    Raises IndexError when `b` holds fewer than four items.
    """
    value = 0
    for position in range(4):
        value = (value << 8) | b[position]
    return value
|
|
|
|
|
|
def read_word(ser):
    """Read four bytes from `ser` and return them as a big-endian word.

    Returns None when fewer than four bytes arrive.
    """
    raw = ser.read(4)
    return word_from_bytes(raw) if len(raw) == 4 else None
|
|
|
|
|
|
def serdownload(fname, ser):
    """Download a file from the remote side and store it as `fname`.

    Parameters:
        fname: local path the received data is written to.
        ser: serial port object.

    Protocol as implemented here: send the 'D' command followed by BEL,
    expect SOH plus a size word and its bitwise complement, acknowledge
    with ACK, then receive blocks of STX + `blocksize` payload bytes +
    checksum word. Each good block is answered with ACK, a checksum
    mismatch with NAK (the block is then expected again). After the last
    block an EOT byte is expected and the file is truncated to the
    announced size, dropping the padding of the final block.

    Returns None; progress and errors are printed. On a protocol error
    the function returns early and the local file may be incomplete.
    """
    resp = sendcommand(ser, b'D')
    if len(resp) == 0:
        print("timeout sending 'D' command")
        return

    sendchar(BEL, ser)

    resp = ser.read(1)
    if len(resp) == 0:
        print("Timeout receiving size header.")
        return
    if resp[0] != SOH:
        print("Error receiving size header.", resp[0])
        return

    size = read_word(ser)
    cksize = read_word(ser)
    if size is None or cksize is None:
        # read_word returns None on a short read; ~None would raise.
        print("Timeout receiving size header.")
        return
    if (~cksize & 0xFFFFFFFF) != size:
        print("Invalid size header received.")
        return

    sendchar(ACK, ser)

    print("File size: {} bytes".format(size))

    count = size
    with open(fname, "wb") as f:
        while count > 0:
            startbyte = ser.read(1)
            if len(startbyte) == 0:
                print("Timeout receiving STX.")
                return
            if startbyte[0] != STX:
                # Report the byte actually received, not the stale reply
                # left over from the size header exchange.
                print("Error receiving STX.", startbyte[0])
                return

            block = ser.read(blocksize)
            if len(block) != blocksize:
                print("Error receiving block")
                return

            cksum = read_word(ser)
            if cksum is None:
                print("Error receiving block checksum")
                return

            if cksum != checksum(block):
                print("Checksum error, retry block")
                sendchar(NAK, ser)
                continue

            print(".", end="", flush=True)
            count -= blocksize
            f.write(block)
            sendchar(ACK, ser)

        resp = ser.read(1)
        if len(resp) == 0:
            print("Timeout receiving EOT.")
        elif resp[0] != EOT:
            # elif: indexing an empty reply would raise IndexError.
            print("Error receiving EOT.", resp[0])

        print("\nEnd of transmission, {} bytes received.".format(size))
        # Blocks are always full-sized; cut the file back to the real size.
        f.truncate(size)
|
|
|
|
|
|
def mput(filenames, ser):
    """Upload several files one after another.

    For each name the remote filename is set, the file is uploaded, one
    reply is read from the port and discarded, and the loop pauses for
    two seconds. The whole run stops at the first file whose name cannot
    be set.
    """
    for name in filenames:
        if set_filename(name, ser) is None:
            return
        serload_bin(name, ser)

        # Read and drop whatever the remote side reports after the upload.
        ser.read_until()

        time.sleep(2)
|
|
|
|
|
|
def set_filename(f, ser):
    """Tell the remote side which filename the next transfer refers to.

    Sends the 'S' command (waiting for its echo), then the UTF-8 encoded
    name terminated by a carriage return. The reply must contain the
    encoded name.

    Returns the reply on success, None on any failure.
    """
    name_bytes = f.encode('utf8')
    print("Setting filename", f)

    if commandwait(ser, b'S') is None:
        return None

    reply = sendcommand(ser, name_bytes + b'\r')
    if name_bytes in reply:
        return reply

    print("unrecognized response to filename, aborting")
    return None
|
|
|
|
|
|
def getnamedfile(filename, ser):
    """Set the remote filename, then download it to the same local name.

    Nothing is downloaded when setting the filename fails. Always returns
    None.
    """
    if set_filename(filename, ser) is not None:
        serdownload(filename, ser)
|
|
|
|
|
|
def putnamedfile(filename, ser):
    """Set the remote filename, upload the local file of that name and
    print the remote side's output up to its prompt.

    Nothing is uploaded when setting the filename fails. Always returns
    None.
    """
    name_accepted = set_filename(filename, ser) is not None
    if name_accepted:
        serload_bin(filename, ser)
        print("Remote status:")
        showdata(ser)
|
|
|
|
|
|
def showdata(ser):
    """Echo remote output to stdout until the '>' prompt is seen.

    Bytes are read one at a time and printed as UTF-8 text. After the
    prompt character one further byte is read and discarded. If a read
    returns nothing (timeout) before the prompt arrives, a message is
    printed and the function returns instead of looping forever.

    Returns None.
    """
    import codecs

    # An incremental decoder copes with multi-byte UTF-8 sequences that
    # arrive one byte per read; invalid bytes are replaced, not raised.
    decoder = codecs.getincrementaldecoder('utf8')(errors='replace')

    while True:
        c = ser.read(1)
        if len(c) == 0:
            print("\ntimeout waiting for prompt")
            return
        if c == b'>':
            break
        print(decoder.decode(c), end='')

    # Consume the single byte that follows the prompt character.
    ser.read(1)
|
|
|
|
|
|
def localdir():
    """Print the names of the files in the current directory, one per
    line. Subdirectories are neither listed nor descended into."""
    # Only the first tuple produced by os.walk (the top directory) is used.
    top_level = next(os.walk("."), None)
    if top_level is None:
        return
    for name in top_level[2]:
        print(name)
|
|
|
|
|
|
def interactive(ser):
    """Run a small command prompt on top of the serial connection.

    Commands:
        dir          send the 'Y' command and show the remote output
        get <file>   download <file> from the remote side
        put <file>   upload <file> to the remote side
        ldir         list the files in the local current directory

    The loop ends at end-of-file on standard input, or when the 'Y'
    command issued by 'dir' fails. Empty input lines are ignored.

    Returns None.
    """
    while True:
        try:
            args = input("> ").strip().split()
        except EOFError:
            # End of input is the only regular way to leave the prompt.
            print()
            return

        if not args:
            continue

        cmd, *args = args

        if cmd == 'dir':
            if commandwait(ser, b'Y') is None:
                return
            showdata(ser)
        elif cmd == 'get':
            # != 1 also rejects a missing filename, which used to raise
            # IndexError on args[0].
            if len(args) != 1:
                print("exactly one argument required (filename)")
            else:
                getnamedfile(args[0], ser)
        elif cmd == 'put':
            if len(args) != 1:
                print("exactly one argument required (filename)")
            else:
                putnamedfile(args[0], ser)
        elif cmd == 'ldir':
            if len(args) > 0:
                print("superfluous argument")
            else:
                localdir()
        else:
            print("Unknown command. Valid commands are: dir get ldir put")
|
|
|
|
|
|
# Command line entry point: parse the arguments, open the serial port and
# dispatch to the selected transfer routine.
if __name__ == "__main__":
    argparser = argparse.ArgumentParser(
        description='transfer files from/to the Tridora-CPU')
    argparser.add_argument('-d', '--device', help='serial device',
                           default=get_default_device())
    argparser.add_argument('command',
                           choices=['get', 'put', 'mput', 'interactive'])
    argparser.add_argument('filename', nargs='*')
    args = argparser.parse_args()

    cmd = args.command
    serial_port = args.device
    filenames = args.filename

    # 'get' and 'put' index filenames[0]; report a usage error instead of
    # failing with IndexError when no filename was given.
    if cmd in ('get', 'put') and not filenames:
        argparser.error("command '{}' requires a filename".format(cmd))

    ser = serial.Serial(serial_port, 115200, timeout=3)
    try:
        if cmd == 'get':
            serdownload(filenames[0], ser)
        elif cmd == 'put':
            serload_bin(filenames[0], ser)
        elif cmd == 'mput':
            mput(filenames, ser)
        elif cmd == 'interactive':
            interactive(ser)
        else:
            print("should not get here")
    finally:
        # Release the port even when a transfer raises.
        ser.close()
|