# -*- coding: iso-8859-15 -*-
#
# pysynscan
# Copyright (c) July 2020 Nacho Mas
import os
import logging
from synscan.comm import comm
import time
UDP_IP = os.getenv("SYNSCAN_UDP_IP","192.168.4.1")
UDP_PORT = int(os.getenv("SYNSCAN_UDP_PORT",11880))
LOGGING_LEVEL=os.getenv("SYNSCAN_LOGGING_LEVEL",logging.INFO)
[docs]class motors(comm):
'''
Implementation of all motor commands and logic
following the document:
https://inter-static.skywatcher.com/downloads/skywatcher_motor_controller_command_set.pdf
**IMPORTANT NOTES** (based on the above doc):
* In the motor controller, there is a hardware timer T1 that is used to generate stepping pulse
for stepper motor or reference position for servomotor. The input clocks frequency of the timer,
plus the preset value of this timer, determine the slewing speed of the motors.
* For GOTO mode, the motor controller will take care of T1 automatically. But motion mode has to be set to tracking=False
When T1 generates an interrupt, it might:
* Drive the motor to move 1 step (1 micro-step or 1 encoder tick) for low speed slewing.
* Drive the motor to move up to 32 steps for high speed slewing. This method applies to motor
controller firmware version 2.xx. For motor controller with firmware 3.xx or above, the motor
controller always drive the motor controller 1 steps/interrupt.
Typical session:
* Check whether the motor is in full stop status. If not, stop it.
* Set the motion mode.
* Set the parameters, for example, destination or preset value of T1.
* Set the "Start" command.
* For a GOTO slewing, check the motor status to confirm that the motor stops (Generally means arriving the destination. ).
For a Speed mode slewing, send "Stop" command to end the session.
Generally, the motor controller returns to "Speed Mode" when the motor stops automatically.
**NOTE:** Methods begining with axis prefix act only onto selected axis.
'''
def __init__(self,udp_ip=UDP_IP,udp_port=UDP_PORT):
'''Init UDP comunication '''
logging.basicConfig(
format='%(asctime)s %(levelname)s:synscanMotor: %(message)s',
level=LOGGING_LEVEL
)
super(motors, self).__init__(udp_ip,udp_port)
self._init()
self.update_current_values()
def _init(self):
'''Get main motor parameters. Retry if comm fails'''
retrySec=2
try:
self.params=self.get_parameters()
except NameError as error:
logging.warning(error)
logging.warning(f'Retriying in {retrySec}..')
time.sleep(retrySec)
self._init()
def _degreesPerSecond2T1preset(self,axis,degreesPerSecond):
countsPerSecond=self.degrees2counts(axis,degreesPerSecond)
TMR_Freq=self.params[axis]['TimerInterruptFreq']
if abs(countsPerSecond) <=0:
T1preset=TMR_Freq
else:
T1preset=TMR_Freq/abs(countsPerSecond)
return T1preset
[docs] def get_values(self,parameterDict):
'''
Send all cmd in the parameterDict for both axis and return
a dictionary with the values.
Used by get_parameters and update_current_values functions
'''
params=dict()
for axis in range(1,3):
params[axis]=dict()
for parameter,cmd in parameterDict.items():
try:
params[axis][parameter]=self._send_cmd(cmd,axis)
except NameError as error:
logging.warning(error)
raise(NameError('getValuesError'))
#Send init done
self._send_cmd('F',axis)
return params
[docs] def get_parameters(self):
'''
Get main motor parameters.
Some of this parameters are needed for all calculations
so they have to be available to the rest of the code
Parameters are stored in a dict with te following keys:
* countsPerRevolution
* TimerInterruptFreq
* StepPeriod
* MotorBoardVersion
* HighSpeedRatio
'''
parameterDict={ 'countsPerRevolution':'a',
'TimerInterruptFreq':'b',
'StepPeriod':'i',
'MotorBoardVersion':'e',
'HighSpeedRatio':'g',
}
try:
params=self.get_values(parameterDict)
except NameError as error:
logging.warning(error)
raise(NameError('getParametersError'))
return {}
logging.info(f'MOUNT PARAMETERS: {params}')
return params
[docs] def axis_get_pos(self,axis):
'''Get actual position in Degrees.'''
counts=self.axis_get_posCounts(axis)
return self.counts2degrees(axis,counts)
[docs] def axis_set_pos(self,axis,degrees):
'''Syncronize position Degrees.'''
logging.info(f'AXIS{axis}: Syncronizing actual position to {degrees} degrees')
counts=self.degrees2counts(axis,degrees)
response=self.axis_set_posCounts(axis,int(counts))
def _decode_status(self,hexstring):
''' Decode Status msg.
Status msg is 12bits long (3 HEX digits).
HEX digit1 bits:
* B0: 1=Tracking,0=Goto
* B1: 1=CCW,0=CW
* B2: 1=Fast,0=Slow
HEX digit2 bits:
* B0: 1=Running,0=Stopped
* B1: 1=Blocked,0=Normal
HEX digit3 bits:
* B0: 0 = Not Init,1 = Init done
* B1: 1 = Level switch on
The decode value is returned as a dictionary with the following keys:
* Tracking
* CCW
* FastSpeed
* Stopped
* Blocked
* InitDone
* LevelSwitchOn
'''
A=int(hexstring[0],16)
B=int(hexstring[1],16)
C=int(hexstring[2],16)
logging.debug(f'Decode status {hexstring} A:{A} B:{B} C:{C}')
status=dict()
status['Tracking']=bool(A & 0x01)
status['CCW']=bool((A & 0x02) >> 1)
status['FastSpeed']=bool((A & 0x04) >> 2)
status['Stopped']=not(B & 0x01)
status['Blocked']=bool((B & 0x02) >> 1)
status['InitDone']=not(C & 0x01)
status['LevelSwitchOn']=bool((B & 0x02) >> 1)
return status
[docs] def axis_set_motion_mode(self,axis,Tracking,CW=True,fastSpeed=False):
'''Set Motion Mode.
NOTE: Channel will always be set to Tracking Mode after stopped
Motion mode msg is 1byte msg (2 HEX digits)
HEX Digit 1 bits:
* B0: 0=Goto, 1=Tracking
* B1: 0=Slow, 1=Fast (T)
0=Fast, 1=Slow (G)
* B2: 0=S/F, 1=Medium
* B3: 1x SlowGoto
HEX Digit 2 bits:
* B0: 0=CW,1=CCW
* B1: 0=Noth,1=South
* B2: 0=Normal Goto,1=Coarse Goto
'''
if not Tracking:
if fastSpeed:
speedBit=0
else:
speedBit=1
else:
if fastSpeed:
speedBit=1
else:
speedBit=0
if Tracking:
value=16
else:
value=0
value=value+speedBit*32+CW
#Send as two HEX digits
logging.info(f'AXIS{axis}: Setting Motion Mode: {value} HEX:{value:02X}')
response=self._send_cmd('G',axis,value,ndigits=2)
return response
def _set_T1_preset(self,axis,value):
'''Set step period for tracking speed'''
logging.info(f'AXIS{axis}: Setting step_period to: {value} counts per seconds')
response=self._send_cmd('I',axis,value)
return response
[docs] def axis_get_posCounts(self,axis):
'''Get actual postion in StepsCounts.'''
#Position values are offseting by 0x800000
response=self._send_cmd('j',axis)-0x800000
return response
[docs] def axis_set_goto_targetCounts(self,axis,targetCounts):
'''GoTo Target value in StepsCounts. Motors has to be stopped'''
logging.info(f'AXIS{axis}: Setting goto target to {targetCounts} counts')
#Position values are offseting by 0x800000
response=self._send_cmd('S',axis,targetCounts+0x800000)
return response
[docs] def axis_set_goto_targetIncrementCounts(self,axis,targetCounts):
#NOT IN USE. HAVE TO BE TESTED!!
'''GoTo Target increment in StepsCounts. Motors has to be stopped'''
logging.info(f'AXIS{axis}: Setting goto target INCREMENT to {targetCounts} counts')
#Position values are offseting by 0x800000
response=self._send_cmd('H',axis,targetCounts+0x800000)
#Set Brake Point Increment
response=self._send_cmd('M',axis,0x000DAC)
return response
def axis_wait2stop(self,axis):
logging.info(f'AXIS{axis}: Waitting to stop.')
self.update_current_values()
while not self.values[axis]['Status']['Stopped']:
time.sleep(1)
self.update_current_values()
logging.info(f'AXIS{axis}: Stopped')
[docs] def axis_set_posCounts(self,axis,counts):
'''Syncronize position Counts.'''
logging.info(f'AXIS{axis}: Syncronizing actual position to {counts} counts')
#Position values are offseting by 0x800000
response=self._send_cmd('E',axis,counts+0x800000)
return response
[docs] def axis_set_goto_target(self,axis,targetDegrees):
'''GoTo Target value in Degrees. Motors has to be stopped'''
logging.info(f'AXIS{axis}: Setting goto target to {targetDegrees} degrees')
posCounts=self.degrees2counts(axis,targetDegrees)
response=self.axis_set_goto_targetCounts(axis,int(posCounts))
return response
def axis_goto(self,axis,targetDegrees):
self.axis_stop_motion(axis)
actualPos=self.axis_get_pos(axis)
self.axis_set_motion_mode(axis,False,(targetDegrees<actualPos),True)
self.axis_set_goto_target(axis,targetDegrees)
self.axis_start_motion(axis)
[docs] def axis_set_speed(self,axis,degreesPerSecond):
'''Set the tracking speed in degreesPerSecond'''
logging.info(f'AXIS{axis}: Setting speed to:{degreesPerSecond} degrees per second')
if degreesPerSecond!=0:
response=self._set_T1_preset(axis,int(self._degreesPerSecond2T1preset(axis,abs(degreesPerSecond))))
else:
logging.info(f'AXIS{axis}: Requested speed==0. Stopping axis')
response=self.axis_stop_motion(axis)
return response
def axis_track(self,axis,speed):
#Check if we need to stop axis
self.update_current_values(axis)
stopped=self.values[axis]['Status']['Stopped']
CW=not self.values[axis]['Status']['CCW']
tracking=self.values[axis]['Status']['Tracking']
if not stopped:
if not tracking or (CW and (speed <0)) or (not CW and (speed >0)):
logging.info(f'TRACK asked to change dir or mode tracking:{tracking} CW:{CW} speed:{speed}')
self.axis_stop_motion(axis,syncronous=True)
self.axis_set_motion_mode(axis,True,(speed <0),False)
self.axis_set_speed(axis,speed)
self.axis_start_motion(axis)
return
else:
self.axis_set_speed(axis,speed)
return
else:
self.axis_set_motion_mode(axis,True,(speed <0),False)
self.axis_set_speed(axis,speed)
self.axis_start_motion(axis)
return
[docs] def axis_start_motion(self,axis):
'''Start Goto'''
response=self._send_cmd('J',axis)
logging.info(f'AXIS{axis}: Starting motion')
return response
[docs] def axis_stop_motion(self,axis,syncronous=True):
'''Soft stop. If syncronous==True wait to finish'''
logging.info(f'AXIS{axis}: Stopping')
response=self._send_cmd('K',axis)
if syncronous:
self.axis_wait2stop(axis)
else:
logging.info(f'AXIS{axis}: Ask to stop. In progress')
return response
#HIGH LEVEL API (arguments in degrees)
[docs] def degrees2counts(self,axis,degrees):
'''Return position or speed in counts for a given deg or deg/seconds value'''
CPR=self.params[axis]['countsPerRevolution']
value=degrees*CPR/360
return value
[docs] def counts2degrees(self,axis,counts):
'''Return position or speed in degrees for a given counts or counts/seconds value'''
CPR=self.params[axis]['countsPerRevolution']
value=counts*360/CPR
return value
[docs] def set_switch(self,on):
'''Switch on/off auxiliary switch'''
if on:
value=1
else:
value=0
logging.info(f'Auxiliary switch: {on}')
response=self._send_cmd('O',1,value,ndigits=1)
return response
[docs] def set_pos(self,alpha,beta):
''' Syncronize actual position with alpha and beta'''
self.axis_set_pos(1,alpha)
self.axis_set_pos(2,beta)
[docs] def goto(self,alpha,beta,syncronous=False):
'''GOTO. alpha,beta in degrees'''
logging.info(f'GOTO axis1={alpha} axis2={beta} degrees')
angle={}
angle[1]=alpha
angle[2]=beta
for axis in [1,2]:
self.axis_goto(axis,angle[axis])
'''
self.axis_stop_motion(axis)
self.axis_set_motion_mode(axis,False,False,True)
self.axis_set_goto_target(axis,angle[axis])
self.axis_start_motion(axis)
'''
if syncronous:
for axis in [1,2]:
self.axis_wait2stop(axis)
[docs] def track(self,alpha,beta):
'''GOTO. alpha,beta in degrees per second'''
logging.info(f'TRACK speeds axis1={alpha} axis2={beta} degrees per seconds')
self.axis_track(1,alpha)
self.axis_track(2,beta)
[docs] def update_current_values(self,logaxis=2):
'''Update current status and values
logaxis can be 1,2,3 or None. 1 for only log current values of axis 1...
'''
parameterDict={ 'GotoTarget':'h',
'Position':'j',
'StepPeriod':'i',
'Status':'f'
}
try:
params=self.get_values(parameterDict)
except NameError as error:
logging.warning(error)
return {}
for parameter in ['GotoTarget','Position']:
for axis in range(1,3):
#Position values are offseting by 0x800000
params[axis][parameter]=params[axis][parameter]-0x800000
for axis in range(1,3):
params[axis]['Status']=self._decode_status(params[axis]['Status'])
self.values=params
if logaxis==3:
logging.info(f'{params}')
if logaxis in [1,2]:
logging.info(f'AXIS{logaxis} {params[logaxis]}')
return params
#Methods for developing
def _test_goto(self,axis=2,X=90):
'''Test GOTO. X in degrees'''
logging.info(f'AXIS{axis}: GOTO test')
self.axis_stop_motion(axis)
self.axis_set_motion_mode(axis,False,X,False)
self.axis_set_goto_target(axis,X)
self.axis_start_motion(axis)
self.axis_wait2stop(axis)
def _test_slew(self,axis=1,speed=1):
'''Test SLEW'''
logging.info(f'AXIS{axis}: SLEW test')
self.axis_stop_motion(axis)
self.axis_set_motion_mode(axis,True,(speed>=0),True)
self.axis_set_speed(axis,speed)
self.axis_start_motion(axis)
if __name__ == '__main__':
smc=motors()
smc.set_pos(0,0)
#AXIS to test
AXIS=2
#Test GOTO
if True:
smc.test_goto(axis=AXIS,X=45)
smc.test_goto(axis=AXIS,X=0)
exit(0)
#Test SLEW
if False:
smc.test_slew(axis=AXIS,speed=.5)
smc.axis_wait2stop(AXIS)
exit(0)
#Two axes goto
if True:
smc.goto(5,15)
smc.goto(0,0)
exit(0)
#Ramp tracking
if False:
for speed in range(0,50,1):
time.sleep(.1)
smc.axis_track(AXIS,speed/10)
for speed in range(50,0,-1):
time.sleep(.1)
smc.axis_track(AXIS,speed/10)
for speed in range(0,50,1):
time.sleep(.1)
smc.axis_track(AXIS,-speed/10)
for speed in range(50,0,-1):
time.sleep(.1)
smc.axis_track(AXIS,-speed/10)
exit(0)
#GOTO/TRACK interrupts
if True:
pass