diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/compute.py b/compute.py new file mode 100644 index 00000000..cf47183b --- /dev/null +++ b/compute.py @@ -0,0 +1,41 @@ + +import processGraph as pg + +import os +import sys + +import argparse + + +parser = argparse.ArgumentParser(description='Execute a Graph of processes.') +parser.add_argument('graphFile', metavar='GRAPHFILE.mg', type=str, + help='Filepath to a graph file.') +parser.add_argument('--node', metavar='NODE_NAME', type=str, + help='Process the node alone.') +parser.add_argument('--graph', metavar='NODE_NAME', type=str, + help='Process the node and all previous nodes needed.') +parser.add_argument("--force", help="Force recompute", + action="store_true") + +args = parser.parse_args() + +# graph = pg.Graph('bashTest') +# ls = graph.addNewNode('Ls', input='/tmp') +# appendText = graph.addNewNode('AppendText', inputText='plop') +# graph.addEdge(ls.output, appendText.input) +## graph.save(args.graphFile) + +graph = pg.loadGraph(args.graphFile) +graph.update() + +if args.node: + # Execute the node + node = graph.nodes[args.node] + if args.force or node.status.status != pg.Status.SUCCESS: + node.process() +else: + startNodes = None + if args.graph: + startNodes = [graph.nodes[args.graph]] + pg.execute(graph, startNodes=startNodes, force=args.force) + diff --git a/newNodeType.py b/newNodeType.py new file mode 100644 index 00000000..bff941b8 --- /dev/null +++ b/newNodeType.py @@ -0,0 +1,55 @@ + +import processGraph as pg + +import os +import sys +import re +import argparse + + +parser = argparse.ArgumentParser(description='Create a new Node Type') +parser.add_argument('node', metavar='NodeName', type=str, + help='New node name') + +args = parser.parse_args() + +if sys.stdin.isatty(): + print('No input documentation.') + print('Usage: YOUR_COMMAND --help | {cmd} YourCommand'.format(cmd=os.path.splitext(__file__)[0])) + exit(-1) + +inputDoc = [line for line in sys.stdin] +inputArgs = [line for line in inputDoc if '--' in line] + +arg_re = re.compile('.*?--(?P\w+).*?') + + +def convertToLabel(name): + camelCaseToLabel = re.sub('(.)([A-Z][a-z]+)', r'\1 \2', name) + snakeToLabel = ' '.join(word.capitalize() for word in camelCaseToLabel.split('_')) + snakeToLabel = ' '.join(word.capitalize() for word in snakeToLabel.split(' ')) + # print name, camelCaseToLabel, snakeToLabel + return snakeToLabel + + +outputNodeStr = ''' +import processGraph as pg + +class __COMMANDNAME__(pg.CommandLineNodeDesc): + internalFolder = '{nodeType}/{uid0}/' + cmdLineExpr = '{nodeType} {allParams}' +'''.replace('__COMMANDNAME__', 'args.node') + +for inputArg in inputArgs: + paramName = arg_re.match(inputArg).group('longName') + + inputArgLower = inputArg.lower() + isFile = 'path' in inputArgLower or 'folder' in inputArgLower or 'file' in inputArgLower + outputNodeStr += ''' + {name} = pg.{attributeType}( + label='{label}', + uid=[0], + )'''.format(name=paramName, label=convertToLabel(paramName), attributeType='FileAttribute' if isFile else 'ParamAttribute') + + +print(outputNodeStr) \ No newline at end of file diff --git a/nodes/AppendText.py b/nodes/AppendText.py new file mode 100644 index 00000000..17c353a2 --- /dev/null +++ b/nodes/AppendText.py @@ -0,0 +1,19 @@ +from processGraph import desc + +class AppendText(desc.CommandLineNode): + commandLine = 'cat {inputValue} > {outputValue} && echo {inputTextValue} >> {outputValue}' + input = desc.FileAttribute( + label='Input File', + uid=[0], + ) + output = desc.FileAttribute( + label='Output', + value='{cache}/{nodeType}/{uid0}/appendText.txt', + isOutput=True, + hasExpression=True, + ) + inputText = desc.FileAttribute( + label='Input Text', + uid=[0], + ) + diff --git a/nodes/Ls.py b/nodes/Ls.py new file mode 100644 index 00000000..c3fd339f --- /dev/null +++ b/nodes/Ls.py @@ -0,0 +1,16 @@ +from processGraph import desc + + +class Ls(desc.CommandLineNode): + commandLine = 'ls {inputValue} > {outputValue}' + input = desc.FileAttribute( + label='Input', + uid=[0], + ) + output = desc.FileAttribute( + label='Output', + value='{cache}/{nodeType}/{uid0}/ls.txt', + isOutput=True, + hasExpression=True, + ) + diff --git a/nodes/__init__.py b/nodes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/processGraph/Graph.py b/processGraph/Graph.py new file mode 100644 index 00000000..9b3bb2e7 --- /dev/null +++ b/processGraph/Graph.py @@ -0,0 +1,604 @@ +from __future__ import print_function + +import processGraph as pg + +from collections import defaultdict +import psutil +import inspect +from pprint import pprint +import json + +# Replace default encoder to support Enums +DefaultJSONEncoder = json.JSONEncoder # store the original one +class MyJSONEncoder(DefaultJSONEncoder): # declare a new one with Enum support + def default(self, obj): + if isinstance(obj, Enum): + return obj.name + return DefaultJSONEncoder.default(self, obj) # use the default one for all other types +json.JSONEncoder = MyJSONEncoder # replace the default implementation with our new one + +import time +import os +import uuid + +import threading +import subprocess +import hashlib + +from enum import Enum # available by default in python3. For python2: "pip install enum34" + + +try: + unicode = unicode +except NameError: + # 'unicode' is undefined, must be Python 3 + str = str + unicode = str + bytes = bytes + basestring = (str, bytes) +else: + # 'unicode' exists, must be Python 2 + str = str + unicode = unicode + bytes = str + basestring = basestring + + +def hash(v): + hashObject = hashlib.sha1(str(v).encode('utf-8')) + return hashObject.hexdigest() + + +class Attribute: + ''' + ''' + def __init__(self, name, node, attributeDesc): + self.attrName = name + self.node = node + self._value = attributeDesc.__dict__.get('value', None) + self.attributeDesc = attributeDesc + + def aboluteName(self): + return '{}.{}.{}'.format(self.node.graph.name, self.node.name, self.attrName) + + def name(self): + ''' + Name inside the Graph. + ''' + return '{}.{}'.format(self.node.name, self.attrName) + + def uid(self): + ''' + ''' + if self.attributeDesc.isOutput: + # only dependent of the linked node uid, so it is independant of the cache folder which may be used in the filepath. + return self.node.uid() + if self.isLink(): + return self.node.graph.edges[self].uid() + if isinstance(self._value, basestring): + return hash(str(self._value)) + return hash(self._value) + + def isLink(self): + ''' + If the attribute is a link to another attribute. + ''' + if self.attributeDesc.isOutput: + return False + else: + return self in self.node.graph.edges + + def getLinkParam(self): + if not self.isLink(): + return None + return self.node.graph.edges[self] + + def _applyExpr(self): + ''' + For string parameters with an expression (when loaded from file), + this function convert the expression into a real edge in the graph + and clear the string value. + ''' + v = self._value + if isinstance(v, basestring) and len(v) > 2 and v[0] == '{' and v[-1] == '}': + # value is a link to another attribute + g = self.node.graph + link = v[1:-1] + linkNode, linkAttr = link.split('.') + g.addEdge(g.nodes[linkNode].attributes[linkAttr], self) + self._value = "" + + def getExportValue(self): + value = self._value + # print('getExportValue: ', self.name(), value, self.isLink()) + if self.isLink(): + value = '{' + self.node.graph.edges[self].name() + '}' + return value + + +class Status(Enum): + ''' + ''' + NONE = 1 + SUBMITTED_EXTERN = 2 + SUBMITTED_LOCAL = 3 + RUNNING = 4 + ERROR = 5 + SUCCESS = 6 + + +class Statistics: + ''' + ''' + def __init__(self): + self.duration = 0 # computation time set at the end of the execution + self.cpuUsage = [] + self.nbCores = 0 + self.cpuFreq = 0 + self.ramUsage = [] # store cpuUsage every minute + self.ramAvailable = 0 # GB + self.vramUsage = [] + self.vramAvailable = 0 # GB + self.swapUsage = [] + self.swapAvailable = 0 + def toDict(self): + return self.__dict__ + def fromDict(self, d): + self.__dict__ = d + + +class StatusData: + ''' + ''' + def __init__(self, nodeName, nodeType): + self.status = Status.NONE + self.nodeName = nodeName + self.nodeType = nodeType + self.statistics = Statistics() + self.graph = '' + + def toDict(self): + return {k: (v.toDict() if getattr(v, "toDict", None) else v) for k, v in self.__dict__.items()} + + def fromDict(self, d): + self.status = Status._member_map_[d['status']] + self.nodeName = d['nodeName'] + self.nodeType = d['nodeType'] + self.statistics.fromDict(d['statistics']) + self.graph = d['graph'] + + +bytesPerGiga = 1024.*1024.*1024. + +class StatisticsThread(threading.Thread): + + def __init__(self, node): + threading.Thread.__init__(self) + self.node = node + self.running = True + self.statistics = self.node.status.statistics + self.initStats() + + def initStats(self): + self.lastTime = time.time() + self.statistics.duration = 0 + self.statistics.cpuUsage = [] + self.statistics.nbCores = psutil.cpu_count(logical=False) + self.statistics.cpuFreq = psutil.cpu_freq()[2] + self.statistics.ramUsage = [] + self.statistics.ramAvailable = psutil.virtual_memory().total / bytesPerGiga + self.statistics.swapUsage = [] + self.statistics.swapAvailable = psutil.swap_memory().total / bytesPerGiga + self.statistics.vramUsage = [] + self.statistics.vramAvailable = 0 + self.updateStats() + + def updateStats(self): + self.lastTime = time.time() + self.statistics.cpuUsage.append(psutil.cpu_percent(interval=0.1, percpu=True)) + self.statistics.ramUsage.append(psutil.virtual_memory().percent) + self.statistics.swapUsage.append(psutil.swap_memory().percent) + self.statistics.vramUsage.append(0) + self.node.saveStatusFile() + + def run(self): + while self.running: + if time.time() - self.lastTime > 10: + self.updateStats() + time.sleep(1) + + +class Node: + ''' + ''' + name = None + graph = None + + + def __init__(self, nodeDesc, **kwargs): + self.nodeDesc = pg.nodesDesc[nodeDesc]() + self.attributes = {} + self.attributesPerUid = defaultdict(set) + self._initFromDesc() + for k, v in kwargs.items(): + self.attributes[k]._value = v + self.status = StatusData(self.name, self.nodeType()) + + def __getattr__(self, k): + try: + # Throws exception if not in prototype chain + # return object.__getattribute__(self, k) # doesn't work in python2 + return object.__getattr__(self, k) + except AttributeError: + try: + return self.attributes[k] + except KeyError: + raise AttributeError(k) + + def _initFromDesc(self): + # Init from class members + for name, desc in self.nodeDesc.__class__.__dict__.items(): + if issubclass(desc.__class__, pg.desc.Attribute): + self.attributes[name] = Attribute(name, self, desc) + # Init from instance members + for name, desc in self.nodeDesc.__dict__.items(): + if issubclass(desc.__class__, pg.desc.Attribute): + self.attributes[name] = Attribute(name, self, desc) + # List attributes per uid + for name, attr in self.attributes.items(): + for uidIndex in attr.attributeDesc.uid: + self.attributesPerUid[uidIndex].add(attr) + + def _applyExpr(self): + for attr in self.attributes.values(): + attr._applyExpr() + + def nodeType(self): + return self.nodeDesc.__class__.__name__ + + def uid(self): + return self.nodeUid + + def _updateUid(self): + hashInputParams = [(attr.attrName, attr.uid()) for attr in self.attributes.values() if not attr.attributeDesc.isOutput] + hashInputParams.sort() + self.nodeUid = hash(tuple([b for a, b in hashInputParams])) + return self.nodeUid + + def getDepth(self): + return self.graph.getDepth(self) + + def toDict(self): + attributes = {k: v.getExportValue() for k, v in self.attributes.items()} + return { + 'nodeType': self.nodeType(), + 'attributes': {k: v for k, v in attributes.items() if v is not None}, # filter empty values + } + + def updateInternals(self): + self._updateUid() + + self._cmdVars = {} + for uidIndex, associatedAttributes in self.attributesPerUid.items(): + assAttr = [(a.attrName, a.uid()) for a in associatedAttributes] + assAttr.sort() + self._cmdVars['uid{}'.format(uidIndex)] = hash(tuple([b for a, b in assAttr])) + + for name, attr in self.attributes.items(): + if attr.attributeDesc.isOutput: + attr._value = attr.attributeDesc.value.format( + cache=pg.cacheFolder, + nodeType=self.nodeType(), + **self._cmdVars) # self._cmdVars only contains uids at this step + + for name, attr in self.attributes.items(): + linkAttr = attr.getLinkParam() + v = attr._value + if linkAttr: + v = linkAttr._value + + self._cmdVars[name] = '--{name} {value}'.format(name=name, value=v) + self._cmdVars[name + 'Value'] = str(v) + self._cmdVars[attr.attributeDesc.group] = self._cmdVars.get(attr.attributeDesc.group, '') + ' ' + self._cmdVars[name] + + def internalFolder(self): + return self.nodeDesc.internalFolder.format(nodeType=self.nodeType(), **self._cmdVars) + + def commandLine(self): + return self.nodeDesc.commandLine.format(nodeType=self.nodeType(), **self._cmdVars) + + def statusFile(self): + return os.path.join(pg.cacheFolder, self.internalFolder(), 'status') + + def logFile(self): + return os.path.join(pg.cacheFolder, self.internalFolder(), 'log') + + def updateStatusFromCache(self): + ''' + Need up-to-date UIDs. + ''' + statusFile = self.statusFile() + if not os.path.exists(statusFile): + self.status.status = Status.NONE + return + with open(statusFile, 'r') as jsonFile: + statusData = json.load(jsonFile) + self.status.fromDict(statusData) + + def saveStatusFile(self): + ''' + Need up-to-date UIDs. + ''' + data = self.status.toDict() + statusFilepath = self.statusFile() + folder = os.path.dirname(statusFilepath) + if not os.path.exists(folder): + os.makedirs(folder) + statusFilepathWriting = statusFilepath + '.writing.' + str(uuid.uuid4()) + with open(statusFilepathWriting, 'w') as jsonFile: + json.dump(data, jsonFile, indent=4) + os.rename(statusFilepathWriting, statusFilepath) + + def upgradeStatusTo(self, newStatus): + if int(newStatus.value) <= int(self.status.status.value): + print('WARNING: downgrade status on node "{}" from {} to {}'.format(self.name, self.status.status.name, newStatus)) + self.status.status = newStatus + self.saveStatusFile() + + def submit(self): + self.upgradeStatusTo(pg.Status.SUBMITTED_EXTERN) + + def beginSequence(self): + self.upgradeStatusTo(pg.Status.SUBMITTED_LOCAL) + + def process(self): + self.upgradeStatusTo(pg.Status.RUNNING) + statThread = StatisticsThread(self) + statThread.start() + try: + with open(self.logFile(), 'w') as logF: + cmd = self.commandLine() + print(' =====> commandLine: ', cmd) + print(' - logFile: ', self.logFile()) + subprocess.call(cmd, stdout=logF, stderr=logF, shell=True) + except: + self.upgradeStatusTo(pg.Status.ERROR) + raise + statThread.running = False + statThread.join() + + self.upgradeStatusTo(pg.Status.SUCCESS) + + def endSequence(self): + pass + +WHITE = 0 +GRAY = 1 +BLACK = 2 + +class Visitor: + # def initializeVertex(self, s, g): + # '''is invoked on every vertex of the graph before the start of the graph search.''' + # pass + # def startVertex(self, s, g): + # '''is invoked on the source vertex once before the start of the search.''' + # pass + def discoverVertex(self, u, g): + '''is invoked when a vertex is encountered for the first time.''' + pass + # def examineEdge(self, e, g): + # '''is invoked on every out-edge of each vertex after it is discovered.''' + # pass + # def treeEdge(self, e, g): + # '''is invoked on each edge as it becomes a member of the edges that form the search tree. If you wish to record predecessors, do so at this event point.''' + # pass + # def backEdge(self, e, g): + # '''is invoked on the back edges in the graph.''' + # pass + # def forwardOrCrossEdge(self, e, g): + # '''is invoked on forward or cross edges in the graph. In an undirected graph this method is never called.''' + # pass + # def finishEdge(self, e, g): + # '''is invoked on the non-tree edges in the graph as well as on each tree edge after its target vertex is finished.''' + # pass + def finishVertex(self, u, g): + '''is invoked on a vertex after all of its out edges have been added to the search tree and all of the adjacent vertices have been discovered (but before their out-edges have been examined).''' + pass + + +class Graph: + ''' + _________________ _________________ _________________ + | | | | | | + | Node A | | Node B | | Node C | + | | edge | | edge | | + |input output|>---->|input output|>---->|input output| + |_______________| |_______________| |_______________| + + Data structures: + + nodes = {'A': , 'B': , 'C': } + edges = {B.input: A.output, C.input: B.output,} + + + ''' + def __init__(self, name): + self.name = name + self.nodes = {} + self.edges = {} # key/input <- value/output, it is organized this way because key/input can have only one connection. + + def addNode(self, node, uniqueName=None): + if node.graph != None and node.graph != self: + raise RuntimeError('Node "{}" cannot be part of the Graph "{}", as it is already part of the other graph "{}".'.format( + self.node.nodeType(), self.name, node.graph.name)) + if uniqueName: + assert(uniqueName not in self.nodes) + node.name = uniqueName + else: + node.name = self._createUniqueNodeName(node.nodeType()) + node.graph = self + self.nodes[node.name] = node + + return node + + def addNewNode(self, nodeName, *args, **kwargs): + return self.addNode(Node(nodeDesc=nodeName, *args, **kwargs)) + + def _createUniqueNodeName(self, inputName): + i = 1 + while i: + newName = "{name}_{index}".format(name=inputName, index=i) + if newName not in self.nodes: + return newName + i += 1 + + def getLeaves(self): + nodesWithOuput = set([outputAttr.node for outputAttr in self.edges.values()]) + return set(self.nodes.values()) - nodesWithOuput + + def addEdge(self, outputAttr, inputAttr): + assert(isinstance(outputAttr, Attribute)) + assert(isinstance(inputAttr, Attribute)) + if(outputAttr.node.graph != self or inputAttr.node.graph != self): + raise RuntimeError('The attributes of the edge should be part of a common graph.') + if inputAttr in self.edges: + raise RuntimeError('Input attribute "{}" is already connected.'.format(inputAttr.fullName())) + self.edges[inputAttr] = outputAttr + + def addEdges(self, *edges): + for edge in edges: + self.addEdge(*edge) + + def getDepth(self, node): + return len(self.dfsNodesOnFinish([node])) + + def _getNodeEdges(self): + nodeEdges = defaultdict(set) + + for attrInput, attrOutput in self.edges.items(): + nodeEdges[attrInput.node].add(attrOutput.node) + + return nodeEdges + + def dfs(self, visitor, startNodes=None): + nodeChildrens = self._getNodeEdges() + colors = {} + for u in self.nodes.values(): + colors[u] = WHITE + time = 0 + if startNodes: + for startNode in startNodes: + self.dfsVisit(startNode, visitor, colors, nodeChildrens) + else: + leaves = self.getLeaves() + for u in leaves: + if colors[u] == WHITE: + self.dfsVisit(u, visitor, colors, nodeChildrens) + + def dfsVisit(self, u, visitor, colors, nodeChildrens): + colors[u] = GRAY + visitor.discoverVertex(u, self) + # d_time[u] = time = time + 1 + for v in nodeChildrens[u]: + if colors[v] == WHITE: + # (u,v) is a tree edge + self.dfsVisit(v, visitor, colors, nodeChildrens) # TODO: avoid recursion + elif colors[v] == GRAY: + pass # (u,v) is a back edge + elif colors[v] == BLACK: + pass # (u,v) is a cross or forward edge + colors[u] = BLACK + visitor.finishVertex(u, self) + + def dfsNodesOnFinish(self, startNodes=None): + nodes = [] + visitor = Visitor() + visitor.finishVertex = lambda vertex, graph: nodes.append(vertex) + self.dfs(visitor=visitor, startNodes=startNodes) + return nodes + + def dfsNodesToProcess(self, startNodes=None): + nodes = [] + visitor = Visitor() + def finishVertex(vertex, graph): + if vertex.status.status in (Status.SUBMITTED_EXTERN, + Status.SUBMITTED_LOCAL): + print('WARNING: node "{}" is already submitted.'.format(vertex.name)) + if vertex.status.status is Status.RUNNING: + print('WARNING: node "{}" is already running.'.format(vertex.name)) + if vertex.status.status is not Status.SUCCESS: + nodes.append(vertex) + visitor.finishVertex = finishVertex + self.dfs(visitor=visitor, startNodes=startNodes) + return nodes + + def _applyExpr(self): + for node in self.nodes.values(): + node._applyExpr() + + def toDict(self): + return {k: node.toDict() for k, node in self.nodes.items()} + + def save(self, filepath): + ''' + ''' + data = self.toDict() + pprint(data) + with open(filepath, 'w') as jsonFile: + json.dump(data, jsonFile, indent=4) + + def updateInternals(self, startNodes=None): + nodes = self.dfsNodesOnFinish(startNodes=startNodes) + for node in nodes: + node.updateInternals() + + def updateStatusFromCache(self): + for node in self.nodes.values(): + node.updateStatusFromCache() + + def update(self): + self.updateInternals() + self.updateStatusFromCache() + + +def loadGraph(filepath): + ''' + ''' + graphData = None + with open(filepath) as jsonFile: + graphData = json.load(jsonFile) + if not isinstance(graphData, dict): + raise RuntimeError('loadGraph error: Graph is not a dict. File: {}'.format(filepath)) + + graph = Graph(os.path.splitext(os.path.basename(filepath))[0]) + for nodeName, nodeData in graphData.items(): + if not isinstance(nodeData, dict): + raise RuntimeError('loadGraph error: Node is not a dict. File: {}'.format(filepath)) + n = Node(nodeData['nodeType'], **nodeData['attributes']) + graph.addNode(n, uniqueName=nodeName) + graph._applyExpr() + return graph + + +def execute(graph, startNodes=None, force=False): + ''' + ''' + if force: + nodes = graph.dfsNodesOnFinish(startNodes=startNodes) + else: + nodes = graph.dfsNodesToProcess(startNodes=startNodes) + + print('execute: ', str([n.name for n in nodes])) + + for node in nodes: + node.updateInternals() + + for node in nodes: + node.beginSequence() + + for node in nodes: + node.process() + + for node in nodes: + node.endSequence() + diff --git a/processGraph/__init__.py b/processGraph/__init__.py new file mode 100644 index 00000000..02cb775a --- /dev/null +++ b/processGraph/__init__.py @@ -0,0 +1,55 @@ +from .Graph import * +from . import desc + +import os +import sys +import re +import inspect +import importlib +import json + +cacheFolder = '/tmp/processGraphCache' +nodesDesc = {} + + +def loadNodesDesc(folder, package='nodes'): + ''' + ''' + global nodesDesc + + pysearchre = re.compile('.py$', re.IGNORECASE) + pluginfiles = filter(pysearchre.search, + os.listdir(os.path.join(folder, + package))) + # import parent module + importlib.import_module(package) + nodeTypes = [] + errors = [] + for pluginFile in pluginfiles: + if pluginFile.startswith('__'): + continue + + try: + pluginName = os.path.splitext(pluginFile)[0] + module = '.' + pluginName + m = importlib.import_module(module, package=package) + p = [a for a in m.__dict__.values() if inspect.isclass(a) and issubclass(a, desc.Node)] + if not p: + raise RuntimeError('No class defined in plugin: %s' % module) + nodeTypes.extend(p) + except Exception as e: + errors.append(' * Errors while loading "{}".\n File: {}\n {}'.format(pluginName, pluginFile, str(e))) + + + nodesDesc = dict([(m.__name__, m) for m in nodeTypes]) + print('Plugins loaded: ', ', '.join(nodesDesc.keys())) + if errors: + print('== Error while loading the following plugins: ==') + print('\n'.join(errors)) + print('================================================') + + return nodeTypes + +# Load plugins +loadNodesDesc(folder=os.path.dirname(os.path.dirname(__file__))) + diff --git a/processGraph/desc.py b/processGraph/desc.py new file mode 100644 index 00000000..611d4e00 --- /dev/null +++ b/processGraph/desc.py @@ -0,0 +1,45 @@ +import inspect +import os + + +class Attribute: + ''' + ''' + isOutput = False + uid = [] + group = 'allParams' + commandLine = '{nodeType} --help' # need to be overridden + def __init__(self): + pass + + +class FileAttribute(Attribute): + ''' + ''' + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + +class ParamAttribute(Attribute): + ''' + ''' + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + +class Node: + ''' + ''' + internalFolder = '{nodeType}/{uid0}/' + + def __init__(self): + pass + + +class CommandLineNode(Node): + ''' + ''' + def __init__(self): + pass diff --git a/tests/test_task.py b/tests/test_task.py new file mode 100644 index 00000000..f66d6800 --- /dev/null +++ b/tests/test_task.py @@ -0,0 +1,23 @@ +import processGraph as pg +from nose.tools import * + + +def test_depth(): + graph = pg.Graph('Tests tasks depth') + + tA = graph.addNewNode('Ls', input='/tmp') + tB = graph.addNewNode('AppendText', inputText='echo B') + tC = graph.addNewNode('AppendText', inputText='echo C') + + graph.addEdges( + (tA.output, tB.input), + (tB.output, tC.input) + ) + + assert_equal(tA.getDepth(), 1) + assert_equal(tB.getDepth(), 2) + assert_equal(tC.getDepth(), 3) + + +if __name__ == '__main__': + test_depth()