#!/usr/bin/python3 # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 # # Copyright 2021 Sebastian Lederer # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import os import serial import time import random import platform import argparse blocksize = 32 BEL = 7 ACK = 6 NAK = 21 ENQ = 5 SOH = 1 STX = 2 EOT = 4 wordmask = 0xFFFFFFFF pattern = 0xAFFECAFE def get_default_device(): if platform.system() == 'Windows': return 'COM4:' else: return '/dev/ttyUSB1' def checksum(databytes): i = 0 cksum = 0 while i < len(databytes): word = databytes[i] << 24 | \ databytes[i+1] << 16 | \ databytes[i+2] << 8 | \ databytes[i+3] # print("word:{0:08x}".format(word)) i += 4 cksum = (((cksum + word) ^ pattern) << 1) & wordmask return cksum def sendchar(char, ser): ser.write(char.to_bytes(1, 'big')) def sendcommand(ser, cmd=b'L', verbose=False): verbose = True ser.write(cmd) resp = ser.read_until() if verbose: print(cmd,"sent, response:", str(resp)) return resp # send command and wait for echo def commandwait(ser, cmd): resp = sendcommand(ser, cmd, verbose=False) if len(resp) == 0: print("timeout sending '{}' command".format(cmd)) return None if resp != bytearray(cmd + b"\r\n"): print("invalid response to '{}' command".format(cmd)) return None return resp def send_size_header(ser, filesize): ser.write(b'\x05') # ENQ resp = ser.read(1) if resp != b'\x15': # NAK # print("ENQ response:",str(resp)) ser.write(b'\x01') # SOH databytes = filesize.to_bytes(4, 'big') chksum = ~filesize & 0xFFFFFFFF #print(str(databytes), len(databytes), type(filesize), filesize) #print(str(chksum.to_bytes(4,'big')), len(databytes)) ser.write(databytes) ser.write(chksum.to_bytes(4,'big')) resp = ser.read(1) if len(resp) == 0: print("timeout waiting for ACK on size header") return False char = resp[0] # print("response to size header:", str(resp), repr(char)) return char == ACK # ACK else: print("Not using size header.") return True def serload_bin(datafile, ser): resp = sendcommand(ser) if len(resp) == 0: print("timeout sending 'L' command") return sentblocks = 0 resend = 0 data = [] if datafile.endswith('.mem'): with open(datafile) as f: for l in f.readlines(): b3 = l[0:8] b2 = l[8:16] b1 = l[16:24] b0 = l[24:32] data.extend([ int(b,2) for b in [b3,b2,b1,b0]]) else: with open(datafile, 'rb') as f: data = f.read() filesize = len(data) if filesize % blocksize > 0: l = len(data) pad = blocksize - (l % blocksize) print("padding {} bytes with {} to {}".format(l, pad, l+pad)) data += bytearray(pad) print("{} total blocks".format((len(data) + blocksize - 1) // blocksize)) if not send_size_header(ser, filesize): print("Error sending size header.") return while len(data) > 0: block = data[0:32] databytes = bytearray(block) sendchar(STX, ser) #print("block:",databytes) chksum = checksum(databytes) #print("checksum: {0:08x}".format(chksum)) #if random.randrange(2) == 0: # databytes[random.randrange(len(databytes))] = 0 ser.write(databytes) #print(chksum.to_bytes(4, 'big')) ser.write(chksum.to_bytes(4,'big')) resp = ser.read(1) if len(resp) == 0: print("timeout waiting for ACK") break char = resp[0] if char == ACK: #print("ACK received") sentblocks += 1 data = data[32:] # ack received, send next block elif char == NAK: # nak received, send same block # print("NAK received") resend += 1 pass else: print("garbage received: ",char) print(ser.read(80)) break # anything else, give up print("{} blocks sent, {} retries".format(sentblocks, resend), end='\r') sendchar(EOT, ser) print() def word_from_bytes(b): w = b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3] return w def read_word(ser): b = ser.read(4) if len(b) != 4: return None return word_from_bytes(b) def serdownload(fname, ser): resp = sendcommand(ser, b'D') if len(resp) == 0: print("timeout sending 'D' command") return sendchar(BEL, ser); resp = ser.read(1) if len(resp) == 0: print("Timeout receiving size header.") return if resp[0] != SOH: print("Error receiving size header.", resp[0]) return size = read_word(ser) cksize = read_word(ser) if (~cksize & 0xFFFFFFFF) != size: print("Invalid size header received.") return sendchar(ACK, ser) print("File size: {} bytes".format(size)) count = size with open(fname, "wb") as f: while count > 0: startbyte = ser.read(1) if len(startbyte) == 0: print("Timeout receiving STX.") return if startbyte[0] != STX: print("Error receiving STX.", resp[0]) return block = ser.read(32) if len(block) != 32: print("Error receiving block") return cksum = read_word(ser) if cksum is None: print("Error receiving block checksum") return mysum = checksum(block) if cksum != mysum: print("Checksum error, retry block") sendchar(NAK, ser) continue print(".", end="", flush=True) count -= 32 f.write(block) sendchar(ACK, ser) resp = ser.read(1) if len(resp) == 0: print("Timeout receiving EOT.") if resp[0] != EOT: print("Error receiving EOT.", resp[0]) print("\nEnd of transmission, {} bytes received.".format(size)) f.truncate(size) f.close() def mput(filenames, ser): for f in filenames: resp = set_filename(f, ser) if resp is None: return serload_bin(f, ser) resp = ser.read_until() time.sleep(2) def set_filename(f, ser): f_encoded = f.encode('utf8') print("Setting filename", f) resp = commandwait(ser, b'S') if resp is None: return None resp = sendcommand(ser, f_encoded + b'\r') if not f_encoded in resp: print("unrecognized response to filename, aborting") return None return resp def getnamedfile(filename, ser): resp = set_filename(filename, ser) if resp is None: return None serdownload(filename, ser) def putnamedfile(filename, ser): resp = set_filename(filename, ser) if resp is None: return None serload_bin(filename, ser) print("Remote status:") showdata(ser) def showdata(ser): promptseen = False while not promptseen: c = ser.read(1) if c == b'>': promptseen = True else: print(c.decode('utf8'), end='') rest = ser.read(1) def localdir(): result = os.walk(".") for dirpath, dirnames, filenames in os.walk("."): for f in filenames: print(f) break def interactive(ser): done = False while not done: args = input("> ").strip().split() if len(args) > 0: cmd = args[0] args.pop(0) if cmd == 'dir': if commandwait(ser, b'Y') is None: return showdata(ser) elif cmd == 'get': if len(args) > 1: print("exactly one argument required (filename)") else: getnamedfile(args[0], ser) elif cmd == 'put': if len(args) > 1: print("exactly one argument required (filename)") else: putnamedfile(args[0], ser) elif cmd == 'ldir': if len(args) > 0: print("superfluous argument") else: localdir() else: print("Unknown command. Valid commands are: dir get ldir put") if __name__ == "__main__": argparser = argparse.ArgumentParser( description='transfer files from/to the Tridora-CPU') argparser.add_argument('-d', '--device', help='serial device', default=get_default_device()) argparser.add_argument('command', choices=['get', 'put', 'mput', 'interactive']) argparser.add_argument('filename', nargs='*') args = argparser.parse_args() cmd = args.command serial_port = args.device filenames = args.filename ser = serial.Serial(serial_port,115200, timeout=3) if cmd == 'get': serdownload(filenames[0], ser) elif cmd == 'put': serload_bin(filenames[0], ser) elif cmd == 'mput': mput(filenames, ser) elif cmd == 'interactive': interactive(ser) else: print("should not get here") #if cmd is not None: # ser.close()