projectal.entities.task

  1import datetime
  2import copy
  3import sys
  4import projectal
  5from projectal.entity import Entity
  6from projectal.enums import DateLimit
  7from projectal.linkers import *
  8
  9
 10class Task(
 11    Entity,
 12    ResourceLinker,
 13    SkillLinker,
 14    FileLinker,
 15    StageLinker,
 16    StaffLinker,
 17    RebateLinker,
 18    NoteLinker,
 19    TagLinker,
 20    PredecessorTaskLinker,
 21):
 22    """
 23    Implementation of the [Task](https://projectal.com/docs/latest/#tag/Task) API.
 24    """
 25
 26    _path = "task"
 27    _name = "task"
 28    _links = [
 29        ResourceLinker,
 30        SkillLinker,
 31        FileLinker,
 32        StageLinker,
 33        StaffLinker,
 34        RebateLinker,
 35        NoteLinker,
 36        TagLinker,
 37    ]
 38    _links_reverse = [
 39        PredecessorTaskLinker,
 40    ]
 41
 42    def _add_link_def(self, cls, reverse=False):
 43        """
 44        Each entity is accompanied by a dict with details about how to
 45        get access to the data of the link within the object. Subclasses
 46        can pass in customizations to this dict when their APIs differ.
 47
 48        reverse denotes a reverse linker, where extra work is done to
 49        reverse the relationship of the link internally so that it works.
 50        The backend only offers one side of the relationship.
 51        """
 52        d = {
 53            "name": cls._link_name,
 54            "link_key": cls._link_key or cls._link_name + "List",
 55            "data_name": cls._link_data_name,
 56            "type": cls._link_type,
 57            "entity": cls._link_entity or cls._link_name.capitalize(),
 58            "reverse": reverse,
 59        }
 60        self._link_def_by_key[d["link_key"]] = d
 61        self._link_def_by_name[d["name"]] = d
 62        if cls._link_name == "predecessor_task":
 63            d_after_reverse = copy.deepcopy(d)
 64            d_after_reverse["reverse"] = False
 65            self._link_def_by_name["task"] = d_after_reverse
 66            # We need this to be present in the link def so that
 67            # returned predecessor tasks can be typed as Tasks
 68            d_for_pred_link_typing = copy.deepcopy(d)
 69            d_for_pred_link_typing["link_key"] = "planList"
 70            self._link_def_by_key[
 71                d_for_pred_link_typing["link_key"]
 72            ] = d_for_pred_link_typing
 73
 74    @classmethod
 75    def create(
 76        cls,
 77        holder,
 78        entities,
 79        batch_linking=True,
 80        disable_system_features=True,
 81        enable_system_features_on_exit=True,
 82    ):
 83        """Create a Task
 84
 85        `holder`: An instance or the `uuId` of the owner
 86
 87        `entities`: `dict` containing the fields of the entity to be created,
 88        or a list of such `dict`s to create in bulk.
 89        """
 90        holder_id = holder["uuId"] if isinstance(holder, dict) else holder
 91        params = "?holder=" + holder_id
 92        out = super().create(
 93            entities,
 94            params,
 95            batch_linking,
 96            disable_system_features,
 97            enable_system_features_on_exit,
 98        )
 99
100        # Tasks should always refer to their parent and project. We don't get this information
101        # from the creation api method, but we can insert them ourselves because we know what
102        # they are.
103        def add_fields(obj):
104            obj.set_readonly("projectRef", holder_id)
105            obj.set_readonly("parent", obj.get("parent", holder_id))
106
107        if isinstance(out, dict):
108            add_fields(out)
109        if isinstance(out, list):
110            for obj in out:
111                add_fields(obj)
112        return out
113
114    @classmethod
115    def get(cls, entities, links=None, deleted_at=None):
116        r = super().get(entities, links, deleted_at)
117        if not links:
118            return r
119        # When Predecessor Task links are fetched,
120        # make sure the key matches the name expected
121        # by the predecessor linking REST end point
122        if PredecessorTaskLinker._link_name.casefold() in (
123            link.casefold() for link in links
124        ):
125            if isinstance(r, dict):
126                r["taskList"] = r.pop("planList", [])
127                r._Entity__old = copy.deepcopy(r)
128            else:
129                for entity in r:
130                    entity["taskList"] = entity.pop("planList", [])
131                    entity._Entity__old = copy.deepcopy(entity)
132        return r
133
134    # Override here to correctly format the URL for the Predecessor Task link case
135    def _link(
136        self, to_entity_name, to_link, operation, update_cache=True, batch_linking=True
137    ):
138        """
139        `to_entity_name`: Destination entity name (e.g. 'staff')
140
141        `to_link`: List of Entities of the same type (and optional data) to link to
142
143        `operation`: `add`, `update`, `delete`
144
145        'update_cache': also modify the entity's internal representation of the links
146        to match the operation that was done. Set this to False when replacing the
147        list with a new one (i.e., when calling save() instead of a linker method).
148
149        'batch_linking': Enabled by default, batches any link
150        updates required into composite API requests. If disabled
151        a request will be executed for each link update.
152        Recommended to leave enabled to increase performance.
153        """
154
155        link_def = self._link_def_by_name[to_entity_name]
156        to_key = link_def["link_key"]
157
158        if isinstance(to_link, dict) and link_def["type"] == list:
159            # Convert input dict to list when link type is a list (we allow linking to single entity for convenience)
160            to_link = [to_link]
161
162            # For cases where user passed in dict instead of Entity, we turn them into
163            # Entity on their behalf.
164            typed_list = []
165            target_cls = getattr(sys.modules["projectal.entities"], link_def["entity"])
166            for link in to_link:
167                if not isinstance(link, target_cls):
168                    typed_list.append(target_cls(link))
169                else:
170                    typed_list.append(link)
171            to_link = typed_list
172        else:
173            # For everything else, we expect types to match.
174            if not isinstance(to_link, link_def["type"]):
175                raise api.UsageException(
176                    "Expected link type to be {}. Got {}.".format(
177                        link_def["type"], type(to_link)
178                    )
179                )
180
181        if not to_link:
182            return
183
184        url = ""
185        payload = {}
186        request_list = []
187        # Is it a reverse linker? If so, invert the relationship
188        if link_def["reverse"]:
189            for link in to_link:
190                # Sets the data attribute on the correct
191                # link entity
192                if link_def["name"] == "predecessor_task":
193                    data_name = link_def.get("data_name")
194                    self[data_name] = copy.deepcopy(link[data_name])
195                request_list.extend(
196                    link._link(
197                        self._name,
198                        self,
199                        operation,
200                        update_cache,
201                        batch_linking=batch_linking,
202                    )
203                )
204        else:
205            # Only keep UUID and the data attribute, if it has one
206            def strip_payload(link):
207                single = {"uuId": link["uuId"]}
208                data_name = link_def.get("data_name")
209                if data_name and data_name in link:
210                    single[data_name] = copy.deepcopy(link[data_name])
211                    # limiting data attribute removal to only planLink
212                    # in case of side effects
213                    if data_name == "planLink":
214                        del link[data_name]
215                return single
216
217            # If batch linking is enabled and the entity to link is a list of entities,
218            # a separate request must be constructed for each one because the final composite
219            # request permits only one input per call
220            if to_entity_name == "predecessor_task" or to_entity_name == "task":
221                url = "/api/{}/plan/TASK/{}".format(self._path, operation)
222            else:
223                url = "/api/{}/link/{}/{}".format(self._path, to_entity_name, operation)
224            to_link_payload = None
225            if isinstance(to_link, list):
226                to_link_payload = []
227                for link in to_link:
228                    if batch_linking:
229                        request_list.append(
230                            {
231                                "method": "POST",
232                                "invoke": url,
233                                "body": {
234                                    "uuId": self["uuId"],
235                                    to_key: [strip_payload(link)],
236                                },
237                            }
238                        )
239                    else:
240                        to_link_payload.append(strip_payload(link))
241            if isinstance(to_link, dict):
242                if batch_linking:
243                    request_list.append(
244                        {
245                            "method": "POST",
246                            "invoke": url,
247                            "body": {
248                                "uuId": self["uuId"],
249                                to_key: strip_payload(to_link),
250                            },
251                        }
252                    )
253                else:
254                    to_link_payload = strip_payload(to_link)
255
256            if not batch_linking:
257                payload = {"uuId": self["uuId"], to_key: to_link_payload}
258                api.post(url, payload=payload)
259
260        if not update_cache:
261            return request_list
262
263        # Set the initial state if first add. We need the type to be set to correctly update the cache
264        if operation == "add" and self.get(to_key, None) is None:
265            if link_def.get("type") == dict:
266                self[to_key] = {}
267            elif link_def.get("type") == list:
268                self[to_key] = []
269
270        # Modify the entity object's cache of links to match the changes we pushed to the server.
271        if isinstance(self.get(to_key, []), list):
272            if operation == "add":
273                # Sometimes the backend doesn't return a list when it has none. Create it.
274                if to_key not in self:
275                    self[to_key] = []
276
277                for to_entity in to_link:
278                    self[to_key].append(to_entity)
279            else:
280                for to_entity in to_link:
281                    # Find it in original list
282                    for i, old in enumerate(self.get(to_key, [])):
283                        if old["uuId"] == to_entity["uuId"]:
284                            if operation == "update":
285                                self[to_key][i] = to_entity
286                            elif operation == "delete":
287                                del self[to_key][i]
288        if isinstance(self.get(to_key, None), dict):
289            if operation in ["add", "update"]:
290                self[to_key] = to_link
291            elif operation == "delete":
292                self[to_key] = None
293
294        # Update the "old" record of the link on the entity to avoid
295        # flagging it for changes (link lists are not meant to be user editable).
296        if to_key in self:
297            self._Entity__old[to_key] = self[to_key]
298
299        return request_list
300
301    def update_order(self, order_at_uuId, order_as=True):
302        url = "/api/task/update?order-at={}&order-as={}".format(
303            order_at_uuId, "true" if order_as else "false"
304        )
305        return api.put(url, [{"uuId": self["uuId"]}])
306
307    def link_predecessor_task(self, predecessor_task):
308        return self.__plan(self, predecessor_task, "add")
309
310    def relink_predecessor_task(self, predecessor_task):
311        return self.__plan(self, predecessor_task, "update")
312
313    def unlink_predecessor_task(self, predecessor_task):
314        return self.__plan(self, predecessor_task, "delete")
315
316    @classmethod
317    def __plan(cls, from_task, to_task, operation):
318        url = "/api/task/plan/task/{}".format(operation)
319        # Invert the link relationship to match the linker
320        if isinstance(to_task, dict):
321            from_task_copy = copy.deepcopy(from_task)
322            from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
323                to_task[PredecessorTaskLinker._link_data_name]
324            )
325            payload = {"uuId": to_task["uuId"], "taskList": [from_task_copy]}
326            api.post(url, payload=payload)
327        elif isinstance(to_task, list):
328            for task in to_task:
329                from_task_copy = copy.deepcopy(from_task)
330                from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
331                    task[PredecessorTaskLinker._link_data_name]
332                )
333                payload = {"uuId": task["uuId"], "taskList": [from_task_copy]}
334                api.post(url, payload=payload)
335        return True
336
337    def parents(self):
338        """
339        Return an ordered list of [name, uuId] pairs of this task's parents, up to
340        (but not including) the root of the project.
341        """
342        payload = {
343            "name": "Task Parents",
344            "type": "msql",
345            "start": 0,
346            "limit": -1,
347            "holder": "{}".format(self["uuId"]),
348            "select": [
349                ["TASK(one).PARENT_ALL_TASK.name"],
350                ["TASK(one).PARENT_ALL_TASK.uuId"],
351            ],
352        }
353        list = api.query(payload)
354        # Results come back in reverse order. Flip them around
355        list.reverse()
356        return list
357
358    def project_uuId(self):
359        """Return the `uuId` of the Project that holds this Task."""
360        payload = {
361            "name": "Project that holds this task",
362            "type": "msql",
363            "start": 0,
364            "limit": 1,
365            "holder": "{}".format(self["uuId"]),
366            "select": [["TASK.PROJECT.uuId"]],
367        }
368        projects = api.query(payload)
369        for t in projects:
370            return t[0]
371        return None
372
373    @classmethod
374    def add_task_template(cls, project, template):
375        """Insert TaskTemplate `template` into Project `project`"""
376        url = "/api/task/task_template/add?override=false&group=false"
377        payload = {"uuId": project["uuId"], "templateList": [template]}
378        api.post(url, payload)
379
380    def reset_duration(self, calendars=None):
381        """Set this task's duration based on its start and end dates while
382        taking into account the calendar for weekends and scheduled time off.
383
384        calendars is expected to be the list of calendar objects for the
385        location of the project that holds this task. You may provide this
386        list yourself for efficiency (recommended) - if not provided, it
387        will be fetched for you by issuing requests to the server.
388        """
389        if not calendars:
390            if "projectRef" not in self:
391                task = projectal.Task.get(self)
392                project_ref = task["projectRef"]
393            else:
394                project_ref = self["projectRef"]
395            project = projectal.Project.get(project_ref, links=["LOCATION"])
396            for location in project.get("locationList", []):
397                calendars = location.calendar()
398                break
399
400        start = self.get("startTime")
401        end = self.get("closeTime")
402        if not start or start == DateLimit.Min:
403            return 0
404        if not end or end == DateLimit.Max:
405            return 0
406
407        # Build a list of weekday names that are non-working
408        base_non_working = set()
409        location_non_working = {}
410        location_working = set()
411        for calendar in calendars:
412            if calendar["name"] == "base_calendar":
413                for item in calendar["calendarList"]:
414                    if not item["isWorking"]:
415                        base_non_working.add(item["type"])
416
417            if calendar["name"] == "location":
418                for item in calendar["calendarList"]:
419                    start_date = datetime.date.fromisoformat(item["startDate"])
420                    end_date = datetime.date.fromisoformat(item["endDate"])
421                    if not item["isWorking"]:
422                        delta = start_date - end_date
423                        location_non_working[item["startDate"]] = delta.days + 1
424                    else:
425                        location_working = {
426                            (start_date + datetime.timedelta(days=x)).strftime(
427                                "%Y-%m-%d"
428                            )
429                            for x in range((end_date - start_date).days + 1)
430                        }
431
432        start = datetime.datetime.fromtimestamp(start / 1000)
433        end = datetime.datetime.fromtimestamp(end / 1000)
434        minutes = 0
435        current = start
436        while current <= end:
437            if (
438                current.strftime("%A") in base_non_working
439                and current.strftime("%Y-%m-%d") not in location_working
440            ):
441                current += datetime.timedelta(days=1)
442                continue
443            if current.strftime("%Y-%m-%d") in location_non_working:
444                days = location_non_working[current.strftime("%Y-%m-%d")]
445                current += datetime.timedelta(days=days)
446                continue
447            minutes += 8 * 60
448            current += datetime.timedelta(days=1)
449
450        self["duration"] = minutes
 11class Task(
 12    Entity,
 13    ResourceLinker,
 14    SkillLinker,
 15    FileLinker,
 16    StageLinker,
 17    StaffLinker,
 18    RebateLinker,
 19    NoteLinker,
 20    TagLinker,
 21    PredecessorTaskLinker,
 22):
 23    """
 24    Implementation of the [Task](https://projectal.com/docs/latest/#tag/Task) API.
 25    """
 26
 27    _path = "task"
 28    _name = "task"
 29    _links = [
 30        ResourceLinker,
 31        SkillLinker,
 32        FileLinker,
 33        StageLinker,
 34        StaffLinker,
 35        RebateLinker,
 36        NoteLinker,
 37        TagLinker,
 38    ]
 39    _links_reverse = [
 40        PredecessorTaskLinker,
 41    ]
 42
 43    def _add_link_def(self, cls, reverse=False):
 44        """
 45        Each entity is accompanied by a dict with details about how to
 46        get access to the data of the link within the object. Subclasses
 47        can pass in customizations to this dict when their APIs differ.
 48
 49        reverse denotes a reverse linker, where extra work is done to
 50        reverse the relationship of the link internally so that it works.
 51        The backend only offers one side of the relationship.
 52        """
 53        d = {
 54            "name": cls._link_name,
 55            "link_key": cls._link_key or cls._link_name + "List",
 56            "data_name": cls._link_data_name,
 57            "type": cls._link_type,
 58            "entity": cls._link_entity or cls._link_name.capitalize(),
 59            "reverse": reverse,
 60        }
 61        self._link_def_by_key[d["link_key"]] = d
 62        self._link_def_by_name[d["name"]] = d
 63        if cls._link_name == "predecessor_task":
 64            d_after_reverse = copy.deepcopy(d)
 65            d_after_reverse["reverse"] = False
 66            self._link_def_by_name["task"] = d_after_reverse
 67            # We need this to be present in the link def so that
 68            # returned predecessor tasks can be typed as Tasks
 69            d_for_pred_link_typing = copy.deepcopy(d)
 70            d_for_pred_link_typing["link_key"] = "planList"
 71            self._link_def_by_key[
 72                d_for_pred_link_typing["link_key"]
 73            ] = d_for_pred_link_typing
 74
 75    @classmethod
 76    def create(
 77        cls,
 78        holder,
 79        entities,
 80        batch_linking=True,
 81        disable_system_features=True,
 82        enable_system_features_on_exit=True,
 83    ):
 84        """Create a Task
 85
 86        `holder`: An instance or the `uuId` of the owner
 87
 88        `entities`: `dict` containing the fields of the entity to be created,
 89        or a list of such `dict`s to create in bulk.
 90        """
 91        holder_id = holder["uuId"] if isinstance(holder, dict) else holder
 92        params = "?holder=" + holder_id
 93        out = super().create(
 94            entities,
 95            params,
 96            batch_linking,
 97            disable_system_features,
 98            enable_system_features_on_exit,
 99        )
100
101        # Tasks should always refer to their parent and project. We don't get this information
102        # from the creation api method, but we can insert them ourselves because we know what
103        # they are.
104        def add_fields(obj):
105            obj.set_readonly("projectRef", holder_id)
106            obj.set_readonly("parent", obj.get("parent", holder_id))
107
108        if isinstance(out, dict):
109            add_fields(out)
110        if isinstance(out, list):
111            for obj in out:
112                add_fields(obj)
113        return out
114
115    @classmethod
116    def get(cls, entities, links=None, deleted_at=None):
117        r = super().get(entities, links, deleted_at)
118        if not links:
119            return r
120        # When Predecessor Task links are fetched,
121        # make sure the key matches the name expected
122        # by the predecessor linking REST end point
123        if PredecessorTaskLinker._link_name.casefold() in (
124            link.casefold() for link in links
125        ):
126            if isinstance(r, dict):
127                r["taskList"] = r.pop("planList", [])
128                r._Entity__old = copy.deepcopy(r)
129            else:
130                for entity in r:
131                    entity["taskList"] = entity.pop("planList", [])
132                    entity._Entity__old = copy.deepcopy(entity)
133        return r
134
135    # Override here to correctly format the URL for the Predecessor Task link case
136    def _link(
137        self, to_entity_name, to_link, operation, update_cache=True, batch_linking=True
138    ):
139        """
140        `to_entity_name`: Destination entity name (e.g. 'staff')
141
142        `to_link`: List of Entities of the same type (and optional data) to link to
143
144        `operation`: `add`, `update`, `delete`
145
146        'update_cache': also modify the entity's internal representation of the links
147        to match the operation that was done. Set this to False when replacing the
148        list with a new one (i.e., when calling save() instead of a linker method).
149
150        'batch_linking': Enabled by default, batches any link
151        updates required into composite API requests. If disabled
152        a request will be executed for each link update.
153        Recommended to leave enabled to increase performance.
154        """
155
156        link_def = self._link_def_by_name[to_entity_name]
157        to_key = link_def["link_key"]
158
159        if isinstance(to_link, dict) and link_def["type"] == list:
160            # Convert input dict to list when link type is a list (we allow linking to single entity for convenience)
161            to_link = [to_link]
162
163            # For cases where user passed in dict instead of Entity, we turn them into
164            # Entity on their behalf.
165            typed_list = []
166            target_cls = getattr(sys.modules["projectal.entities"], link_def["entity"])
167            for link in to_link:
168                if not isinstance(link, target_cls):
169                    typed_list.append(target_cls(link))
170                else:
171                    typed_list.append(link)
172            to_link = typed_list
173        else:
174            # For everything else, we expect types to match.
175            if not isinstance(to_link, link_def["type"]):
176                raise api.UsageException(
177                    "Expected link type to be {}. Got {}.".format(
178                        link_def["type"], type(to_link)
179                    )
180                )
181
182        if not to_link:
183            return
184
185        url = ""
186        payload = {}
187        request_list = []
188        # Is it a reverse linker? If so, invert the relationship
189        if link_def["reverse"]:
190            for link in to_link:
191                # Sets the data attribute on the correct
192                # link entity
193                if link_def["name"] == "predecessor_task":
194                    data_name = link_def.get("data_name")
195                    self[data_name] = copy.deepcopy(link[data_name])
196                request_list.extend(
197                    link._link(
198                        self._name,
199                        self,
200                        operation,
201                        update_cache,
202                        batch_linking=batch_linking,
203                    )
204                )
205        else:
206            # Only keep UUID and the data attribute, if it has one
207            def strip_payload(link):
208                single = {"uuId": link["uuId"]}
209                data_name = link_def.get("data_name")
210                if data_name and data_name in link:
211                    single[data_name] = copy.deepcopy(link[data_name])
212                    # limiting data attribute removal to only planLink
213                    # in case of side effects
214                    if data_name == "planLink":
215                        del link[data_name]
216                return single
217
218            # If batch linking is enabled and the entity to link is a list of entities,
219            # a separate request must be constructed for each one because the final composite
220            # request permits only one input per call
221            if to_entity_name == "predecessor_task" or to_entity_name == "task":
222                url = "/api/{}/plan/TASK/{}".format(self._path, operation)
223            else:
224                url = "/api/{}/link/{}/{}".format(self._path, to_entity_name, operation)
225            to_link_payload = None
226            if isinstance(to_link, list):
227                to_link_payload = []
228                for link in to_link:
229                    if batch_linking:
230                        request_list.append(
231                            {
232                                "method": "POST",
233                                "invoke": url,
234                                "body": {
235                                    "uuId": self["uuId"],
236                                    to_key: [strip_payload(link)],
237                                },
238                            }
239                        )
240                    else:
241                        to_link_payload.append(strip_payload(link))
242            if isinstance(to_link, dict):
243                if batch_linking:
244                    request_list.append(
245                        {
246                            "method": "POST",
247                            "invoke": url,
248                            "body": {
249                                "uuId": self["uuId"],
250                                to_key: strip_payload(to_link),
251                            },
252                        }
253                    )
254                else:
255                    to_link_payload = strip_payload(to_link)
256
257            if not batch_linking:
258                payload = {"uuId": self["uuId"], to_key: to_link_payload}
259                api.post(url, payload=payload)
260
261        if not update_cache:
262            return request_list
263
264        # Set the initial state if first add. We need the type to be set to correctly update the cache
265        if operation == "add" and self.get(to_key, None) is None:
266            if link_def.get("type") == dict:
267                self[to_key] = {}
268            elif link_def.get("type") == list:
269                self[to_key] = []
270
271        # Modify the entity object's cache of links to match the changes we pushed to the server.
272        if isinstance(self.get(to_key, []), list):
273            if operation == "add":
274                # Sometimes the backend doesn't return a list when it has none. Create it.
275                if to_key not in self:
276                    self[to_key] = []
277
278                for to_entity in to_link:
279                    self[to_key].append(to_entity)
280            else:
281                for to_entity in to_link:
282                    # Find it in original list
283                    for i, old in enumerate(self.get(to_key, [])):
284                        if old["uuId"] == to_entity["uuId"]:
285                            if operation == "update":
286                                self[to_key][i] = to_entity
287                            elif operation == "delete":
288                                del self[to_key][i]
289        if isinstance(self.get(to_key, None), dict):
290            if operation in ["add", "update"]:
291                self[to_key] = to_link
292            elif operation == "delete":
293                self[to_key] = None
294
295        # Update the "old" record of the link on the entity to avoid
296        # flagging it for changes (link lists are not meant to be user editable).
297        if to_key in self:
298            self._Entity__old[to_key] = self[to_key]
299
300        return request_list
301
302    def update_order(self, order_at_uuId, order_as=True):
303        url = "/api/task/update?order-at={}&order-as={}".format(
304            order_at_uuId, "true" if order_as else "false"
305        )
306        return api.put(url, [{"uuId": self["uuId"]}])
307
308    def link_predecessor_task(self, predecessor_task):
309        return self.__plan(self, predecessor_task, "add")
310
311    def relink_predecessor_task(self, predecessor_task):
312        return self.__plan(self, predecessor_task, "update")
313
314    def unlink_predecessor_task(self, predecessor_task):
315        return self.__plan(self, predecessor_task, "delete")
316
317    @classmethod
318    def __plan(cls, from_task, to_task, operation):
319        url = "/api/task/plan/task/{}".format(operation)
320        # Invert the link relationship to match the linker
321        if isinstance(to_task, dict):
322            from_task_copy = copy.deepcopy(from_task)
323            from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
324                to_task[PredecessorTaskLinker._link_data_name]
325            )
326            payload = {"uuId": to_task["uuId"], "taskList": [from_task_copy]}
327            api.post(url, payload=payload)
328        elif isinstance(to_task, list):
329            for task in to_task:
330                from_task_copy = copy.deepcopy(from_task)
331                from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
332                    task[PredecessorTaskLinker._link_data_name]
333                )
334                payload = {"uuId": task["uuId"], "taskList": [from_task_copy]}
335                api.post(url, payload=payload)
336        return True
337
338    def parents(self):
339        """
340        Return an ordered list of [name, uuId] pairs of this task's parents, up to
341        (but not including) the root of the project.
342        """
343        payload = {
344            "name": "Task Parents",
345            "type": "msql",
346            "start": 0,
347            "limit": -1,
348            "holder": "{}".format(self["uuId"]),
349            "select": [
350                ["TASK(one).PARENT_ALL_TASK.name"],
351                ["TASK(one).PARENT_ALL_TASK.uuId"],
352            ],
353        }
354        list = api.query(payload)
355        # Results come back in reverse order. Flip them around
356        list.reverse()
357        return list
358
359    def project_uuId(self):
360        """Return the `uuId` of the Project that holds this Task."""
361        payload = {
362            "name": "Project that holds this task",
363            "type": "msql",
364            "start": 0,
365            "limit": 1,
366            "holder": "{}".format(self["uuId"]),
367            "select": [["TASK.PROJECT.uuId"]],
368        }
369        projects = api.query(payload)
370        for t in projects:
371            return t[0]
372        return None
373
374    @classmethod
375    def add_task_template(cls, project, template):
376        """Insert TaskTemplate `template` into Project `project`"""
377        url = "/api/task/task_template/add?override=false&group=false"
378        payload = {"uuId": project["uuId"], "templateList": [template]}
379        api.post(url, payload)
380
381    def reset_duration(self, calendars=None):
382        """Set this task's duration based on its start and end dates while
383        taking into account the calendar for weekends and scheduled time off.
384
385        calendars is expected to be the list of calendar objects for the
386        location of the project that holds this task. You may provide this
387        list yourself for efficiency (recommended) - if not provided, it
388        will be fetched for you by issuing requests to the server.
389        """
390        if not calendars:
391            if "projectRef" not in self:
392                task = projectal.Task.get(self)
393                project_ref = task["projectRef"]
394            else:
395                project_ref = self["projectRef"]
396            project = projectal.Project.get(project_ref, links=["LOCATION"])
397            for location in project.get("locationList", []):
398                calendars = location.calendar()
399                break
400
401        start = self.get("startTime")
402        end = self.get("closeTime")
403        if not start or start == DateLimit.Min:
404            return 0
405        if not end or end == DateLimit.Max:
406            return 0
407
408        # Build a list of weekday names that are non-working
409        base_non_working = set()
410        location_non_working = {}
411        location_working = set()
412        for calendar in calendars:
413            if calendar["name"] == "base_calendar":
414                for item in calendar["calendarList"]:
415                    if not item["isWorking"]:
416                        base_non_working.add(item["type"])
417
418            if calendar["name"] == "location":
419                for item in calendar["calendarList"]:
420                    start_date = datetime.date.fromisoformat(item["startDate"])
421                    end_date = datetime.date.fromisoformat(item["endDate"])
422                    if not item["isWorking"]:
423                        delta = start_date - end_date
424                        location_non_working[item["startDate"]] = delta.days + 1
425                    else:
426                        location_working = {
427                            (start_date + datetime.timedelta(days=x)).strftime(
428                                "%Y-%m-%d"
429                            )
430                            for x in range((end_date - start_date).days + 1)
431                        }
432
433        start = datetime.datetime.fromtimestamp(start / 1000)
434        end = datetime.datetime.fromtimestamp(end / 1000)
435        minutes = 0
436        current = start
437        while current <= end:
438            if (
439                current.strftime("%A") in base_non_working
440                and current.strftime("%Y-%m-%d") not in location_working
441            ):
442                current += datetime.timedelta(days=1)
443                continue
444            if current.strftime("%Y-%m-%d") in location_non_working:
445                days = location_non_working[current.strftime("%Y-%m-%d")]
446                current += datetime.timedelta(days=days)
447                continue
448            minutes += 8 * 60
449            current += datetime.timedelta(days=1)
450
451        self["duration"] = minutes

Implementation of the Task API.

@classmethod
def create( cls, holder, entities, batch_linking=True, disable_system_features=True, enable_system_features_on_exit=True):
 75    @classmethod
 76    def create(
 77        cls,
 78        holder,
 79        entities,
 80        batch_linking=True,
 81        disable_system_features=True,
 82        enable_system_features_on_exit=True,
 83    ):
 84        """Create a Task
 85
 86        `holder`: An instance or the `uuId` of the owner
 87
 88        `entities`: `dict` containing the fields of the entity to be created,
 89        or a list of such `dict`s to create in bulk.
 90        """
 91        holder_id = holder["uuId"] if isinstance(holder, dict) else holder
 92        params = "?holder=" + holder_id
 93        out = super().create(
 94            entities,
 95            params,
 96            batch_linking,
 97            disable_system_features,
 98            enable_system_features_on_exit,
 99        )
100
101        # Tasks should always refer to their parent and project. We don't get this information
102        # from the creation api method, but we can insert them ourselves because we know what
103        # they are.
104        def add_fields(obj):
105            obj.set_readonly("projectRef", holder_id)
106            obj.set_readonly("parent", obj.get("parent", holder_id))
107
108        if isinstance(out, dict):
109            add_fields(out)
110        if isinstance(out, list):
111            for obj in out:
112                add_fields(obj)
113        return out

Create a Task

holder: An instance or the uuId of the owner

entities: dict containing the fields of the entity to be created, or a list of such dicts to create in bulk.

@classmethod
def get(cls, entities, links=None, deleted_at=None):
115    @classmethod
116    def get(cls, entities, links=None, deleted_at=None):
117        r = super().get(entities, links, deleted_at)
118        if not links:
119            return r
120        # When Predecessor Task links are fetched,
121        # make sure the key matches the name expected
122        # by the predecessor linking REST end point
123        if PredecessorTaskLinker._link_name.casefold() in (
124            link.casefold() for link in links
125        ):
126            if isinstance(r, dict):
127                r["taskList"] = r.pop("planList", [])
128                r._Entity__old = copy.deepcopy(r)
129            else:
130                for entity in r:
131                    entity["taskList"] = entity.pop("planList", [])
132                    entity._Entity__old = copy.deepcopy(entity)
133        return r

Get one or more entities of the same type. The entity type is determined by the subclass calling this method.

entities: One of several formats containing the uuIds of the entities you want to get (see bottom for examples):

  • str or list of str
  • dict or list of dict (with uuId key)

links: A case-insensitive list of entity names to fetch with this entity. For performance reasons, links are only returned on demand.

Links follow a common naming convention in the output with a _List suffix. E.g.: links=['company', 'location'] will appear as companyList and locationList in the response.

# Example usage:
# str
projectal.Project.get('1b21e445-f29a-4a9f-95ff-fe253a3e1b11')

# list of str
ids = ['1b21e445-f29a...', '1b21e445-f29a...', '1b21e445-f29a...']
projectal.Project.get(ids)

# dict
project = project.Project.create({'name': 'MyProject'})
# project = {'uuId': '1b21e445-f29a...', 'name': 'MyProject', ...}
projectal.Project.get(project)

# list of dicts (e.g. from a query)
# projects = [{'uuId': '1b21e445-f29a...'}, {'uuId': '1b21e445-f29a...'}, ...]
project.Project.get(projects)

# str with links
projectal.Project.get('1b21e445-f29a...', 'links=['company', 'location']')

deleted_at: Include this parameter to get a deleted entity. This value should be a UTC timestamp from a webhook delete event.

def update_order(self, order_at_uuId, order_as=True):
302    def update_order(self, order_at_uuId, order_as=True):
303        url = "/api/task/update?order-at={}&order-as={}".format(
304            order_at_uuId, "true" if order_as else "false"
305        )
306        return api.put(url, [{"uuId": self["uuId"]}])
def parents(self):
338    def parents(self):
339        """
340        Return an ordered list of [name, uuId] pairs of this task's parents, up to
341        (but not including) the root of the project.
342        """
343        payload = {
344            "name": "Task Parents",
345            "type": "msql",
346            "start": 0,
347            "limit": -1,
348            "holder": "{}".format(self["uuId"]),
349            "select": [
350                ["TASK(one).PARENT_ALL_TASK.name"],
351                ["TASK(one).PARENT_ALL_TASK.uuId"],
352            ],
353        }
354        list = api.query(payload)
355        # Results come back in reverse order. Flip them around
356        list.reverse()
357        return list

Return an ordered list of [name, uuId] pairs of this task's parents, up to (but not including) the root of the project.

def project_uuId(self):
359    def project_uuId(self):
360        """Return the `uuId` of the Project that holds this Task."""
361        payload = {
362            "name": "Project that holds this task",
363            "type": "msql",
364            "start": 0,
365            "limit": 1,
366            "holder": "{}".format(self["uuId"]),
367            "select": [["TASK.PROJECT.uuId"]],
368        }
369        projects = api.query(payload)
370        for t in projects:
371            return t[0]
372        return None

Return the uuId of the Project that holds this Task.

@classmethod
def add_task_template(cls, project, template):
374    @classmethod
375    def add_task_template(cls, project, template):
376        """Insert TaskTemplate `template` into Project `project`"""
377        url = "/api/task/task_template/add?override=false&group=false"
378        payload = {"uuId": project["uuId"], "templateList": [template]}
379        api.post(url, payload)

Insert TaskTemplate template into Project project

def reset_duration(self, calendars=None):
381    def reset_duration(self, calendars=None):
382        """Set this task's duration based on its start and end dates while
383        taking into account the calendar for weekends and scheduled time off.
384
385        calendars is expected to be the list of calendar objects for the
386        location of the project that holds this task. You may provide this
387        list yourself for efficiency (recommended) - if not provided, it
388        will be fetched for you by issuing requests to the server.
389        """
390        if not calendars:
391            if "projectRef" not in self:
392                task = projectal.Task.get(self)
393                project_ref = task["projectRef"]
394            else:
395                project_ref = self["projectRef"]
396            project = projectal.Project.get(project_ref, links=["LOCATION"])
397            for location in project.get("locationList", []):
398                calendars = location.calendar()
399                break
400
401        start = self.get("startTime")
402        end = self.get("closeTime")
403        if not start or start == DateLimit.Min:
404            return 0
405        if not end or end == DateLimit.Max:
406            return 0
407
408        # Build a list of weekday names that are non-working
409        base_non_working = set()
410        location_non_working = {}
411        location_working = set()
412        for calendar in calendars:
413            if calendar["name"] == "base_calendar":
414                for item in calendar["calendarList"]:
415                    if not item["isWorking"]:
416                        base_non_working.add(item["type"])
417
418            if calendar["name"] == "location":
419                for item in calendar["calendarList"]:
420                    start_date = datetime.date.fromisoformat(item["startDate"])
421                    end_date = datetime.date.fromisoformat(item["endDate"])
422                    if not item["isWorking"]:
423                        delta = start_date - end_date
424                        location_non_working[item["startDate"]] = delta.days + 1
425                    else:
426                        location_working = {
427                            (start_date + datetime.timedelta(days=x)).strftime(
428                                "%Y-%m-%d"
429                            )
430                            for x in range((end_date - start_date).days + 1)
431                        }
432
433        start = datetime.datetime.fromtimestamp(start / 1000)
434        end = datetime.datetime.fromtimestamp(end / 1000)
435        minutes = 0
436        current = start
437        while current <= end:
438            if (
439                current.strftime("%A") in base_non_working
440                and current.strftime("%Y-%m-%d") not in location_working
441            ):
442                current += datetime.timedelta(days=1)
443                continue
444            if current.strftime("%Y-%m-%d") in location_non_working:
445                days = location_non_working[current.strftime("%Y-%m-%d")]
446                current += datetime.timedelta(days=days)
447                continue
448            minutes += 8 * 60
449            current += datetime.timedelta(days=1)
450
451        self["duration"] = minutes

Set this task's duration based on its start and end dates while taking into account the calendar for weekends and scheduled time off.

calendars is expected to be the list of calendar objects for the location of the project that holds this task. You may provide this list yourself for efficiency (recommended) - if not provided, it will be fetched for you by issuing requests to the server.