#!/usr/bin/env python
'''
This script communicates with the K40 Laser Cutter.

Copyright (C) 2017-2023 Scorch www.scorchworks.com

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
'''

try:
    import usb.core
    import usb.util
    import usb.backend.libusb0
except Exception:
    # Deliberately non-fatal: the module can still be imported (e.g. to build
    # packets) even though nothing can be sent to the laser.
    print("Unable to load USB library (Sending data to Laser will not work.)")
import sys
import struct
import os
from shutil import copyfile
from egv import egv
import traceback
from windowsinhibitor import WindowsInhibitor
from time import time


##############################################################################

class K40_CLASS:
    """USB interface to a K40 laser cutter controller board.

    Packets written to the controller are 34 bytes long:
    ``[166, 0, <30 payload bytes>, 166, <CRC>]``.  Unused payload bytes are
    padded with 70 (``ord("F")``).
    """

    def __init__(self):
        """Set up the USB parameters, response codes and canned packets."""
        self.dev = None
        self.n_timeouts = 10
        self.timeout = 200       # Time in milliseconds
        self.write_addr = 0x2    # Write address
        self.read_addr = 0x82    # Read address
        self.read_length = 168

        #### RESPONSE CODES ####
        self.OK = 206
        self.BUFFER_FULL = 238
        self.CRC_ERROR = 207
        self.TASK_COMPLETE = 236
        # after failed initialization followed by successful initialization
        self.UNKNOWN_2 = 239
        self.TASK_COMPLETE_M3 = 204
        #######################
        self.hello = [160]
        # Canned command packets (header, 30 payload bytes, 166, CRC byte).
        self.unlock = [166, 0, 73, 83, 50, 80] + [70] * 26 + [166, 15]
        self.home = [166, 0, 73, 80, 80] + [70] * 27 + [166, 228]
        self.estop = [166, 0, 73] + [70] * 29 + [166, 130]

        self.USB_Location = None

    def say_hello(self):
        """Send the hello packet and return the controller's status code.

        Returns:
            The status byte (``response[1]``) when it is one of the known
            response codes, 9999 when the controller answered with an
            unrecognized code, or None when the hello packet could not be
            sent or no response could be read within ``n_timeouts`` attempts.
        """
        status_timeouts = self.n_timeouts

        # Try to send the hello packet; give up only if every attempt fails.
        for _ in range(status_timeouts):
            try:
                self.send_packet(self.hello)
                break
            except Exception:
                pass
        else:
            return None

        # Read the status response, retrying on read errors.
        response = None
        read_cnt = 0
        while response is None and read_cnt < status_timeouts:
            try:
                response = self.dev.read(self.read_addr,
                                         self.read_length,
                                         self.timeout)
            except Exception:
                response = None
                read_cnt = read_cnt + 1

        if response is None:
            return None

        DEBUG = False
        if DEBUG:
            # Report the first response byte that differs from the values
            # normally seen.  (Byte 4 is 3 if you try to initialize when
            # already initialized.)
            expected = (255, 206, 111, 8, 19, 0)
            for idx, want in enumerate(expected):
                if int(response[idx]) != want:
                    print("%d: " % idx, response[idx])
                    break
            else:
                print(".")

        known_codes = (self.OK,
                       self.BUFFER_FULL,
                       self.CRC_ERROR,
                       self.TASK_COMPLETE,
                       self.TASK_COMPLETE_M3,
                       self.UNKNOWN_2)
        if response[1] in known_codes:
            return response[1]
        return 9999

    def unlock_rail(self):
        """Send the canned unlock packet."""
        self.send_packet(self.unlock)

    def e_stop(self):
        """Send the canned emergency-stop packet."""
        self.send_packet(self.estop)

    def home_position(self):
        """Send the canned home packet."""
        self.send_packet(self.home)

    def reset_usb(self):
        """Reset the USB device."""
        self.dev.reset()

    def release_usb(self):
        """Dispose of the USB resources and forget the device and location."""
        usb.util.dispose_resources(self.dev)
        self.dev = None
        self.USB_Location = None

    def pause_un_pause(self):
        """Send the 'PN' command; failures are ignored (best effort)."""
        try:
            self.send_data([ord('P'), ord('N')])
        except Exception:
            pass

    def unfreeze(self):
        """Send the 'FNSE' command; failures are ignored (best effort)."""
        try:
            self.send_data([ord('F'), ord('N'), ord('S'), ord('E')])
        except Exception:
            pass

    #######################################################################
    #  The one wire CRC algorithm is derived from the OneWire.cpp Library
    #  The latest version of this library may be found at:
    #  http://www.pjrc.com/teensy/td_libs_OneWire.html
    #######################################################################
    def OneWireCRC(self, line):
        """Return the 8-bit one-wire CRC of the byte values in ``line``."""
        crc = 0
        for inbyte in line:
            for _ in range(8):
                mix = (crc ^ inbyte) & 0x01
                crc >>= 1
                if mix:
                    crc ^= 0x8C
                inbyte >>= 1
        return crc
    #######################################################################

    def none_function(self, dummy=None, bgcolor=None):
        """No-op GUI callback that always returns False.

        Don't delete this function (used as the default ``update_gui``).
        Returning False tells the senders that no GUI is active.
        """
        return False

    def send_data(self, data, update_gui=None, stop_calc=None, passes=1,
                  preprocess_crc=True, wait_for_laser=False):
        """Split ``data`` into 34-byte packets and send them to the laser.

        Args:
            data: sequence of byte values to send.  It is not modified; a
                private copy is made when ``passes > 1``.
            update_gui: optional status callback ``f(msg=None, bgcolor=None)``.
            stop_calc: optional one-element list; a true ``stop_calc[0]``
                aborts the job (see ``stop_sending_data``).
            passes: number of times the data is sent.  For every pass after
                the first, the first byte of ``data`` is skipped.  When
                ``passes > 1`` the byte at ``data[-4]`` is sent as ``"@"`` for
                all but the last pass and as ``"F"`` for the last pass.
            preprocess_crc: when true, all packets are built (with CRC) first
                and then sent; otherwise each packet is sent as it is filled.
            wait_for_laser: when true, block until the laser reports that
                the task is complete.

        Raises:
            Exception: if the job is stopped by the user or the controller
                stops accepting packets.
        """
        if stop_calc is None:
            stop_calc = [0]
        if update_gui is None:
            update_gui = self.none_function

        if passes > 1:
            # The pass marker at data[-4] is rewritten per pass; work on a
            # copy so the caller's list is left untouched.
            data = list(data)

        blank = [166, 0] + [70] * 30 + [166, 80]

        NoSleep = WindowsInhibitor()
        NoSleep.inhibit()
        try:
            packets = []
            packet = blank[:]
            cnt = 2          # next free payload index (payload is 2..31)
            len_data = len(data)
            for j in range(passes):
                istart = 0 if j == 0 else 1
                if passes > 1:
                    if j == passes - 1:
                        data[-4] = ord("F")
                    else:
                        data[-4] = ord("@")
                timestamp = 0
                for i in range(istart, len_data):
                    if cnt > 31:
                        # Packet is full: finalize the CRC and emit it.
                        packet[-1] = self.OneWireCRC(packet[1:len(packet)-2])
                        stamp = int(3*time())  # update every 1/3 of a second
                        if not preprocess_crc:
                            self.send_packet_w_error_checking(
                                packet, update_gui, stop_calc)
                            if stamp != timestamp:
                                timestamp = stamp  # interlock
                                update_gui("Sending Data to Laser = %.1f%%" %(100.0*float(i)/float(len_data)))
                        else:
                            packets.append(packet)
                            if stamp != timestamp:
                                timestamp = stamp  # interlock
                                update_gui("Calculating CRC data and Generate Packets: %.1f%%" %(100.0*float(i)/float(len_data)))
                        packet = blank[:]
                        cnt = 2

                        if stop_calc[0]:
                            self.stop_sending_data()
                    packet[cnt] = data[i]
                    cnt = cnt + 1

            # Flush the last (possibly partial) packet.  If it happens to be
            # completely full, a blank packet is sent after it.
            packet[-1] = self.OneWireCRC(packet[1:len(packet)-2])
            if not preprocess_crc:
                self.send_packet_w_error_checking(packet, update_gui, stop_calc)
                if cnt > 31:
                    self.send_packet_w_error_checking(
                        blank, update_gui, stop_calc)
            else:
                packets.append(packet)
                if cnt > 31:
                    packets.append(blank[:])
                update_gui("CRC data and Packets are Ready")

                packet_cnt = 0
                for line in packets:
                    update_gui()
                    self.send_packet_w_error_checking(
                        line, update_gui, stop_calc)
                    packet_cnt = packet_cnt + 1.0
                    update_gui("Sending Data to Laser = %.1f%%" %(100.0*packet_cnt/len(packets)))
            ##############################################################
            if wait_for_laser:
                self.wait_for_laser_to_finish(update_gui, stop_calc)
        finally:
            # Always re-enable sleep, including when the job is stopped or
            # a transmission error is raised.
            NoSleep.uninhibit()

    def send_packet_w_error_checking(self, line, update_gui=None,
                                     stop_calc=None):
        """Send one packet, retrying on USB timeouts and CRC errors.

        Waits while the controller reports BUFFER_FULL, then sends ``line``
        and checks the controller's status.  The packet is re-sent after a
        USB write failure or a CRC_ERROR response.

        Args:
            line: the packet (list of byte values) to send.
            update_gui: optional status callback; its return value is taken
                as "a GUI is active" (so the user can press stop).
            stop_calc: optional one-element list; a true ``stop_calc[0]``
                aborts the job.

        Raises:
            Exception: if the job is stopped by the user, or if the retry
                limit (``n_timeouts``) is reached and no GUI is active.
        """
        if stop_calc is None:
            stop_calc = [0]
        if update_gui is None:
            update_gui = self.none_function

        timeout_cnt = 1
        crc_cnt = 1
        while True:
            if stop_calc[0]:
                self.stop_sending_data()

            # Wait until the controller has room in its buffer.
            response = self.say_hello()
            while response == self.BUFFER_FULL:
                response = self.say_hello()
                update_gui()
                if stop_calc[0]:
                    self.stop_sending_data()

            try:
                self.send_packet(line)
            except Exception:
                timeout_cnt = timeout_cnt + 1
                if timeout_cnt < self.n_timeouts:
                    msg = "USB Timeout #%d" %(timeout_cnt)
                    update_gui(msg, bgcolor='yellow')
                else:
                    msg = "The laser cutter is not responding (%d attempts). Press stop to stop trying!" %(timeout_cnt)
                    gui_active = update_gui(msg, bgcolor='red')
                    if not gui_active:
                        msg = "The laser cutter is not responding after %d attempts." %(timeout_cnt)
                        raise Exception(msg)
                if timeout_cnt > 20:
                    # try reconnect to laser (best effort; keep retrying
                    # the send even if the reconnect itself fails)
                    try:
                        self.initialize_device(self.USB_Location)
                    except Exception:
                        pass
                continue

            ######################################
            response = self.say_hello()

            if response == self.CRC_ERROR:
                crc_cnt = crc_cnt + 1
                if crc_cnt < self.n_timeouts:
                    msg = "Data transmission (CRC) error #%d" %(crc_cnt)
                    update_gui(msg, bgcolor='yellow')
                else:
                    msg = "There are many data transmission errors (%d). Press stop to stop trying!" %(crc_cnt)
                    gui_active = update_gui(msg, bgcolor='red')
                    if not gui_active:
                        msg = "There are many data transmission errors (%d)." %(crc_cnt)
                        raise Exception(msg)
                continue

            # response is None: the controller board is not reporting status,
            # but we will assume things are going OK until we cannot transmit
            # to the controller.  Otherwise assume response == self.OK.
            # Either way move on to the next packet.
            break

    def wait_for_laser_to_finish(self, update_gui=None, stop_calc=None):
        """Poll the controller until it reports that the task is complete.

        Also returns (after notifying ``update_gui``) when the controller
        stops responding.

        Raises:
            Exception: if ``stop_calc[0]`` becomes true while waiting.
        """
        if stop_calc is None:
            stop_calc = [0]
        if update_gui is None:
            update_gui = self.none_function

        finished = False
        while not finished:
            response = self.say_hello()
            if response in (self.TASK_COMPLETE, self.TASK_COMPLETE_M3):
                break
            elif response is None:
                msg = "Laser stopped responding after operation was complete."
                update_gui(msg)
                finished = True
            else:  # assume: response == self.OK
                msg = "Waiting for the laser to finish."
                update_gui(msg)

            if stop_calc[0]:
                self.stop_sending_data()

    def stop_sending_data(self):
        """Send an emergency stop and abort by raising an Exception."""
        self.e_stop()
        raise Exception("Action Stopped by User.")

    def send_packet(self, line):
        """Write ``line`` to the controller's write endpoint."""
        self.dev.write(self.write_addr, line, self.timeout)

    def print_command(self, data):
        """Print the byte values in ``data`` as characters on one line."""
        sys.stdout.write("".join(chr(x) for x in data))
        sys.stdout.write("\n")

    def rapid_move(self, dxmils, dymils):
        """Move the head by (dxmils, dymils); does nothing for a zero move."""
        if dxmils != 0 or dymils != 0:
            data = []
            egv_inst = egv(target=lambda s: data.append(s))
            egv_inst.make_move_data(dxmils, dymils)
            self.send_data(data, wait_for_laser=False)

    def detach_ch341_kernel_driver(self, device=None):
        """On Linux, detach ``device`` from the kernel driver if it is active.

        Does nothing on other platforms or when ``device`` is None.  A failed
        detach is reported on stdout and otherwise ignored.
        """
        if sys.platform.startswith('linux') and device is not None:
            if device.is_kernel_driver_active(0):
                try:
                    device.detach_kernel_driver(0)
                    print('Device detached from ch341 linux driver')
                except usb.core.USBError as e:
                    print ("Could not detatch from ch341 linux driver: %s" % str(e))

    def initialize_device(self, USB_Location=None, verbose=False):
        """Find, configure and initialize the laser's USB device.

        Args:
            USB_Location: optional ``(bus, address)`` tuple selecting a
                specific device.  When None, every device with the laser's
                vendor/product id is tried and the first one that answers
                the hello packet is selected.
            verbose: print the device, configuration, interface and endpoint
                descriptors.

        Returns:
            ``self.USB_Location`` for the selected device.

        Raises:
            Exception: if no device is found or the OUT endpoint cannot be
                matched.
        """
        try:
            self.release_usb()
        except Exception:
            # Nothing to release (e.g. first call); safe to ignore.
            pass

        backend = usb.backend.libusb0.get_backend()
        if backend is None and os.name == 'nt':
            # Add the executable's directory to the PATH so the libusb0
            # library located there can be found.
            exedir = os.path.dirname(sys.executable)
            os.environ['PATH'] = exedir + os.pathsep + os.environ['PATH']

        # Find a laser device
        self.dev = None
        if USB_Location is None:
            for device in usb.core.find(idVendor=0x1a86, idProduct=0x5512,
                                        find_all=True):
                self.dev = device
                try:
                    # detach device from linux kernel driver
                    self.detach_ch341_kernel_driver(device=self.dev)
                    # set the active configuration. With no arguments, the
                    # first configuration will be the active one
                    self.dev.set_configuration()
                    if self.say_hello() is not None:
                        self.USB_Location = (self.dev.bus, self.dev.address)
                        break
                except Exception:
                    self.dev = None
        else:
            self.dev = usb.core.find(idVendor=0x1a86, idProduct=0x5512,
                                     bus=USB_Location[0],
                                     address=USB_Location[1])
            # Only configure the device if it was actually found; the
            # "not found" error below handles the missing-device case.
            if self.dev is not None:
                # detach device from linux kernel driver
                self.detach_ch341_kernel_driver(device=self.dev)
                self.dev.set_configuration()
                self.USB_Location = (self.dev.bus, self.dev.address)

        if self.dev is None:
            raise Exception("Laser USB Device not found. (libUSB driver may not be installed)")

        if verbose:
            print("-------------- dev --------------")
            print(self.dev)

        # get an endpoint instance
        cfg = self.dev.get_active_configuration()
        if verbose:
            print ("-------------- cfg --------------")
            print (cfg)
        intf = cfg[(0, 0)]
        if verbose:
            print ("-------------- intf --------------")
            print (intf)
        ep = usb.util.find_descriptor(
            intf,
            # match the first OUT endpoint
            custom_match = \
            lambda e: \
                usb.util.endpoint_direction(e.bEndpointAddress) == \
                usb.util.ENDPOINT_OUT)
        if ep is None:
            raise Exception("Unable to match the USB 'OUT' endpoint.")
        if verbose:
            print ("-------------- ep --------------")
            print (ep)

        # dev.ctrl_transfer(bmRequestType, bRequest, wValue=0, wIndex=0,
        #                   data_or_wLength=None, timeout)
        ctrlxfer = self.dev.ctrl_transfer(0x40, 177, 0x0102, 0, 0, 2000)
        if verbose:
            print ("---------- ctrlxfer ------------")
            print (ctrlxfer)

        return self.USB_Location

    def hex2dec(self, hex_in):
        """Convert a list of hex strings, e.g. ["40","e7"], to integers."""
        return [int(a, 16) for a in hex_in]


if __name__ == "__main__":
    k40 = K40_CLASS()
    run_laser = False

    try:
        USB_LOCATION = k40.initialize_device(verbose=False)
    # initialize_device raises plain Exception, so catch that (not just
    # RuntimeError).  The "as" form does not work for python 2.5.
    except Exception as e:
        print(e)
        print("Exiting...")
        os._exit(0)

    print('initialize with location=', USB_LOCATION)
    k40.initialize_device(k40.USB_Location, verbose=False)

    print('hello', k40.say_hello())
    #print k40.reset_position()
    #print k40.unlock_rail()
    print ("DONE")