1.0 Documentation

sleekxmpp.xmlstream.scheduler

Contents

Source code for sleekxmpp.xmlstream.scheduler

# -*- coding: utf-8 -*-
"""
    sleekxmpp.xmlstream.scheduler
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    This module provides a task scheduler that works better
    with SleekXMPP's threading usage than the stock version.

    Part of SleekXMPP: The Sleek XMPP Library

    :copyright: (c) 2011 Nathanael C. Fritz
    :license: MIT, see LICENSE for more details
"""

import time
import threading
import logging
import itertools

from sleekxmpp.util import Queue, QueueEmpty


#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
WAIT_TIMEOUT = 1.0


log = logging.getLogger(__name__)


[docs]class Task(object): """ A scheduled task that will be executed by the scheduler after a given time interval has passed. :param string name: The name of the task. :param int seconds: The number of seconds to wait before executing. :param callback: The function to execute. :param tuple args: The arguments to pass to the callback. :param dict kwargs: The keyword arguments to pass to the callback. :param bool repeat: Indicates if the task should repeat. Defaults to ``False``. :param pointer: A pointer to an event queue for queuing callback execution instead of executing immediately. """ def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): #: The name of the task. self.name = name #: The number of seconds to wait before executing. self.seconds = seconds #: The function to execute once enough time has passed. self.callback = callback #: The arguments to pass to :attr:`callback`. self.args = args or tuple() #: The keyword arguments to pass to :attr:`callback`. self.kwargs = kwargs or {} #: Indicates if the task should repeat after executing, #: using the same :attr:`seconds` delay. self.repeat = repeat #: The time when the task should execute next. self.next = time.time() + self.seconds #: The main event queue, which allows for callbacks to #: be queued for execution instead of executing immediately. self.qpointer = qpointer
[docs] def run(self): """Execute the task's callback. If an event queue was supplied, place the callback in the queue; otherwise, execute the callback immediately. """ if self.qpointer is not None: self.qpointer.put(('schedule', self.callback, self.args, self.kwargs, self.name)) else: self.callback(*self.args, **self.kwargs) self.reset() return self.repeat
[docs] def reset(self): """Reset the task's timer so that it will repeat.""" self.next = time.time() + self.seconds
[docs]class Scheduler(object): """ A threaded scheduler that allows for updates mid-execution unlike the scheduler in the standard library. Based on: http://docs.python.org/library/sched.html#module-sched :param parentstop: An :class:`~threading.Event` to signal stopping the scheduler. """ def __init__(self, parentstop=None): #: A queue for storing tasks self.addq = Queue() #: A list of tasks in order of execution time. self.schedule = [] #: If running in threaded mode, this will be the thread processing #: the schedule. self.thread = None #: A flag indicating that the scheduler is running. self.run = False #: An :class:`~threading.Event` instance for signalling to stop #: the scheduler. self.stop = parentstop #: Lock for accessing the task queue. self.schedule_lock = threading.RLock() #: The time in seconds to wait for events from the event queue, #: and also the time between checks for the process stop signal. self.wait_timeout = WAIT_TIMEOUT
[docs] def process(self, threaded=True, daemon=False): """Begin accepting and processing scheduled tasks. :param bool threaded: Indicates if the scheduler should execute in its own thread. Defaults to ``True``. """ if threaded: self.thread = threading.Thread(name='scheduler_process', target=self._process) self.thread.daemon = daemon self.thread.start() else: self._process()
def _process(self): """Process scheduled tasks.""" self.run = True try: while self.run and not self.stop.is_set(): updated = False if self.schedule: wait = self.schedule[0].next - time.time() else: wait = self.wait_timeout try: if wait <= 0.0: newtask = self.addq.get(False) else: newtask = None while self.run and \ not self.stop.is_set() and \ newtask is None and \ wait > 0: try: newtask = self.addq.get(True, min(wait, self.wait_timeout)) except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting. wait -= self.wait_timeout except QueueEmpty: # Time to run some tasks, and no new tasks to add. self.schedule_lock.acquire() # select only those tasks which are to be executed now relevant = itertools.takewhile( lambda task: time.time() >= task.next, self.schedule) # run the tasks and keep the return value in a tuple status = map(lambda task: (task, task.run()), relevant) # remove non-repeating tasks for task, doRepeat in status: if not doRepeat: try: self.schedule.remove(task) except ValueError: pass else: # only need to resort tasks if a repeated task has # been kept in the list. updated = True else: # Add new task self.schedule_lock.acquire() if newtask is not None: self.schedule.append(newtask) updated = True finally: if updated: self.schedule.sort(key=lambda task: task.next) self.schedule_lock.release() except KeyboardInterrupt: self.run = False except SystemExit: self.run = False log.debug("Quitting Scheduler thread")
[docs] def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): """Schedule a new task. :param string name: The name of the task. :param int seconds: The number of seconds to wait before executing. :param callback: The function to execute. :param tuple args: The arguments to pass to the callback. :param dict kwargs: The keyword arguments to pass to the callback. :param bool repeat: Indicates if the task should repeat. Defaults to ``False``. :param pointer: A pointer to an event queue for queuing callback execution instead of executing immediately. """ try: self.schedule_lock.acquire() for task in self.schedule: if task.name == name: raise ValueError("Key %s already exists" % name) self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) except: raise finally: self.schedule_lock.release()
[docs] def remove(self, name): """Remove a scheduled task ahead of schedule, and without executing it. :param string name: The name of the task to remove. """ try: self.schedule_lock.acquire() the_task = None for task in self.schedule: if task.name == name: the_task = task if the_task is not None: self.schedule.remove(the_task) except: raise finally: self.schedule_lock.release()
[docs] def quit(self): """Shutdown the scheduler.""" self.run = False

Contents