#!/usr/bin/python3
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
#
# Copyright 2021 Sebastian Lederer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Upload ("put") or download ("get") a file over a serial line.

Data travels in blocks of ``blocksize`` bytes.  Each block is preceded by
STX and followed by a 4-byte big-endian checksum; the receiving side
answers every block with ACK (accepted) or NAK (send again).  A transfer
ends with EOT.  An optional size header (SOH, 4-byte size, 4-byte inverted
size) is exchanged before the first block.
"""

import sys
import time
import random

try:
    import serial
except ImportError:
    # pyserial is only needed to open the port in main(); the protocol
    # helpers below work with any object offering read()/write().
    serial = None

blocksize = 32

BEL = 7
ACK = 6
NAK = 21
ENQ = 5
SOH = 1
STX = 2
EOT = 4

wordmask = 0xFFFFFFFF
pattern = 0xAFFECAFE


def serwrite_slow(databytes, ser, delay=0.020):
    """Write *databytes* to *ser* one item at a time, pausing in between.

    Args:
        databytes: a ``str`` (each character is sent UTF-8 encoded) or a
            bytes-like object (each byte is sent as is).
        ser: object with a ``write(bytes)`` method.
        delay: seconds to sleep after each item.
    """
    total = len(databytes)
    for count, item in enumerate(databytes, start=1):
        if isinstance(item, str):
            code, raw = ord(item), item.encode("utf8")
        else:
            # Iterating a bytes-like object yields ints.
            code, raw = item, bytes([item])
        sys.stdout.write(
            "writing {0:02x} {1:04d}/{2:04d}\r".format(code, count, total))
        ser.write(raw)
        time.sleep(delay)
    print()


def serwrite(datafile, ser):
    """Read *datafile* as text and send it slowly, character by character."""
    with open(datafile) as f:
        data = f.read()
    serwrite_slow(data, ser)


def checksum(databytes):
    """Return the 32-bit block checksum of *databytes*.

    The data is consumed as big-endian 32-bit words; for every word the
    running sum is updated as ``((sum + word) ^ pattern) << 1`` and
    truncated to 32 bits.

    Raises:
        ValueError: if the length is not a multiple of 4.
    """
    if len(databytes) % 4 != 0:
        raise ValueError(
            "checksum input length must be a multiple of 4, got {}".format(
                len(databytes)))
    cksum = 0
    for i in range(0, len(databytes), 4):
        word = int.from_bytes(databytes[i:i + 4], "big")
        cksum = (((cksum + word) ^ pattern) << 1) & wordmask
    return cksum


def sendchar(char, ser):
    """Write the single byte with integer value *char* to *ser*."""
    ser.write(char.to_bytes(1, 'big'))


def sendcommand(ser, cmd=b'L'):
    """Send *cmd*, read one response line, echo both and return the line.

    The returned bytes object is empty when the read timed out.
    """
    ser.write(cmd)
    resp = ser.read_until()
    print(cmd, "sent, response:", str(resp))
    return resp


def send_size_header(ser, filesize):
    """Offer the size header to the receiver and send it unless refused.

    ENQ is sent first.  A NAK reply means the receiver does not use the
    header, which counts as success.  Any other reply -- including no
    reply at all -- causes SOH, the 4-byte big-endian size and its bitwise
    complement to be sent.

    Returns:
        True if the header was refused with NAK or acknowledged with ACK,
        False on timeout or any other answer to the header.
    """
    sendchar(ENQ, ser)
    resp = ser.read(1)
    if resp == bytes([NAK]):
        print("Not using size header.")
        return True

    sendchar(SOH, ser)
    ser.write(filesize.to_bytes(4, 'big'))
    # The complement lets the receiver validate the size word.
    ser.write((~filesize & wordmask).to_bytes(4, 'big'))
    resp = ser.read(1)
    if len(resp) == 0:
        print("timeout waiting for ACK on size header")
        return False
    return resp[0] == ACK


def _load_image(datafile):
    """Return the contents of *datafile* as a bytearray.

    A ``.mem`` file is text with one 32-bit word per line written as 32
    binary digits; blank lines are skipped.  Anything else is read as raw
    binary.
    """
    if not datafile.endswith('.mem'):
        with open(datafile, 'rb') as f:
            return bytearray(f.read())

    data = bytearray()
    with open(datafile) as f:
        for line in f:
            if not line.strip():
                continue
            # Most significant byte first, eight binary digits per byte.
            data.extend(int(line[i:i + 8], 2) for i in (0, 8, 16, 24))
    return data


def serload_bin(datafile, ser):
    """Upload *datafile* over *ser* using the 'L' command.

    The data is zero-padded to a multiple of ``blocksize``; the size header
    carries the unpadded length.  A block is repeated for as long as the
    receiver answers NAK.  A timeout or an unexpected answer aborts the
    transfer.  EOT is sent once the block loop ends, also after an abort.
    """
    resp = sendcommand(ser)
    if len(resp) == 0:
        print("timeout sending 'L' command")
        return

    data = _load_image(datafile)
    filesize = len(data)
    if filesize % blocksize > 0:
        pad = blocksize - (filesize % blocksize)
        print("padding {} bytes with {} to {}".format(
            filesize, pad, filesize + pad))
        data += bytearray(pad)

    if not send_size_header(ser, filesize):
        print("Error sending size header.")
        return

    sentblocks = 0
    resend = 0
    offset = 0
    while offset < len(data):
        databytes = data[offset:offset + blocksize]
        sendchar(STX, ser)
        chksum = checksum(databytes)
        # Fault injection for exercising the retry path:
        # if random.randrange(2) == 0:
        #     databytes[random.randrange(len(databytes))] = 0
        ser.write(databytes)
        ser.write(chksum.to_bytes(4, 'big'))

        resp = ser.read(1)
        if len(resp) == 0:
            print("timeout waiting for ACK")
            break

        char = resp[0]
        if char == ACK:
            # Block accepted, advance to the next one.
            sentblocks += 1
            offset += blocksize
        elif char == NAK:
            # Block rejected, the same offset is sent again.
            resend += 1
        else:
            # Anything else: give up.
            print("garbage received: ", char)
            print(ser.read(80))
            break

        print("{} blocks sent, {} retries".format(sentblocks, resend),
              end='\r')

    sendchar(EOT, ser)
    print()


def word_from_bytes(b):
    """Combine the first four items of *b* into a big-endian 32-bit word."""
    w = b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3]
    return w


def read_word(ser):
    """Read a big-endian 32-bit word from *ser*.

    Returns None when fewer than four bytes arrived.
    """
    b = ser.read(4)
    if len(b) != 4:
        return None
    return word_from_bytes(b)


def serdownload(fname, ser):
    """Download a file over *ser* using the 'D' command and store it.

    The sender announces the size with SOH, the size word and its bitwise
    complement.  Every block is answered with ACK, or with NAK when its
    checksum does not match, in which case the block is expected again.
    The output file is truncated to the announced size because the last
    block may carry padding.
    """
    resp = sendcommand(ser, b'D')
    if len(resp) == 0:
        print("timeout sending 'D' command")
        return

    sendchar(BEL, ser)

    resp = ser.read(1)
    if len(resp) == 0:
        print("Timeout receiving size header.")
        return
    if resp[0] != SOH:
        print("Error receiving size header.", resp[0])
        return

    size = read_word(ser)
    cksize = read_word(ser)
    if size is None or cksize is None:
        print("Timeout receiving size header.")
        return
    if (~cksize & wordmask) != size:
        print("Invalid size header received.")
        return

    sendchar(ACK, ser)
    print("File size: {} bytes".format(size))

    count = size
    with open(fname, "wb") as f:
        while count > 0:
            startbyte = ser.read(1)
            if len(startbyte) == 0:
                print("Timeout receiving STX.")
                return
            if startbyte[0] != STX:
                print("Error receiving STX.", startbyte[0])
                return

            block = ser.read(blocksize)
            if len(block) != blocksize:
                print("Error receiving block")
                return

            cksum = read_word(ser)
            if cksum is None:
                print("Error receiving block checksum")
                return

            if cksum != checksum(block):
                print("Checksum error, retry block")
                sendchar(NAK, ser)
                continue

            print(".", end="", flush=True)
            count -= blocksize
            f.write(block)
            sendchar(ACK, ser)

        resp = ser.read(1)
        if len(resp) == 0:
            print("Timeout receiving EOT.")
        elif resp[0] != EOT:
            print("Error receiving EOT.", resp[0])

        print("\nEnd of transmission, {} bytes received.".format(size))
        # Drop the padding of the final block.
        f.truncate(size)


def main(argv=None):
    """Run the command line: ``get|put <filename> <serial_port>``.

    The arguments are validated before the port is opened, and the port is
    closed even when the transfer raises.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) != 4 or argv[1] not in ('get', 'put'):
        print("Usage: {} get|put <filename> <serial_port>".format(argv[0]))
        return
    if serial is None:
        sys.exit("The pyserial package is required to open the serial port.")

    cmd, filename, serial_port = argv[1:4]
    ser = serial.Serial(serial_port, 115200, timeout=3)
    try:
        if cmd == 'get':
            serdownload(filename, ser)
        else:
            serload_bin(filename, ser)
    finally:
        ser.close()


if __name__ == "__main__":
    main()