projectal.entities.task
import datetime
import copy
import sys
import projectal
from projectal.entity import Entity
from projectal.enums import DateLimit
from projectal.linkers import *


class Task(
    Entity,
    ResourceLinker,
    SkillLinker,
    FileLinker,
    StageLinker,
    StaffLinker,
    RebateLinker,
    NoteLinker,
    TagLinker,
    PredecessorTaskLinker,
):
    """
    Implementation of the [Task](https://projectal.com/docs/latest/#tag/Task) API.
    """

    _path = "task"
    _name = "task"
    _links = [
        ResourceLinker,
        SkillLinker,
        FileLinker,
        StageLinker,
        StaffLinker,
        RebateLinker,
        NoteLinker,
        TagLinker,
    ]
    _links_reverse = [
        PredecessorTaskLinker,
    ]

    def _add_link_def(self, cls, reverse=False):
        """
        Each entity is accompanied by a dict with details about how to
        get access to the data of the link within the object. Subclasses
        can pass in customizations to this dict when their APIs differ.

        reverse denotes a reverse linker, where extra work is done to
        reverse the relationship of the link internally so that it works.
        The backend only offers one side of the relationship.

        For the `predecessor_task` linker two extra definitions are stored:
        a non-reversed copy under the name `task`, and a copy under the
        link key `planList`.
        """
        d = {
            "name": cls._link_name,
            "link_key": cls._link_key or cls._link_name + "List",
            "data_name": cls._link_data_name,
            "type": cls._link_type,
            "entity": cls._link_entity or cls._link_name.capitalize(),
            "reverse": reverse,
        }
        self._link_def_by_key[d["link_key"]] = d
        self._link_def_by_name[d["name"]] = d
        if cls._link_name == "predecessor_task":
            # `_link` calls the other task's `_link` with the name "task"; that
            # definition must not be a reverse one or the two would recurse.
            d_after_reverse = copy.deepcopy(d)
            d_after_reverse["reverse"] = False
            self._link_def_by_name["task"] = d_after_reverse
            # We need this to be present in the link def so that
            # returned predecessor tasks can be typed as Tasks
            d_for_pred_link_typing = copy.deepcopy(d)
            d_for_pred_link_typing["link_key"] = "planList"
            self._link_def_by_key[
                d_for_pred_link_typing["link_key"]
            ] = d_for_pred_link_typing

    @classmethod
    def create(
        cls,
        holder,
        entities,
        batch_linking=True,
        disable_system_features=True,
        enable_system_features_on_exit=True,
    ):
        """Create a Task

        `holder`: An instance or the `uuId` of the owner

        `entities`: `dict` containing the fields of the entity to be created,
        or a list of such `dict`s to create in bulk.

        `batch_linking`, `disable_system_features` and
        `enable_system_features_on_exit` are passed through unchanged to the
        parent class `create`.

        Returns whatever the parent `create` returns (a single entity or a
        list of them), with `projectRef` and `parent` filled in on each.
        """
        holder_id = holder["uuId"] if isinstance(holder, dict) else holder
        params = "?holder=" + holder_id
        out = super().create(
            entities,
            params,
            batch_linking,
            disable_system_features,
            enable_system_features_on_exit,
        )

        # Tasks should always refer to their parent and project. We don't get this information
        # from the creation api method, but we can insert them ourselves because we know what
        # they are.
        def add_fields(obj):
            obj.set_readonly("projectRef", holder_id)
            # Keep an explicitly supplied parent; otherwise the holder is the parent.
            obj.set_readonly("parent", obj.get("parent", holder_id))

        if isinstance(out, dict):
            add_fields(out)
        if isinstance(out, list):
            for obj in out:
                add_fields(obj)
        return out

    @classmethod
    def get(cls, entities, links=None, deleted_at=None):
        """Get one or more Tasks.

        Arguments are forwarded unchanged to the parent class `get`.

        When the predecessor task link name is among `links` (compared
        case-insensitively), the `planList` key of every returned task is
        renamed to `taskList` (defaulting to an empty list) and the task's
        `_Entity__old` snapshot is replaced with a deep copy of the task.
        """
        r = super().get(entities, links, deleted_at)
        if not links:
            return r
        # When Predecessor Task links are fetched,
        # make sure the key matches the name expected
        # by the predecessor linking REST end point
        if PredecessorTaskLinker._link_name.casefold() in (
            link.casefold() for link in links
        ):
            if isinstance(r, dict):
                r["taskList"] = r.pop("planList", [])
                r._Entity__old = copy.deepcopy(r)
            else:
                for entity in r:
                    entity["taskList"] = entity.pop("planList", [])
                    entity._Entity__old = copy.deepcopy(entity)
        return r

    # Override here to correctly format the URL for the Predecessor Task link case
    def _link(
        self, to_entity_name, to_link, operation, update_cache=True, batch_linking=True
    ):
        """
        `to_entity_name`: Destination entity name (e.g. 'staff')

        `to_link`: List of Entities of the same type (and optional data) to link to

        `operation`: `add`, `update`, `delete`

        'update_cache': also modify the entity's internal representation of the links
        to match the operation that was done. Set this to False when replacing the
        list with a new one (i.e., when calling save() instead of a linker method).

        'batch_linking': Enabled by default, batches any link
        updates required into composite API requests. If disabled
        a request will be executed for each link update.
        Recommended to leave enabled to increase performance.

        Returns the list of request dicts built for batching (empty when
        `batch_linking` is disabled, because the request is posted directly),
        or `None` when `to_link` is empty.

        Raises `api.UsageException` when `to_link` does not match the link type.
        """

        link_def = self._link_def_by_name[to_entity_name]
        to_key = link_def["link_key"]

        if isinstance(to_link, dict) and link_def["type"] == list:
            # Convert input dict to list when link type is a list (we allow linking to single entity for convenience)
            to_link = [to_link]

            # For cases where user passed in dict instead of Entity, we turn them into
            # Entity on their behalf.
            typed_list = []
            target_cls = getattr(sys.modules["projectal.entities"], link_def["entity"])
            for link in to_link:
                if not isinstance(link, target_cls):
                    typed_list.append(target_cls(link))
                else:
                    typed_list.append(link)
            to_link = typed_list
        else:
            # For everything else, we expect types to match.
            if not isinstance(to_link, link_def["type"]):
                raise api.UsageException(
                    "Expected link type to be {}. Got {}.".format(
                        link_def["type"], type(to_link)
                    )
                )

        if not to_link:
            return

        url = ""
        payload = {}
        request_list = []
        # Is it a reverse linker? If so, invert the relationship
        if link_def["reverse"]:
            for link in to_link:
                # Sets the data attribute on the correct
                # link entity
                if link_def["name"] == "predecessor_task":
                    data_name = link_def.get("data_name")
                    self[data_name] = copy.deepcopy(link[data_name])
                request_list.extend(
                    link._link(
                        self._name,
                        self,
                        operation,
                        update_cache,
                        batch_linking=batch_linking,
                    )
                )
        else:
            # Only keep UUID and the data attribute, if it has one
            def strip_payload(link):
                single = {"uuId": link["uuId"]}
                data_name = link_def.get("data_name")
                if data_name and data_name in link:
                    single[data_name] = copy.deepcopy(link[data_name])
                    # limiting data attribute removal to only planLink
                    # in case of side effects
                    if data_name == "planLink":
                        del link[data_name]
                return single

            # If batch linking is enabled and the entity to link is a list of entities,
            # a separate request must be constructed for each one because the final composite
            # request permits only one input per call
            if to_entity_name == "predecessor_task" or to_entity_name == "task":
                url = "/api/{}/plan/TASK/{}".format(self._path, operation)
            else:
                url = "/api/{}/link/{}/{}".format(self._path, to_entity_name, operation)
            to_link_payload = None
            if isinstance(to_link, list):
                to_link_payload = []
                for link in to_link:
                    if batch_linking:
                        request_list.append(
                            {
                                "method": "POST",
                                "invoke": url,
                                "body": {
                                    "uuId": self["uuId"],
                                    to_key: [strip_payload(link)],
                                },
                            }
                        )
                    else:
                        to_link_payload.append(strip_payload(link))
            if isinstance(to_link, dict):
                if batch_linking:
                    request_list.append(
                        {
                            "method": "POST",
                            "invoke": url,
                            "body": {
                                "uuId": self["uuId"],
                                to_key: strip_payload(to_link),
                            },
                        }
                    )
                else:
                    to_link_payload = strip_payload(to_link)

            if not batch_linking:
                payload = {"uuId": self["uuId"], to_key: to_link_payload}
                api.post(url, payload=payload)

        if not update_cache:
            return request_list

        # Set the initial state if first add. We need the type to be set to correctly update the cache
        if operation == "add" and self.get(to_key, None) is None:
            if link_def.get("type") == dict:
                self[to_key] = {}
            elif link_def.get("type") == list:
                self[to_key] = []

        # Modify the entity object's cache of links to match the changes we pushed to the server.
        if isinstance(self.get(to_key, []), list):
            if operation == "add":
                # Sometimes the backend doesn't return a list when it has none. Create it.
                if to_key not in self:
                    self[to_key] = []

                for to_entity in to_link:
                    self[to_key].append(to_entity)
            else:
                for to_entity in to_link:
                    # Find it in original list
                    for i, old in enumerate(self.get(to_key, [])):
                        if old["uuId"] == to_entity["uuId"]:
                            if operation == "update":
                                self[to_key][i] = to_entity
                            elif operation == "delete":
                                del self[to_key][i]
        if isinstance(self.get(to_key, None), dict):
            if operation in ["add", "update"]:
                self[to_key] = to_link
            elif operation == "delete":
                self[to_key] = None

        # Update the "old" record of the link on the entity to avoid
        # flagging it for changes (link lists are not meant to be user editable).
        if to_key in self:
            self._Entity__old[to_key] = self[to_key]

        return request_list

    def update_order(self, order_at_uuId, order_as=True):
        """Send a task update request carrying ordering parameters.

        `order_at_uuId`: value sent as the `order-at` query parameter.

        `order_as`: sent as the `order-as` query parameter, as the string
        `true` or `false`.

        Returns the result of the PUT request.
        """
        url = "/api/task/update?order-at={}&order-as={}".format(
            order_at_uuId, "true" if order_as else "false"
        )
        return api.put(url, [{"uuId": self["uuId"]}])

    def link_predecessor_task(self, predecessor_task):
        """Post an `add` plan request for `predecessor_task` (a `dict` or a list of them)."""
        return self.__plan(self, predecessor_task, "add")

    def relink_predecessor_task(self, predecessor_task):
        """Post an `update` plan request for `predecessor_task` (a `dict` or a list of them)."""
        return self.__plan(self, predecessor_task, "update")

    def unlink_predecessor_task(self, predecessor_task):
        """Post a `delete` plan request for `predecessor_task` (a `dict` or a list of them)."""
        return self.__plan(self, predecessor_task, "delete")

    @classmethod
    def __plan(cls, from_task, to_task, operation):
        """Post one plan request per task in `to_task`.

        `from_task`: the task the linker method was called on.

        `to_task`: a `dict`, or a list of `dict`s; each must carry the
        predecessor linker's data attribute and a `uuId`.

        `operation`: last path segment of the request URL.

        Always returns `True`. Nothing is posted when `to_task` is neither
        a `dict` nor a `list`.
        """
        url = "/api/task/plan/task/{}".format(operation)
        # Invert the link relationship to match the linker
        if isinstance(to_task, dict):
            from_task_copy = copy.deepcopy(from_task)
            from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
                to_task[PredecessorTaskLinker._link_data_name]
            )
            payload = {"uuId": to_task["uuId"], "taskList": [from_task_copy]}
            api.post(url, payload=payload)
        elif isinstance(to_task, list):
            for task in to_task:
                from_task_copy = copy.deepcopy(from_task)
                from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
                    task[PredecessorTaskLinker._link_data_name]
                )
                payload = {"uuId": task["uuId"], "taskList": [from_task_copy]}
                api.post(url, payload=payload)
        return True

    def parents(self):
        """
        Return an ordered list of [name, uuId] pairs of this task's parents, up to
        (but not including) the root of the project.
        """
        payload = {
            "name": "Task Parents",
            "type": "msql",
            "start": 0,
            "limit": -1,
            "holder": "{}".format(self["uuId"]),
            "select": [
                ["TASK(one).PARENT_ALL_TASK.name"],
                ["TASK(one).PARENT_ALL_TASK.uuId"],
            ],
        }
        rows = api.query(payload)
        # Results come back in reverse order. Flip them around
        rows.reverse()
        return rows

    def project_uuId(self):
        """Return the `uuId` of the Project that holds this Task.

        Returns `None` when the query yields no rows.
        """
        payload = {
            "name": "Project that holds this task",
            "type": "msql",
            "start": 0,
            "limit": 1,
            "holder": "{}".format(self["uuId"]),
            "select": [["TASK.PROJECT.uuId"]],
        }
        projects = api.query(payload)
        for t in projects:
            return t[0]
        return None

    @classmethod
    def add_task_template(cls, project, template):
        """Insert TaskTemplate `template` into Project `project`"""
        url = "/api/task/task_template/add?override=false&group=false"
        payload = {"uuId": project["uuId"], "templateList": [template]}
        api.post(url, payload)

    def reset_duration(self, calendars=None):
        """Set this task's duration based on its start and end dates while
        taking into account the calendar for weekends and scheduled time off.

        calendars is expected to be the list of calendar objects for the
        location of the project that holds this task. You may provide this
        list yourself for efficiency (recommended) - if not provided, it
        will be fetched for you by issuing requests to the server.

        Every working day between `startTime` and `closeTime` counts as 8
        hours; the total is stored in `self["duration"]` in minutes.

        Returns `0` (leaving `duration` untouched) when `startTime` or
        `closeTime` is missing or equal to `DateLimit.Min` / `DateLimit.Max`.
        Otherwise returns `None`. When no calendars can be found, every day
        is counted as a working day.
        """
        start = self.get("startTime")
        end = self.get("closeTime")
        # Check the dates first so no server requests are made for nothing.
        if not start or start == DateLimit.Min:
            return 0
        if not end or end == DateLimit.Max:
            return 0

        if not calendars:
            if "projectRef" not in self:
                task = projectal.Task.get(self)
                project_ref = task["projectRef"]
            else:
                project_ref = self["projectRef"]
            project = projectal.Project.get(project_ref, links=["LOCATION"])
            # Only the first location's calendars are used.
            for location in project.get("locationList", []):
                calendars = location.calendar()
                break

        # Weekday names that are non-working in the base calendar, plus the
        # individual dates the location calendar marks as time off or as
        # working exceptions.
        # NOTE(review): assumes item["type"] holds English weekday names that
        # match strftime("%A") - confirm against the calendar API.
        base_non_working = set()
        location_non_working = set()
        location_working = set()
        for calendar in calendars or []:
            if calendar["name"] == "base_calendar":
                for item in calendar["calendarList"]:
                    if not item["isWorking"]:
                        base_non_working.add(item["type"])

            if calendar["name"] == "location":
                for item in calendar["calendarList"]:
                    start_date = datetime.date.fromisoformat(item["startDate"])
                    end_date = datetime.date.fromisoformat(item["endDate"])
                    # Expand the inclusive range into individual dates so a
                    # range is honoured even when the task starts inside it.
                    span = {
                        (start_date + datetime.timedelta(days=x)).strftime("%Y-%m-%d")
                        for x in range((end_date - start_date).days + 1)
                    }
                    if item["isWorking"]:
                        # Accumulate: several working exceptions may exist.
                        location_working |= span
                    else:
                        location_non_working |= span

        # Timestamps are in milliseconds; fromtimestamp() yields local time.
        current = datetime.datetime.fromtimestamp(start / 1000)
        end = datetime.datetime.fromtimestamp(end / 1000)
        one_day = datetime.timedelta(days=1)
        minutes = 0
        while current <= end:
            day_key = current.strftime("%Y-%m-%d")
            # A working exception overrides the base calendar's day off.
            base_day_off = (
                current.strftime("%A") in base_non_working
                and day_key not in location_working
            )
            if not base_day_off and day_key not in location_non_working:
                minutes += 8 * 60
            current += one_day

        self["duration"] = minutes
class Task(
    Entity,
    ResourceLinker,
    SkillLinker,
    FileLinker,
    StageLinker,
    StaffLinker,
    RebateLinker,
    NoteLinker,
    TagLinker,
    PredecessorTaskLinker,
):
    """
    Implementation of the [Task](https://projectal.com/docs/latest/#tag/Task) API.
    """

    _path = "task"
    _name = "task"
    _links = [
        ResourceLinker,
        SkillLinker,
        FileLinker,
        StageLinker,
        StaffLinker,
        RebateLinker,
        NoteLinker,
        TagLinker,
    ]
    _links_reverse = [
        PredecessorTaskLinker,
    ]

    def _add_link_def(self, cls, reverse=False):
        """
        Each entity is accompanied by a dict with details about how to
        get access to the data of the link within the object. Subclasses
        can pass in customizations to this dict when their APIs differ.

        reverse denotes a reverse linker, where extra work is done to
        reverse the relationship of the link internally so that it works.
        The backend only offers one side of the relationship.

        For the `predecessor_task` linker two extra definitions are stored:
        a non-reversed copy under the name `task`, and a copy under the
        link key `planList`.
        """
        d = {
            "name": cls._link_name,
            "link_key": cls._link_key or cls._link_name + "List",
            "data_name": cls._link_data_name,
            "type": cls._link_type,
            "entity": cls._link_entity or cls._link_name.capitalize(),
            "reverse": reverse,
        }
        self._link_def_by_key[d["link_key"]] = d
        self._link_def_by_name[d["name"]] = d
        if cls._link_name == "predecessor_task":
            # `_link` calls the other task's `_link` with the name "task"; that
            # definition must not be a reverse one or the two would recurse.
            d_after_reverse = copy.deepcopy(d)
            d_after_reverse["reverse"] = False
            self._link_def_by_name["task"] = d_after_reverse
            # We need this to be present in the link def so that
            # returned predecessor tasks can be typed as Tasks
            d_for_pred_link_typing = copy.deepcopy(d)
            d_for_pred_link_typing["link_key"] = "planList"
            self._link_def_by_key[
                d_for_pred_link_typing["link_key"]
            ] = d_for_pred_link_typing

    @classmethod
    def create(
        cls,
        holder,
        entities,
        batch_linking=True,
        disable_system_features=True,
        enable_system_features_on_exit=True,
    ):
        """Create a Task

        `holder`: An instance or the `uuId` of the owner

        `entities`: `dict` containing the fields of the entity to be created,
        or a list of such `dict`s to create in bulk.

        `batch_linking`, `disable_system_features` and
        `enable_system_features_on_exit` are passed through unchanged to the
        parent class `create`.

        Returns whatever the parent `create` returns (a single entity or a
        list of them), with `projectRef` and `parent` filled in on each.
        """
        holder_id = holder["uuId"] if isinstance(holder, dict) else holder
        params = "?holder=" + holder_id
        out = super().create(
            entities,
            params,
            batch_linking,
            disable_system_features,
            enable_system_features_on_exit,
        )

        # Tasks should always refer to their parent and project. We don't get this information
        # from the creation api method, but we can insert them ourselves because we know what
        # they are.
        def add_fields(obj):
            obj.set_readonly("projectRef", holder_id)
            # Keep an explicitly supplied parent; otherwise the holder is the parent.
            obj.set_readonly("parent", obj.get("parent", holder_id))

        if isinstance(out, dict):
            add_fields(out)
        if isinstance(out, list):
            for obj in out:
                add_fields(obj)
        return out

    @classmethod
    def get(cls, entities, links=None, deleted_at=None):
        """Get one or more Tasks.

        Arguments are forwarded unchanged to the parent class `get`.

        When the predecessor task link name is among `links` (compared
        case-insensitively), the `planList` key of every returned task is
        renamed to `taskList` (defaulting to an empty list) and the task's
        `_Entity__old` snapshot is replaced with a deep copy of the task.
        """
        r = super().get(entities, links, deleted_at)
        if not links:
            return r
        # When Predecessor Task links are fetched,
        # make sure the key matches the name expected
        # by the predecessor linking REST end point
        if PredecessorTaskLinker._link_name.casefold() in (
            link.casefold() for link in links
        ):
            if isinstance(r, dict):
                r["taskList"] = r.pop("planList", [])
                r._Entity__old = copy.deepcopy(r)
            else:
                for entity in r:
                    entity["taskList"] = entity.pop("planList", [])
                    entity._Entity__old = copy.deepcopy(entity)
        return r

    # Override here to correctly format the URL for the Predecessor Task link case
    def _link(
        self, to_entity_name, to_link, operation, update_cache=True, batch_linking=True
    ):
        """
        `to_entity_name`: Destination entity name (e.g. 'staff')

        `to_link`: List of Entities of the same type (and optional data) to link to

        `operation`: `add`, `update`, `delete`

        'update_cache': also modify the entity's internal representation of the links
        to match the operation that was done. Set this to False when replacing the
        list with a new one (i.e., when calling save() instead of a linker method).

        'batch_linking': Enabled by default, batches any link
        updates required into composite API requests. If disabled
        a request will be executed for each link update.
        Recommended to leave enabled to increase performance.

        Returns the list of request dicts built for batching (empty when
        `batch_linking` is disabled, because the request is posted directly),
        or `None` when `to_link` is empty.

        Raises `api.UsageException` when `to_link` does not match the link type.
        """

        link_def = self._link_def_by_name[to_entity_name]
        to_key = link_def["link_key"]

        if isinstance(to_link, dict) and link_def["type"] == list:
            # Convert input dict to list when link type is a list (we allow linking to single entity for convenience)
            to_link = [to_link]

            # For cases where user passed in dict instead of Entity, we turn them into
            # Entity on their behalf.
            typed_list = []
            target_cls = getattr(sys.modules["projectal.entities"], link_def["entity"])
            for link in to_link:
                if not isinstance(link, target_cls):
                    typed_list.append(target_cls(link))
                else:
                    typed_list.append(link)
            to_link = typed_list
        else:
            # For everything else, we expect types to match.
            if not isinstance(to_link, link_def["type"]):
                raise api.UsageException(
                    "Expected link type to be {}. Got {}.".format(
                        link_def["type"], type(to_link)
                    )
                )

        if not to_link:
            return

        url = ""
        payload = {}
        request_list = []
        # Is it a reverse linker? If so, invert the relationship
        if link_def["reverse"]:
            for link in to_link:
                # Sets the data attribute on the correct
                # link entity
                if link_def["name"] == "predecessor_task":
                    data_name = link_def.get("data_name")
                    self[data_name] = copy.deepcopy(link[data_name])
                request_list.extend(
                    link._link(
                        self._name,
                        self,
                        operation,
                        update_cache,
                        batch_linking=batch_linking,
                    )
                )
        else:
            # Only keep UUID and the data attribute, if it has one
            def strip_payload(link):
                single = {"uuId": link["uuId"]}
                data_name = link_def.get("data_name")
                if data_name and data_name in link:
                    single[data_name] = copy.deepcopy(link[data_name])
                    # limiting data attribute removal to only planLink
                    # in case of side effects
                    if data_name == "planLink":
                        del link[data_name]
                return single

            # If batch linking is enabled and the entity to link is a list of entities,
            # a separate request must be constructed for each one because the final composite
            # request permits only one input per call
            if to_entity_name == "predecessor_task" or to_entity_name == "task":
                url = "/api/{}/plan/TASK/{}".format(self._path, operation)
            else:
                url = "/api/{}/link/{}/{}".format(self._path, to_entity_name, operation)
            to_link_payload = None
            if isinstance(to_link, list):
                to_link_payload = []
                for link in to_link:
                    if batch_linking:
                        request_list.append(
                            {
                                "method": "POST",
                                "invoke": url,
                                "body": {
                                    "uuId": self["uuId"],
                                    to_key: [strip_payload(link)],
                                },
                            }
                        )
                    else:
                        to_link_payload.append(strip_payload(link))
            if isinstance(to_link, dict):
                if batch_linking:
                    request_list.append(
                        {
                            "method": "POST",
                            "invoke": url,
                            "body": {
                                "uuId": self["uuId"],
                                to_key: strip_payload(to_link),
                            },
                        }
                    )
                else:
                    to_link_payload = strip_payload(to_link)

            if not batch_linking:
                payload = {"uuId": self["uuId"], to_key: to_link_payload}
                api.post(url, payload=payload)

        if not update_cache:
            return request_list

        # Set the initial state if first add. We need the type to be set to correctly update the cache
        if operation == "add" and self.get(to_key, None) is None:
            if link_def.get("type") == dict:
                self[to_key] = {}
            elif link_def.get("type") == list:
                self[to_key] = []

        # Modify the entity object's cache of links to match the changes we pushed to the server.
        if isinstance(self.get(to_key, []), list):
            if operation == "add":
                # Sometimes the backend doesn't return a list when it has none. Create it.
                if to_key not in self:
                    self[to_key] = []

                for to_entity in to_link:
                    self[to_key].append(to_entity)
            else:
                for to_entity in to_link:
                    # Find it in original list
                    for i, old in enumerate(self.get(to_key, [])):
                        if old["uuId"] == to_entity["uuId"]:
                            if operation == "update":
                                self[to_key][i] = to_entity
                            elif operation == "delete":
                                del self[to_key][i]
        if isinstance(self.get(to_key, None), dict):
            if operation in ["add", "update"]:
                self[to_key] = to_link
            elif operation == "delete":
                self[to_key] = None

        # Update the "old" record of the link on the entity to avoid
        # flagging it for changes (link lists are not meant to be user editable).
        if to_key in self:
            self._Entity__old[to_key] = self[to_key]

        return request_list

    def update_order(self, order_at_uuId, order_as=True):
        """Send a task update request carrying ordering parameters.

        `order_at_uuId`: value sent as the `order-at` query parameter.

        `order_as`: sent as the `order-as` query parameter, as the string
        `true` or `false`.

        Returns the result of the PUT request.
        """
        url = "/api/task/update?order-at={}&order-as={}".format(
            order_at_uuId, "true" if order_as else "false"
        )
        return api.put(url, [{"uuId": self["uuId"]}])

    def link_predecessor_task(self, predecessor_task):
        """Post an `add` plan request for `predecessor_task` (a `dict` or a list of them)."""
        return self.__plan(self, predecessor_task, "add")

    def relink_predecessor_task(self, predecessor_task):
        """Post an `update` plan request for `predecessor_task` (a `dict` or a list of them)."""
        return self.__plan(self, predecessor_task, "update")

    def unlink_predecessor_task(self, predecessor_task):
        """Post a `delete` plan request for `predecessor_task` (a `dict` or a list of them)."""
        return self.__plan(self, predecessor_task, "delete")

    @classmethod
    def __plan(cls, from_task, to_task, operation):
        """Post one plan request per task in `to_task`.

        `from_task`: the task the linker method was called on.

        `to_task`: a `dict`, or a list of `dict`s; each must carry the
        predecessor linker's data attribute and a `uuId`.

        `operation`: last path segment of the request URL.

        Always returns `True`. Nothing is posted when `to_task` is neither
        a `dict` nor a `list`.
        """
        url = "/api/task/plan/task/{}".format(operation)
        # Invert the link relationship to match the linker
        if isinstance(to_task, dict):
            from_task_copy = copy.deepcopy(from_task)
            from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
                to_task[PredecessorTaskLinker._link_data_name]
            )
            payload = {"uuId": to_task["uuId"], "taskList": [from_task_copy]}
            api.post(url, payload=payload)
        elif isinstance(to_task, list):
            for task in to_task:
                from_task_copy = copy.deepcopy(from_task)
                from_task_copy[PredecessorTaskLinker._link_data_name] = copy.deepcopy(
                    task[PredecessorTaskLinker._link_data_name]
                )
                payload = {"uuId": task["uuId"], "taskList": [from_task_copy]}
                api.post(url, payload=payload)
        return True

    def parents(self):
        """
        Return an ordered list of [name, uuId] pairs of this task's parents, up to
        (but not including) the root of the project.
        """
        payload = {
            "name": "Task Parents",
            "type": "msql",
            "start": 0,
            "limit": -1,
            "holder": "{}".format(self["uuId"]),
            "select": [
                ["TASK(one).PARENT_ALL_TASK.name"],
                ["TASK(one).PARENT_ALL_TASK.uuId"],
            ],
        }
        rows = api.query(payload)
        # Results come back in reverse order. Flip them around
        rows.reverse()
        return rows

    def project_uuId(self):
        """Return the `uuId` of the Project that holds this Task.

        Returns `None` when the query yields no rows.
        """
        payload = {
            "name": "Project that holds this task",
            "type": "msql",
            "start": 0,
            "limit": 1,
            "holder": "{}".format(self["uuId"]),
            "select": [["TASK.PROJECT.uuId"]],
        }
        projects = api.query(payload)
        for t in projects:
            return t[0]
        return None

    @classmethod
    def add_task_template(cls, project, template):
        """Insert TaskTemplate `template` into Project `project`"""
        url = "/api/task/task_template/add?override=false&group=false"
        payload = {"uuId": project["uuId"], "templateList": [template]}
        api.post(url, payload)

    def reset_duration(self, calendars=None):
        """Set this task's duration based on its start and end dates while
        taking into account the calendar for weekends and scheduled time off.

        calendars is expected to be the list of calendar objects for the
        location of the project that holds this task. You may provide this
        list yourself for efficiency (recommended) - if not provided, it
        will be fetched for you by issuing requests to the server.

        Every working day between `startTime` and `closeTime` counts as 8
        hours; the total is stored in `self["duration"]` in minutes.

        Returns `0` (leaving `duration` untouched) when `startTime` or
        `closeTime` is missing or equal to `DateLimit.Min` / `DateLimit.Max`.
        Otherwise returns `None`. When no calendars can be found, every day
        is counted as a working day.
        """
        start = self.get("startTime")
        end = self.get("closeTime")
        # Check the dates first so no server requests are made for nothing.
        if not start or start == DateLimit.Min:
            return 0
        if not end or end == DateLimit.Max:
            return 0

        if not calendars:
            if "projectRef" not in self:
                task = projectal.Task.get(self)
                project_ref = task["projectRef"]
            else:
                project_ref = self["projectRef"]
            project = projectal.Project.get(project_ref, links=["LOCATION"])
            # Only the first location's calendars are used.
            for location in project.get("locationList", []):
                calendars = location.calendar()
                break

        # Weekday names that are non-working in the base calendar, plus the
        # individual dates the location calendar marks as time off or as
        # working exceptions.
        # NOTE(review): assumes item["type"] holds English weekday names that
        # match strftime("%A") - confirm against the calendar API.
        base_non_working = set()
        location_non_working = set()
        location_working = set()
        for calendar in calendars or []:
            if calendar["name"] == "base_calendar":
                for item in calendar["calendarList"]:
                    if not item["isWorking"]:
                        base_non_working.add(item["type"])

            if calendar["name"] == "location":
                for item in calendar["calendarList"]:
                    start_date = datetime.date.fromisoformat(item["startDate"])
                    end_date = datetime.date.fromisoformat(item["endDate"])
                    # Expand the inclusive range into individual dates so a
                    # range is honoured even when the task starts inside it.
                    span = {
                        (start_date + datetime.timedelta(days=x)).strftime("%Y-%m-%d")
                        for x in range((end_date - start_date).days + 1)
                    }
                    if item["isWorking"]:
                        # Accumulate: several working exceptions may exist.
                        location_working |= span
                    else:
                        location_non_working |= span

        # Timestamps are in milliseconds; fromtimestamp() yields local time.
        current = datetime.datetime.fromtimestamp(start / 1000)
        end = datetime.datetime.fromtimestamp(end / 1000)
        one_day = datetime.timedelta(days=1)
        minutes = 0
        while current <= end:
            day_key = current.strftime("%Y-%m-%d")
            # A working exception overrides the base calendar's day off.
            base_day_off = (
                current.strftime("%A") in base_non_working
                and day_key not in location_working
            )
            if not base_day_off and day_key not in location_non_working:
                minutes += 8 * 60
            current += one_day

        self["duration"] = minutes
Implementation of the Task API.
@classmethod
def create(
    cls,
    holder,
    entities,
    batch_linking=True,
    disable_system_features=True,
    enable_system_features_on_exit=True,
):
    """Create a Task

    `holder`: An instance or the `uuId` of the owner

    `entities`: `dict` containing the fields of the entity to be created,
    or a list of such `dict`s to create in bulk.

    The remaining arguments are handed to the parent class `create` as-is.
    """
    if isinstance(holder, dict):
        holder_id = holder["uuId"]
    else:
        holder_id = holder

    created = super().create(
        entities,
        "?holder=" + holder_id,
        batch_linking,
        disable_system_features,
        enable_system_features_on_exit,
    )

    # The creation endpoint does not echo the project or parent back, but
    # both are known here, so stamp them onto every task that was returned.
    if isinstance(created, dict):
        new_tasks = [created]
    elif isinstance(created, list):
        new_tasks = created
    else:
        new_tasks = []

    for new_task in new_tasks:
        new_task.set_readonly("projectRef", holder_id)
        # An explicitly supplied parent wins; the holder is the fallback.
        new_task.set_readonly("parent", new_task.get("parent", holder_id))
    return created
Create a Task
holder
: An instance or the uuId
of the owner
entities
: dict
containing the fields of the entity to be created,
or a list of such dict
s to create in bulk.
@classmethod
def get(cls, entities, links=None, deleted_at=None):
    """Get one or more Tasks; arguments are forwarded to the parent `get`.

    If the predecessor task link was requested, each result's `planList`
    is moved to `taskList` and its `_Entity__old` snapshot is refreshed.
    """
    fetched = super().get(entities, links, deleted_at)
    if not links:
        return fetched

    # The predecessor linking REST end point expects the key `taskList`,
    # so rename whatever came back as `planList` when that link was asked for.
    wanted = PredecessorTaskLinker._link_name.casefold()
    if not any(wanted == name.casefold() for name in links):
        return fetched

    tasks = [fetched] if isinstance(fetched, dict) else fetched
    for task in tasks:
        task["taskList"] = task.pop("planList", [])
        task._Entity__old = copy.deepcopy(task)
    return fetched
Get one or more entities of the same type. The entity type is determined by the subclass calling this method.
entities
: One of several formats containing the uuId
s
of the entities you want to get (see bottom for examples):
str
or list of str
dict
or list of dict
(with uuId
key)
links
: A case-insensitive list of entity names to fetch with
this entity. For performance reasons, links are only returned
on demand.
Links follow a common naming convention in the output with
a List suffix. E.g.:
links=['company', 'location']
will appear as companyList
and
locationList
in the response.
# Example usage:
# str
projectal.Project.get('1b21e445-f29a-4a9f-95ff-fe253a3e1b11')
# list of str
ids = ['1b21e445-f29a...', '1b21e445-f29a...', '1b21e445-f29a...']
projectal.Project.get(ids)
# dict
project = projectal.Project.create({'name': 'MyProject'})
# project = {'uuId': '1b21e445-f29a...', 'name': 'MyProject', ...}
projectal.Project.get(project)
# list of dicts (e.g. from a query)
# projects = [{'uuId': '1b21e445-f29a...'}, {'uuId': '1b21e445-f29a...'}, ...]
projectal.Project.get(projects)
# str with links
projectal.Project.get('1b21e445-f29a...', links=['company', 'location'])
deleted_at
: Include this parameter to get a deleted entity.
This value should be a UTC timestamp from a webhook delete event.
def parents(self):
    """
    Return an ordered list of [name, uuId] pairs of this task's parents, up to
    (but not including) the root of the project.
    """
    select_columns = [
        ["TASK(one).PARENT_ALL_TASK.name"],
        ["TASK(one).PARENT_ALL_TASK.uuId"],
    ]
    rows = api.query(
        {
            "name": "Task Parents",
            "type": "msql",
            "start": 0,
            "limit": -1,
            "holder": "{}".format(self["uuId"]),
            "select": select_columns,
        }
    )
    # The query returns the rows in reverse order; flip them in place.
    rows.reverse()
    return rows
Return an ordered list of [name, uuId] pairs of this task's parents, up to (but not including) the root of the project.
def project_uuId(self):
    """Return the `uuId` of the Project that holds this Task, or `None` if the query has no rows."""
    rows = api.query(
        {
            "name": "Project that holds this task",
            "type": "msql",
            "start": 0,
            "limit": 1,
            "holder": "{}".format(self["uuId"]),
            "select": [["TASK.PROJECT.uuId"]],
        }
    )
    # Only the first row matters; its first column is the project uuId.
    try:
        first_row = next(iter(rows))
    except StopIteration:
        return None
    return first_row[0]
Return the uuId
of the Project that holds this Task.
@classmethod
def add_task_template(cls, project, template):
    """Insert TaskTemplate `template` into Project `project`"""
    api.post(
        "/api/task/task_template/add?override=false&group=false",
        {"uuId": project["uuId"], "templateList": [template]},
    )
Insert TaskTemplate template
into Project project
def reset_duration(self, calendars=None):
    """Set this task's duration based on its start and end dates while
    taking into account the calendar for weekends and scheduled time off.

    calendars is expected to be the list of calendar objects for the
    location of the project that holds this task. You may provide this
    list yourself for efficiency (recommended) - if not provided, it
    will be fetched for you by issuing requests to the server.

    Every working day between `startTime` and `closeTime` counts as 8
    hours; the total is stored in `self["duration"]` in minutes.

    Returns `0` (leaving `duration` untouched) when `startTime` or
    `closeTime` is missing or equal to `DateLimit.Min` / `DateLimit.Max`.
    Otherwise returns `None`. When no calendars can be found, every day
    is counted as a working day.
    """
    start = self.get("startTime")
    end = self.get("closeTime")
    # Check the dates first so no server requests are made for nothing.
    if not start or start == DateLimit.Min:
        return 0
    if not end or end == DateLimit.Max:
        return 0

    if not calendars:
        if "projectRef" not in self:
            task = projectal.Task.get(self)
            project_ref = task["projectRef"]
        else:
            project_ref = self["projectRef"]
        project = projectal.Project.get(project_ref, links=["LOCATION"])
        # Only the first location's calendars are used.
        for location in project.get("locationList", []):
            calendars = location.calendar()
            break

    # Weekday names that are non-working in the base calendar, plus the
    # individual dates the location calendar marks as time off or as
    # working exceptions.
    # NOTE(review): assumes item["type"] holds English weekday names that
    # match strftime("%A") - confirm against the calendar API.
    base_non_working = set()
    location_non_working = set()
    location_working = set()
    for calendar in calendars or []:
        if calendar["name"] == "base_calendar":
            for item in calendar["calendarList"]:
                if not item["isWorking"]:
                    base_non_working.add(item["type"])

        if calendar["name"] == "location":
            for item in calendar["calendarList"]:
                start_date = datetime.date.fromisoformat(item["startDate"])
                end_date = datetime.date.fromisoformat(item["endDate"])
                # Expand the inclusive range into individual dates so a
                # range is honoured even when the task starts inside it.
                span = {
                    (start_date + datetime.timedelta(days=x)).strftime("%Y-%m-%d")
                    for x in range((end_date - start_date).days + 1)
                }
                if item["isWorking"]:
                    # Accumulate: several working exceptions may exist.
                    location_working |= span
                else:
                    location_non_working |= span

    # Timestamps are in milliseconds; fromtimestamp() yields local time.
    current = datetime.datetime.fromtimestamp(start / 1000)
    end = datetime.datetime.fromtimestamp(end / 1000)
    one_day = datetime.timedelta(days=1)
    minutes = 0
    while current <= end:
        day_key = current.strftime("%Y-%m-%d")
        # A working exception overrides the base calendar's day off.
        base_day_off = (
            current.strftime("%A") in base_non_working
            and day_key not in location_working
        )
        if not base_day_off and day_key not in location_non_working:
            minutes += 8 * 60
        current += one_day

    self["duration"] = minutes
Set this task's duration based on its start and end dates while taking into account the calendar for weekends and scheduled time off.
calendars is expected to be the list of calendar objects for the location of the project that holds this task. You may provide this list yourself for efficiency (recommended) - if not provided, it will be fetched for you by issuing requests to the server.