Source code for mylinux.model.linux.Executor

from __future__ import print_function
import subprocess
import glob
from mylinux.constants import runTime

[docs]class Executor: """Main model terminal executor. This class handles shell bash executing, returning stdout/stderr etc... Attributes: __errorsCount (int): Count errors. __executeCount (int): Count how many command was executed. __status (int): Holding status variables for user visualization of the progress. """ def __init__(self): self.__errorsCount = 0 self.__executeCount = 0 self.__exit = False self.cwd = None self.__status = { 'name': None, 'count_all': 0, 'count': 0, 'cmd_string': None } @property def exit(self): return self.__exit @property def errorsCount(self): """ Return: __errorsCount """ return self.__errorsCount @property def executeCount(self): """ Return: __executeCount """ return self.__executeCount @property def status(self): """ Return: __status """ return self.__status def __display(self, line): """Print display line of executing info. Args: line (str): What string should be showed to the user when there is active subprocess. """ statusLine = self.__getStatusLine() if line: if len(statusLine) > len(line): line = line.replace('\n', '') line += ' ' * (len(statusLine) - len(line)) line += '\n' print(line, end="") print(statusLine, end="\r") def __getStatusLine(self): """Format and return status line to be showed. Return: Pretty status line """ return '{0} : ({1}/{2}) ERR.({3}) ===> {4}'.format( self.__status['name'], self.__status['count_all'], self.__status['count'], self.__errorsCount, self.__status['cmd_string'] )
[docs] def executeArrays(self, cmdArrays): """Wraper around execute method. Args: cmdArrays (arr-arr-str): Command arrays that must be executed. Return: [Pass (boo),stdoutAll,stderrAll] Other Parameters: Execute array Fill stdout/stderr with data """ stdoutAll = '' stderrAll = '' for i in range(len(cmdArrays)): didPass, stdout, stderr = self.execute(cmdArrays[i]) stdoutAll += stdout stderrAll += stderr if not didPass: return [False, stdoutAll, stderrAll] return [True, stdoutAll, stderrAll] #Todo: doc this thing...
def __supportedCmdArray(self, cmdArray): #Todo: Add msg to the constants... cwd = self.cwd.split('/') newCmdArray = [] for i,arg in enumerate(cmdArray): if '~' in arg: if '~' in arg.split('/')[0]: arg = arg.replace('~',runTime.homePath) if '..' in arg: if '..' in arg.split('/')[0]: arg = arg.replace('..','/'.join(cwd[:-1])) if '*' in arg: args = glob.glob(arg) for arg in args: newCmdArray.append(arg) continue newCmdArray.append(arg) return newCmdArray
[docs] def execute(self, cmdArray): """Main executing method for shell executing. Args: cmdArray (arr-str): Commands to be executed in shell. Return: [Pass (boo), stdout, stderr] Other Parameters: Add 1 to the __executeCount variable. Update status with cmd string. Make new processs for cmdArray and pipe stdout/stderr. Check for user interupts (KeyboardInterrupt,SystemExit). Check if error has been caught, if tru raise __errorsCount variable Display stdout/stderr line. Check for exit code Close process. """ self.__executeCount += 1 self.__status['cmd_string'] = ' '.join(cmdArray) stdout = '' stderr = '' try: try: cmdArray = self.__supportedCmdArray(cmdArray=cmdArray) process = subprocess.Popen(cmdArray,cwd=self.cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1) except OSError: self.__errorsCount += 1 return [False, '', 'ERROR : command(' + ' '.join(cmdArray) + ') could not get executed!'] for line in iter(process.stdout.readline, b''): try: echoLine = line.decode("utf-8") except: echoLine = str(line) self.__display(echoLine) stdout += echoLine for line in iter(process.stderr.readline, b''): try: echoLine = line.decode("utf-8") except: echoLine = str(line) self.__display(echoLine) stderr += echoLine except (KeyboardInterrupt,SystemExit) as err: self.__exit = True return [False,'',str(err)] process.stdout.close() returnCode = process.wait() if returnCode != 0 or stderr != '': self.__errorsCount += 1 return [False, stdout, stderr] else: return [True, stdout, stderr]