Friendly LWM2M client
WppTaskQueue.cpp
Go to the documentation of this file.
1 #include "WppTaskQueue.h"
2 
3 #include <algorithm>
4 #include <cstring>
5 #include "WppPlatform.h"
6 #include "WppLogs.h"
7 
8 namespace wpp {
9 
10 WppGuard WppTaskQueue::_taskQueueGuard; // Locked by the functions below around accesses to _instance._tasks and the per-task fields.
11 WppGuard WppTaskQueue::_handleTaskGuard; // Held for the whole duration of handleEachTask() and hardReset().
12 WppTaskQueue WppTaskQueue::_instance; // Single static queue object that all static member functions operate on.
13 
14 
15 WppTaskQueue::WppTaskQueue() : _nextTaskId(WPP_ERR_TASK_ID) {} // Id counter starts at WPP_ERR_TASK_ID; getNextTaskId() skips that value when handing out ids.
16 
17 WppTaskQueue::~WppTaskQueue() { // Releases all queued tasks when the queue object is destroyed.
18  hardReset(); // Frees every TaskInfo held by _instance (and any copied context) and clears its list.
19 }
20 
21 /* ------------- Tasks management ------------- */
22 WppTaskQueue::task_id_t WppTaskQueue::addTask(time_t delaySec, task_t task) {
23  return addTask(WPP_TASK_DEF_CTX, delaySec, task);
24 }
25 
26 WppTaskQueue::task_id_t WppTaskQueue::addTask(void *ctx, time_t delaySec, task_t task) {
27  if (delaySec < WPP_TASK_MIN_DELAY_S || WPP_TASK_MAX_DELAY_S < delaySec) return WPP_ERR_TASK_ID;
28 
29  _taskQueueGuard.lock();
30 
31  task_id_t id = _instance.getNextTaskId();
32  if (id == WPP_ERR_TASK_ID) {
33  WPP_LOGE(TAG_WPP_TASK, "Can't add task becouse task id is WPP_ERR_TASK_ID, looks like all task ids are busy.");
34  _taskQueueGuard.unlock();
35  return WPP_ERR_TASK_ID;
36  }
37 
38  TaskInfo *newTask = new TaskInfo;
39  newTask->id = id;
40  newTask->task = task;
41  newTask->delaySec = delaySec;
42  newTask->nextCallTime = WppPlatform::getTime() + newTask->delaySec;
43  newTask->ctx = ctx;
44  newTask->ctxSize = 0;
45  newTask->state = IDLE;
46 
47  _instance._tasks.push_back(newTask);
48  _taskQueueGuard.unlock();
49 
50  return id;
51 }
52 
53 WppTaskQueue::task_id_t WppTaskQueue::addTaskWithCopy(const void *ctx, size_t size, time_t delaySec, task_t task) {
54  if (!ctx || !size || delaySec < WPP_TASK_MIN_DELAY_S || WPP_TASK_MAX_DELAY_S < delaySec) return WPP_ERR_TASK_ID;
55 
56  _taskQueueGuard.lock();
57 
58  task_id_t id = _instance.getNextTaskId();
59  if (id == WPP_ERR_TASK_ID) {
60  WPP_LOGE(TAG_WPP_TASK, "Can't add task becouse task id is WPP_ERR_TASK_ID, looks like all task ids are busy.");
61  _taskQueueGuard.unlock();
62  return WPP_ERR_TASK_ID;
63  }
64 
65  TaskInfo *newTask = new TaskInfo;
66  newTask->id = id;
67  newTask->task = task;
68  newTask->delaySec = delaySec;
69  newTask->nextCallTime = WppPlatform::getTime() + newTask->delaySec;
70  newTask->ctx = new uint8_t[size];
71  memcpy(newTask->ctx, ctx, size);
72  newTask->ctxSize = size;
73  newTask->state = IDLE;
74 
75  _instance._tasks.push_back(newTask);
76  _taskQueueGuard.unlock();
77 
78  return id;
79 }
80 
81 size_t WppTaskQueue::getTaskCnt() {
82  return _instance._tasks.size();
83 }
84 
85 bool WppTaskQueue::isTaskExist(task_id_t id) {
86  _taskQueueGuard.lock();
87 
88  auto task = std::find_if(_instance._tasks.begin(), _instance._tasks.end(), [id](TaskInfo *task) { return task->id == id; });
89  bool isExist = task != _instance._tasks.end();
90 
91  _taskQueueGuard.unlock();
92  return isExist;
93 }
94 
95 bool WppTaskQueue::isTaskIdle(task_id_t id) {
96  _taskQueueGuard.lock();
97 
98  auto task = std::find_if(_instance._tasks.begin(), _instance._tasks.end(), [id](TaskInfo *task) { return task->id == id; });
99  if (task == _instance._tasks.end()) {
100  _taskQueueGuard.unlock();
101  return false;
102  }
103  bool isIdle = (*task)->state & IDLE;
104 
105  _taskQueueGuard.unlock();
106  return isIdle;
107 }
108 
109 bool WppTaskQueue::isTaskExecuting(task_id_t id) {
110  _taskQueueGuard.lock();
111 
112  auto task = std::find_if(_instance._tasks.begin(), _instance._tasks.end(), [id](TaskInfo *task) { return task->id == id; });
113  if (task == _instance._tasks.end()) {
114  _taskQueueGuard.unlock();
115  return false;
116  }
117  bool isExecuting = (*task)->state & EXECUTING;
118 
119  _taskQueueGuard.unlock();
120  return isExecuting;
121 }
122 
123 bool WppTaskQueue::isTaskShouldBeDeleted(task_id_t id) {
124  _taskQueueGuard.lock();
125 
126  auto task = std::find_if(_instance._tasks.begin(), _instance._tasks.end(), [id](TaskInfo *task) { return task->id == id; });
127  if (task == _instance._tasks.end()) {
128  _taskQueueGuard.unlock();
129  return false;
130  }
131  bool isShouldBeDeleted = (*task)->state & SHOULD_BE_DELETED;
132 
133  _taskQueueGuard.unlock();
134  return isShouldBeDeleted;
135 }
136 
137 void WppTaskQueue::requestToRemoveTask(task_id_t id) {
138  _taskQueueGuard.lock();
139 
140  auto task = std::find_if(_instance._tasks.begin(), _instance._tasks.end(), [id](TaskInfo *task) { return task->id == id; });
141  if (task == _instance._tasks.end()) {
142  _taskQueueGuard.unlock();
143  return;
144  }
145  (*task)->state = (TaskState)((*task)->state | SHOULD_BE_DELETED);
146 
147  _taskQueueGuard.unlock();
148 }
149 
150 void WppTaskQueue::requestToRemoveEachTask() {
151  _taskQueueGuard.lock();
152  for (auto task : _instance._tasks) task->state = (TaskState)(task->state | SHOULD_BE_DELETED);
153  _taskQueueGuard.unlock();
154 }
155 
156 void WppTaskQueue::hardReset() {
157  _handleTaskGuard.lock();
158  _taskQueueGuard.lock();
159 
160  for (auto task : _instance._tasks) {
161  if (task->ctxSize > 0) {
162  delete[] (uint8_t *)(task->ctx);
163  task->ctxSize = 0;
164  }
165  delete task;
166  }
167  _instance._tasks.clear();
168 
169  _handleTaskGuard.unlock();
170  _taskQueueGuard.unlock();
171 }
172 
173 time_t WppTaskQueue::handleEachTask(WppClient& client) {
174  _handleTaskGuard.lock();
175 
176  _taskQueueGuard.lock();
177  // We create copy for be sure that adding new task not corrupted begin()
178  // and end() iterators inside _tasks list.
179  std::list<TaskInfo *> tasksCopy = _instance._tasks;
180  _taskQueueGuard.unlock();
181 
182  for (auto task : tasksCopy) {
183  // Be sure that we do not override SHOULD_BE_DELETED state
184  _taskQueueGuard.lock();
185  if (task->state & SHOULD_BE_DELETED || task->nextCallTime > WppPlatform::getTime()) {
186  _instance._taskQueueGuard.unlock();
187  continue;
188  }
189  task->state = EXECUTING;
190  _taskQueueGuard.unlock();
191 
192  // Here state can be changed to SHOULD_BE_DELETED but it is
193  // not matter becouse we have already set EXECUTING state look
194  // at description of requestToRemoveTask() and requestToRemoveEachTask()
195  bool isFinished = task->task(client, task->ctx);
196 
197  // Be sure that we do not override SHOULD_BE_DELETED state
198  _taskQueueGuard.lock();
199  if (isFinished || task->state & SHOULD_BE_DELETED) {
200  task->state = SHOULD_BE_DELETED;
201  } else {
202  task->state = IDLE;
203  task->nextCallTime = WppPlatform::getTime() + task->delaySec;
204  }
205  _taskQueueGuard.unlock();
206  }
207 
208  _instance.deleteFinishedTasks();
209  time_t nextCallInterval = _instance.updateNextCallTimeForTasks();
210  _handleTaskGuard.unlock();
211  return nextCallInterval;
212 }
213 
214 void WppTaskQueue::deleteFinishedTasks() {
215  _taskQueueGuard.lock();
216  for (auto task = _tasks.begin(); task != _tasks.end();) {
217  if (((*task)->state & SHOULD_BE_DELETED) == 0) {
218  task++;
219  continue;
220  }
221  if ((*task)->ctxSize > 0) {
222  delete[] (uint8_t *)((*task)->ctx);
223  (*task)->ctxSize = 0;
224  }
225  delete (*task);
226  task = _tasks.erase(task);
227  }
228  _taskQueueGuard.unlock();
229 }
230 
231 
232 time_t WppTaskQueue::updateNextCallTimeForTasks() {
233  _taskQueueGuard.lock();
234  time_t nextCallInterval = WPP_TASK_MAX_DELAY_S;
235  for (auto task: _tasks) {
236  time_t taskCallInterval = std::max(task->nextCallTime - WppPlatform::getTime(), (time_t)0);
237  if (taskCallInterval < nextCallInterval) nextCallInterval = taskCallInterval;
238  }
239  _taskQueueGuard.unlock();
240  return nextCallInterval;
241 }
242 
243 WppTaskQueue::task_id_t WppTaskQueue::getNextTaskId() {
244  task_id_t baseId = _nextTaskId;
245  task_id_t newId = WPP_ERR_TASK_ID;
246  bool isExist = true;
247 
248  do {
249  newId = _nextTaskId++;
250  if (newId == WPP_ERR_TASK_ID) continue;
251  auto task = std::find_if(_instance._tasks.begin(), _instance._tasks.end(), [newId](TaskInfo *task) { return task->id == newId; });
252  isExist = task != _instance._tasks.end();
253  } while (isExist && baseId != _nextTaskId);
254 
255  if (baseId == _nextTaskId) newId = WPP_ERR_TASK_ID;
256 
257  return newId;
258 }
259 
260 } // namespace wpp
#define WPP_LOGE(TAG, FMT,...)
Definition: WppLogs.h:49
#define TAG_WPP_TASK
Definition: WppLogs.h:15
#define WPP_TASK_MAX_DELAY_S
Definition: WppTaskQueue.h:19
#define WPP_TASK_MIN_DELAY_S
Definition: WppTaskQueue.h:17
#define WPP_ERR_TASK_ID
Definition: WppTaskQueue.h:22
#define WPP_TASK_DEF_CTX
Definition: WppTaskQueue.h:21
Represents a client interface for Wpp library.
Definition: WppClient.h:37
std::function< bool(WppClient &, void *)> task_t
Definition: WppTaskQueue.h:61
uint32_t task_id_t
Definition: WppTaskQueue.h:52
The WppConnection class represents a connection interface for the Wpp library.
Definition: WppClient.cpp:14