Source code for Tpdd

'''
Tpdd.py
Copyright 2016 - Gary Hammond email gfhammond@gmail.com

Python class for the Tandy TPDD and TPDD2 external floppy disk drives
'''

import operator
import serial
import sys
import time

###############################################################################
# Tpdd class starts here
###############################################################################

class Tpdd(object):
    '''
    The Tpdd class is used for interfacing to a Tandy Portable Disk Drive
    (TPDD) or TPDD 2 over a serial port.

    Most methods return a "return code item": a two element list of
    ``[code, text]`` where ``code`` is a value from :py:attr:`ERR_DICT` and
    ``text`` is either the matching message or the requested data.

    :param comport: The com port that the TPDD is connected to.
    '''

    API_VERSION = 1.0

    # Default values
    COM_WAIT = 0.1
    DEFAULT_COM_TIMEOUT = 1
    MAX_READ_BYTES = 254
    MAX_WRITE_BLOCK = 128
    DISK_BLOCK_SIZE = 1280
    DONE = 0xf0
    FAIL = 0xfe
    OK = 0x00

    # Command constants
    SPACES_24 = [0x20] * 24
    CMD_COMMAND = [0x5a, 0x5a]
    CMD_BANK_1 = 0x40
    CMD_DIR0 = [0x00, 0x1a] + SPACES_24 + [0x46, 0x00]
    CMD_DIR1 = [0x00, 0x1a] + SPACES_24 + [0x46, 0x01]
    CMD_DIR2 = [0x00, 0x1a] + SPACES_24 + [0x46, 0x02]
    CMD_DIR3 = [0x00, 0x1a] + SPACES_24 + [0x46, 0x03]
    CMD_DIR4 = [0x00, 0x1a] + SPACES_24 + [0x46, 0x04]
    CMD_OPEN_WRITE = [0x01, 0x01, 0x01]
    CMD_OPEN_APPEND = [0x01, 0x01, 0x02]
    CMD_OPEN_READ = [0x01, 0x01, 0x03]
    CMD_CLOSE = [0x02, 0x00]
    CMD_READ = [0x03, 0x00]
    CMD_WRITE = [0x04]
    CMD_ERASE = [0x05, 0x00]
    CMD_FORMAT = [0x06, 0x00]
    CMD_STATUS = [0x07, 0x00]
    CMD_CONDITION = [0x0c, 0x00]
    CMD_RENAME = [0x0d, 0x19] + SPACES_24 + [0x46]

    # Return code constants
    RETURN_DIR = 0x11
    RETURN_FILE_READ = 0x10
    RETURN_NORMAL = 0x12

    # Error code constants
    ERR_DICT = {
        0x00: 'normal (no error)',
        0x10: 'file does not exist',
        0x11: 'file exists',
        0x30: 'no filename',
        0x31: 'dir search error',
        0x35: 'bank error',
        0x36: 'parameter error',
        0x37: 'open format mismatch',
        0x3f: 'end of file',
        0x40: 'no start mark',
        0x41: 'crc check error in ID',
        0x42: 'sector length error',
        0x44: 'format verify error',
        0x46: 'format interruption',
        0x47: 'erase offset error',
        0x49: 'crc error check in data',
        0x4a: 'sector number error',
        0x4b: 'read data timeout',
        0x4d: 'sector number error',
        0x50: 'disk write protect',
        0x5e: 'un-initialised disk',
        0x60: 'directory full',
        0x61: 'disk full',
        0x6e: 'file too long',
        0x70: 'no disk',
        0x71: 'disk change error',
        # Extended error codes specific to the Tpdd class
        0xf0: 'action complete',
        0xfb: 'invalid checksum',
        0xfc: 'invalid filename',
        0xfd: ('CTS handshaking error. '
               'Is the TPDD turned off or in power save?'),
        0xfe: 'no data or invalid data received from the disk drive',
        0xff: 'writing data to disk drive failed'}
    ERR_BANK_ERROR = 0x35
    ERR_DISK_CHANGE = 0x71
    ERR_END_OF_FILE = 0x3f
    ERR_FILE_NO_EXIST = 0x10
    ERR_INVALID_CHECKSUM = 0xfb
    ERR_INVALID_FILENAME = 0xfc
    ERR_NO_CTS = 0xfd
    ERR_NO_DISK = 0x70
    ERR_READ_FAIL = 0xfe
    ERR_WRITE_FAIL = 0xff
    ERR_CODE_INVALID_TEXT = 'invalid return code'
    ERR_DIR_FAILED_TEXT = 'dir command write failed'

    # File constants
    CR_LF = '\r\n'
    FILE_APPEND = 0x02
    FILE_READ = 0x03
    FILE_WRITE = 0x01
    FILE_MORE_DATA = 0x80

    # Current bank selection and the last successfully parsed filename
    _bank_1 = False
    _filename = ''

    def __init__(self, comport):
        '''
        Opens the serial port at 19200 baud with RTS/CTS handshaking.

        If the port cannot be opened a message is printed and the script
        exits via ``sys.exit()``.

        :param comport: The com port that the TPDD is connected to.
        '''
        # Set defaults first so __del__ is safe even when the open fails.
        self.comport = ''
        self._ser = None
        try:
            self._ser = serial.Serial(comport, 19200, timeout=1.0, rtscts=1,
                                      write_timeout=10)
            self.comport = comport
        except Exception:
            print('init: COM port not found. Exiting script.')
            sys.exit()

    def __del__(self):
        '''
        Closes the com port (if one was opened) when the object is destroyed.
        '''
        try:
            if (getattr(self, 'comport', '') != ''
                    and getattr(self, '_ser', None) is not None):
                self._ser.close()
        except Exception:
            # Best effort only: nothing useful can be done with a failure
            # while the object is being torn down.
            pass

    ###########################################################################
    # Private helpers
    ###########################################################################

    @staticmethod
    def _as_int(item):
        '''
        Returns the integer value of one element of a byte stream. Indexing
        a ``str`` yields a one character string, indexing ``bytes`` on
        Python 3 yields an int; both are accepted.
        '''
        if isinstance(item, int):
            return item
        return ord(item)

    @staticmethod
    def _as_text(data):
        '''
        Returns data as text. Only a real ``bytes`` object (Python 3) is
        decoded; a ``str`` is returned unchanged.
        '''
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode('latin-1')
        return data

    @staticmethod
    def _filename_command(template, parsed_filename):
        '''
        Returns a copy of the command template with the filename (minus its
        two character bank prefix) written over the bytes starting at
        offset 2.
        '''
        _command = list(template)
        for _i, _char in enumerate(parsed_filename[2:]):
            _command[_i + 2] = ord(_char)
        return _command

    def _is_file_entry(self, reply):
        '''
        Returns True when a directory reply carries an 'F' at offset 26,
        which is the test this class uses for "the file exists".
        '''
        return (reply is not None and len(reply) > 26
                and self._as_int(reply[26]) == 0x46)

    def _reply_code(self, reply):
        '''
        Returns the return code from a well formed 4 byte return block
        (12:01:xx:yy), or None when the reply is missing or malformed.
        '''
        if reply is None or len(reply) != 4:
            return None
        if (self._as_int(reply[0]) != self.RETURN_NORMAL
                or self._as_int(reply[1]) != 0x01):
            return None
        return self._as_int(reply[2])

    ###########################################################################
    # Public API
    ###########################################################################

    def checksum(self, data):
        '''
        Performs a checksum calculation on the block of data passed into the
        method. The checksum is the low byte of the sum of the data,
        inverted.

        :param data: The block of data (a sequence of ints) to be
            checksummed.
        :returns: A 1 byte checksum.
        '''
        return (sum(data) % 256) ^ 0xff

    def delete_file(self, filename):
        '''
        Deletes a file on the TPDD disk.

        :param filename: The name of the file to delete.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        # file_exists() also parses the name and stores it in self._filename
        if not self.file_exists(filename):
            return self.return_code_item(self.ERR_FILE_NO_EXIST)

        # Set up the directory reference for the file
        _item = self.write_command(
            self._filename_command(self.CMD_DIR0, self._filename))
        if _item[0] != self.OK:
            return _item
        _reply = self.read_bytes(num_bytes=31, timeout=10)
        if not self._is_file_entry(_reply):
            # Previously a short reply raised and a "not found" reply was
            # reported as success even though nothing was erased.
            return self.return_code_item(self.ERR_FILE_NO_EXIST)

        # Erase the file
        _item = self.write_command(self.CMD_ERASE)
        if _item[0] == self.OK:
            _item = self.parse_return_code(
                self.read_bytes(num_bytes=4, timeout=10))
        return _item

    def file_close(self):
        '''
        Closes the file on a TPDD disk.

        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        _item = self.write_command(self.CMD_CLOSE)
        if _item[0] == self.OK:
            _item = self.parse_return_code(
                self.read_bytes(num_bytes=4, timeout=10))
        return _item

    def file_exists(self, filename):
        '''
        Checks the directory of the disk to see if the file is present. The
        filename parameter is checked by :py:meth:`parse_filename` for
        correctness prior to searching for the file on the disk.

        :param filename: The filename to check.
        :returns: True if file found on the TPDD otherwise False
        '''
        _found = False

        # Check to see if we have a valid filename..
        _result = self.parse_filename(filename)
        if _result[0] == self.FAIL:
            return _found

        # Send the directory reference and check the outcome
        _item = self.write_command(
            self._filename_command(self.CMD_DIR0, _result[1]))
        if _item[0] == self.OK:
            _reply = self.read_bytes(num_bytes=31, timeout=10) or ''
            if len(_reply) > 27:
                _found = self._is_file_entry(_reply)
            else:
                # Incomplete reply: discard whatever is left in the buffer
                self._ser.reset_input_buffer()

            # End directory reference
            self.write_command(self.CMD_DIR4)
            self.read_bytes(num_bytes=31, timeout=10)
        return _found

    def file_read(self, echo=False):
        '''
        Reads the contents of a previously opened file.

        The drive is asked for data repeatedly; a block whose length byte is
        0x80 (a full 128 byte block) means another read is issued, a shorter
        block ends the file.

        :param echo: (optional) Set to True to print the file to stdout
            during the read. Default is False.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The file contents, or the return code text in the case of an
              error.
        '''
        _item = self.write_command(self.CMD_READ)
        if _item[0] != self.OK:
            return _item

        _blocks = []
        _reply = self.read_bytes(num_bytes=131, timeout=10)
        while True:
            if _reply is None or len(_reply) < 2:
                # No usable reply (previously an IndexError/TypeError)
                return self.return_code_item(self.ERR_READ_FAIL)
            if self._as_int(_reply[0]) != self.RETURN_FILE_READ:
                break
            _length = self._as_int(_reply[1])
            _block = _reply[2:_length + 2]
            _blocks.append(_block)
            if echo:
                sys.stdout.write(self._as_text(_block))
            if _length != 0x80:
                # Short block: this was the last one
                return [self.OK, _reply[:0].join(_blocks)]
            _item = self.write_command(self.CMD_READ)
            if _item[0] != self.OK:
                # Stop here; the old loop re-tested stale data forever
                return _item
            _reply = self.read_bytes(num_bytes=131)

        # The reply was not a data block, so never treat it as file data.
        _code = self._reply_code(_reply)
        if _code is None:
            return self.return_code_item(self.ERR_READ_FAIL)
        if _code in (self.OK, self.ERR_END_OF_FILE):
            return [self.OK, _reply[:0].join(_blocks)]
        return self.return_code_item(_code)

    def file_read_open(self):
        '''
        Opens a file on the TPDD disk for reading. This method expects that
        the filename to read has been parsed using
        :py:meth:`parse_filename`.

        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        # Set up the directory reference for the file
        _item = self.write_command(
            self._filename_command(self.CMD_DIR0, self._filename))
        if _item[0] != self.OK:
            return _item
        _reply = self.read_bytes(num_bytes=31, timeout=10)
        if not self._is_file_entry(_reply):
            return self.return_code_item(self.ERR_FILE_NO_EXIST)

        # Open the file and report the drive's answer. The result of the
        # open used to be stored in a different variable and then ignored.
        _item = self.write_command(self.CMD_OPEN_READ)
        if _item[0] == self.OK:
            _item = self.parse_return_code(
                self.read_bytes(num_bytes=4, timeout=10))
        return _item

    def file_write(self, data, echo=False):
        '''
        Writes data to a previously opened file in blocks of at most
        :py:attr:`MAX_WRITE_BLOCK` bytes.

        :param data: The data to write to the file.
        :param echo: (optional) Set to True to print the file to stdout
            during the save. Default is False.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        _data_index = 0
        _data_size = len(data)

        while _data_index < _data_size:
            # While there is still data to write...
            _data_block = data[_data_index:
                               _data_index + self.MAX_WRITE_BLOCK]
            _write_command = list(self.CMD_WRITE)
            _write_command.append(len(_data_block))
            _write_command.extend(self._as_int(_x) for _x in _data_block)

            # Write the data block to the file
            _item = self.write_command(_write_command)
            if _item[0] != self.OK:
                return self.return_code_item(self.ERR_WRITE_FAIL)
            if echo:
                sys.stdout.write(self._as_text(_data_block))

            # NOTE(review): assumes the drive answers a write with the same
            # 4 byte return block parsed elsewhere in this class - confirm.
            # A missing or malformed reply is still ignored, as before.
            _code = self._reply_code(
                self.read_bytes(num_bytes=4, timeout=10))
            if _code is not None and _code != self.OK:
                return self.return_code_item(_code)
            _data_index += self.MAX_WRITE_BLOCK

        return self.return_code_item(self.OK)

    def file_write_open(self):
        '''
        Opens a file for writing on the TPDD disk. This method expects that
        the filename to write to has been parsed using
        :py:meth:`parse_filename`.

        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        # Set up the directory reference for the file
        _item = self.write_command(
            self._filename_command(self.CMD_DIR0, self._filename))
        if _item[0] == self.OK:
            # The reply is read to clear it; the file need not exist yet
            self.read_bytes(num_bytes=31, timeout=10)
            _item = self.write_command(self.CMD_OPEN_WRITE)
            if _item[0] == self.OK:
                _item = self.parse_return_code(
                    self.read_bytes(num_bytes=4, timeout=2))
        return _item

    def format_disk(self):
        '''
        Formats the disk and reports any errors.

        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        _item = self.write_command(self.CMD_FORMAT)
        if _item[0] == self.OK:
            # Waits far longer than any other command for the reply
            _item = self.parse_return_code(self.read_bytes(timeout=120))
        return _item

    def get_bank(self):
        '''
        Returns the current bank number as a character of either '0' or '1'.

        :returns: '0' for bank 0 and '1' for bank '1'.
        '''
        return '1' if self._bank_1 else '0'

    def get_dir(self, size=True):
        '''
        Checks TPDD status and if ok issues the 'dir' command to the disk
        drive

        :param size: (Optional) Used to include file sizes and bytes free in
            listing. Default is True.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The directory contents as a list if successful or the return
              code error text.
        '''
        # Check to see if a disk is in the drive
        _item = self.get_status()
        if _item[0] != self.OK:
            return _item

        # Write the 'dir' command
        _item = self.write_command(self.CMD_DIR1)
        if _item[0] == self.OK:
            _reply = self.read_bytes(num_bytes=31, timeout=10)
            if not _reply:
                _item = self.return_code_item(self.ERR_READ_FAIL)
            else:
                _entry = self.parse_dir(_reply, size)
                _dir_list = []
                if _entry[1] != '':
                    _dir_list.append(_entry[1])
                _complete = True
                # Keep getting directory entries until none are left
                while _entry[0] == self.OK:
                    _item = self.write_command(self.CMD_DIR2)
                    if _item[0] != self.OK:
                        # Report the failure; the old loop never ended here
                        _complete = False
                        break
                    _entry = self.parse_dir(
                        self.read_bytes(num_bytes=31), size)
                    if _entry[1] != '':
                        _dir_list.append(_entry[1])
                if _complete:
                    _item = [self.OK, _dir_list]

        # End directory reference
        self.write_command(self.CMD_DIR4)
        self.read_bytes(num_bytes=31)
        return _item

    def get_status(self):
        '''
        Issues the status command to the disk drive.

        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        _item = self.write_command(self.CMD_STATUS)
        if _item[0] == self.OK:
            _item = self.parse_return_code(self.read_bytes(num_bytes=4))
        return _item

    def load_file(self, filename, echo=False):
        '''
        Loads (reads) the contents of a file from the TPDD disk.

        :param filename: The file on the disk to read.
        :param echo: (optional) Set to True to print the file to stdout
            during the read. Default is False.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The file contents if successful or the return code error
              text.
        '''
        # Check to see if we have a valid filename..
        _result = self.parse_filename(filename)
        if _result[0] == self.FAIL:
            return self.return_code_item(self.ERR_FILE_NO_EXIST)

        # Open the file for reading
        _item = self.file_read_open()
        if _item[0] == self.OK:
            # Get the file contents
            _item = self.file_read(echo)
            # Close whether or not the read worked so that the file is not
            # left open on the drive.
            self.file_close()

        # End directory reference
        self.write_command(self.CMD_DIR4)
        self.read_bytes(num_bytes=31)
        return _item

    def parse_dir(self, data, size=True):
        '''
        Takes a raw line from the results of a 'dir' command and reformats
        it into a readable text format

        :param data: The raw data as read from the com port following a
            'dir' command.
        :param size: (Optional) Used to include file sizes and bytes free in
            listing. Default is True.
        :returns: A two element list consisting of:
            - An exit code of either Tpdd.OK (a file entry was parsed) or
              Tpdd.DONE (end of directory or unusable data)
            - A directory entry in readable text format
        '''
        _dir_text = ''
        _exit_code = self.DONE

        # The parser reads up to offset 29, so anything shorter is unusable
        if data is None or len(data) < 30:
            return [_exit_code, _dir_text]

        # Check that we have a dir entry return code with the data length
        # of a dir entry
        if (self._as_int(data[0]) == self.RETURN_DIR
                and self._as_int(data[1]) == 0x1c):
            if self._as_int(data[2]) != 0x00:
                # A file entry: name, optionally followed by its size
                _dir_text = self._as_text(data[2:12])
                if size:
                    _file_size = ((self._as_int(data[27]) * 256)
                                  + self._as_int(data[28]))
                    _dir_text = _dir_text + ' ' + str(_file_size)
                _exit_code = self.OK
            elif size:
                # End of the directory: report the free space instead
                _dir_text = (str(self._as_int(data[29])
                                 * self.DISK_BLOCK_SIZE) + ' bytes free')
        return [_exit_code, _dir_text]

    def parse_filename(self, filename):
        '''
        Checks the filename and pads spaces where required. i.e.,
        '0:file.do' will be returned as '0:file  .do'

        If the checks are ok, the Tpdd object updates its bank and filename
        to match the parsed filename.

        :param filename: The filename to check.
        :returns: On success a list of Tpdd.OK and the filename formatted
            for the TPDD. On failure a list of Tpdd.FAIL, the reason text
            and the filename that was passed in.
        '''
        # Check for invalid filenames. Slices are used rather than indexes
        # so that an empty or one character name is rejected, not raised on.
        if filename[0:1] not in ('0', '1'):
            return [self.FAIL, 'No or invalid bank specified', filename]
        if filename[1:2] != ':':
            return [self.FAIL, 'No bank separator specified', filename]
        _period = filename.find('.')
        if _period == -1:
            return [self.FAIL, 'No extension period found', filename]
        if _period == (len(filename) - 1):
            return [self.FAIL, 'Extension period but no extension', filename]
        if (len(filename) - 4) > _period:
            return [self.FAIL, 'Extension is more than 3 characters',
                    filename]
        if _period > 8:
            return [self.FAIL, 'Filename is more than 6 characters', filename]

        # Set the bank number
        self.set_bank(filename[0])

        # Pad with spaces as required
        _token = filename[2:].split('.', 1)
        self._filename = (filename[0:2] + _token[0].ljust(6) + '.'
                          + _token[1])
        return [self.OK, self._filename]

    def parse_return_code(self, data):
        '''
        Parses return code data from the TPDD in the form of (hex)
        12:01:xx:yy where xx is the return code, and yy is the checksum.

        Note: Does not check the checksum. Missing or malformed data is
        reported with the code 0xff.

        :param data: The raw data as read from the com port following a
            'status' command.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        _code = self._reply_code(data)
        if _code is None:
            _code = 0xff
        return self.return_code_item(_code)

    def read_bytes(self, num_bytes=None, timeout=DEFAULT_COM_TIMEOUT):
        '''
        Reads a byte stream (if any) from the TPDD.

        :param num_bytes: (optional) The number of bytes to read from the
            com port before returning. The default is no bytes specified
            resulting in relying on the timeout to complete the read.
        :param timeout: (optional) The time period in seconds to wait before
            returning with no bytes read. The default is the
            Tpdd.DEFAULT_COM_TIMEOUT.
        :returns: The raw data as read from the com port, or None when the
            port's ``rts`` flag is not set.
        '''
        _bytes_read = None
        # NOTE(review): presumably the second removed here is the 1.0 s read
        # timeout the port is opened with - confirm.
        timeout -= 1.0
        if num_bytes is None:
            num_bytes = self.MAX_READ_BYTES

        # Poll until data starts to arrive or the time runs out
        while timeout > 0.0:
            if self._ser.in_waiting == 0:
                time.sleep(self.COM_WAIT)
                timeout -= self.COM_WAIT
            else:
                timeout = 0.0

        # If the TPDD is ready to send, get the byte stream (if any)
        if self._ser.rts:
            _bytes_read = self._ser.read(num_bytes)
        return _bytes_read

    def rename_file(self, old_filename, new_filename):
        '''
        Renames a file on the disk. Both names are converted to upper case
        before use.

        :param old_filename: The current name of the file.
        :param new_filename: The new name for the file.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        # Check that the old file exists
        if not self.file_exists(old_filename.upper()):
            return self.return_code_item(self.ERR_FILE_NO_EXIST)
        _old_parsed_filename = self._filename

        # Check that the new filename is a valid filename.
        _result = self.parse_filename(new_filename.upper())
        if _result[0] != self.OK:
            return self.return_code_item(self.ERR_INVALID_FILENAME)
        _new_parsed_filename = _result[1]

        # Set up the directory reference for the old name
        _item = self.write_command(
            self._filename_command(self.CMD_DIR0, _old_parsed_filename))
        if _item[0] != self.OK:
            return _item
        _reply = self.read_bytes(num_bytes=31, timeout=10)
        if not self._is_file_entry(_reply):
            # Previously reported as success although nothing was renamed
            return self.return_code_item(self.ERR_FILE_NO_EXIST)

        # Send the rename command carrying the new name
        _item = self.write_command(
            self._filename_command(self.CMD_RENAME, _new_parsed_filename))
        if _item[0] == self.OK:
            # NOTE(review): assumes a rename is answered with the usual
            # 4 byte return block - confirm. Other replies are ignored, as
            # before.
            _code = self._reply_code(
                self.read_bytes(num_bytes=31, timeout=10))
            if _code is not None and _code != self.OK:
                _item = self.return_code_item(_code)
        return _item

    def return_code_item(self, code):
        '''
        Given a return code number, return the number and associated message
        for the return code. If return code not found, set the message to
        indicate an invalid return code.

        :param code: A return code number to be looked up in the return code
            dictionary.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        return [code, self.ERR_DICT.get(code, self.ERR_CODE_INVALID_TEXT)]

    def save_file(self, filename, data, echo=False):
        '''
        Saves (writes) the data passed in to a file on the TPDD disk. An
        existing file of the same name is deleted first.

        :param filename: The file on the disk to write to.
        :param data: The data to write to the file.
        :param echo: (optional) Set to True to print the file to stdout
            during the save. Default is False.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        # Check to see if we have a valid filename..
        _result = self.parse_filename(filename)
        if _result[0] == self.FAIL:
            return self.return_code_item(self.ERR_FILE_NO_EXIST)

        # If the file already exists, delete the file first
        if self.file_exists(filename):
            self.delete_file(filename)

        # Open the file for writing
        _item = self.file_write_open()
        if _item[0] != self.OK:
            return _item

        # Write the file contents, then always close so that a failed write
        # does not leave the file open on the drive.
        _item = self.file_write(data, echo)
        _close_item = self.file_close()
        if _item[0] == self.OK:
            _item = _close_item
        return _item

    def set_bank(self, bank):
        '''
        Sets the bank number for file dir/file operations. If setting of the
        bank number is unsuccessful, the bank number remains unchanged.

        :param bank: The bank number expressed as a character of either '0'
            or '1'.
        :returns: True if setting of the bank number was successful,
            otherwise False.
        '''
        if bank not in ('0', '1'):
            return False
        self._bank_1 = (bank == '1')
        return True

    def write_bytes(self, bytes):
        '''
        Writes a byte stream to the TPDD.

        :param bytes: The data to be written to the serial port.
        :returns: A return code item (tuple) consisting of:
            - The return code (0x00 to 0xff)
            - The return code text.
        '''
        # Check status of the hardware handshaking.
        if not self._ser.cts:
            return self.return_code_item(self.ERR_NO_CTS)

        # Write the data...
        self._ser.write(bytes)
        return self.return_code_item(self.OK)

    def write_command(self, command_type):
        '''
        Assembles the byte stream for a command to the TPDD to pass on to
        the :py:meth:`write_bytes` method. The method prepends the TPDD
        command bytes, calculates the checksum and appends the checksum.
        The input buffer is cleared before the command is sent.

        :param command_type: The hex bytes for the command i.e.,
            Tpdd.CMD_STATUS
        :returns: The return code item from :py:meth:`write_bytes`.
        '''
        _cmd_type = list(command_type)

        # If the command is for bank 1, check to make sure that the command
        # type is one that accepts bank 1 commands.
        if (self._bank_1 and
                (command_type[0] <= 0x05 or command_type[0] == 0x0d)):
            _cmd_type[0] += self.CMD_BANK_1

        _cmd_checksum = self.checksum(_cmd_type)
        _list = self.CMD_COMMAND + _cmd_type + [_cmd_checksum]

        # bytes(bytearray(...)) is a str on Python 2 and bytes on Python 3
        _bytes = bytes(bytearray(_list))
        self._ser.reset_input_buffer()
        return self.write_bytes(_bytes)
def main():
    '''
    Placeholder entry point that does nothing.
    '''
    return None


if __name__ == '__main__':
    main()