Публикации с меткой «threading»

Python for SEO

Threading и KeyboardInterrupt.

Как организовать пул потоков? Допустим у меня есть 100 данных (пусть они находятся в каком-либо списке) и все их нужно переработать в 10 потоков. Стартуем 10 потоков, в каждый передаем по 1 данному, следим за потоками, как только один отработал, стартуем новый и так пока не кончатся данные… В общем, это не то, чтобы бред, но есть способ поудобней.

За всё время работы скрипта, стартует только 10 потоков, просто каждый из них, отработав со своим 1 данным, не дохнет, а берет из списка следующее 1 данное и делает с ним эту же работу:

# -*- coding: utf-8 -*-
import threading, Queue, time
import traceback

class Worker(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.__queue = queue

    def run(self):
        while True:

            try: item = self.__queue.get_nowait() # ждём данные
            except Queue.Empty: break # данные закончились, прекращаем работу

            try: self.work(item) # работа
            except Exception: traceback.print_exc()

            time.sleep(0.5)
            self.__queue.task_done() # задача завершена
        return
    def work(self,item):
        print item
        #pass

def main():
    # Выводим в 5 потоков цифры от 1 до 100.
    queue = Queue.Queue()
    num_threads = 5 # 5 потоков

    for x in xrange(100):
        queue.put(x) # заносим данные в очередь
    for i in xrange(num_threads):
        t = Worker(queue) # создаем поток
        t.start() # стартуем
        time.sleep(0.1) # чтобы в консоли друг на друга не накладывались

    queue.join() #блокируем выполнение программы, пока не будут израсходованы данные.

    print "Done!"
if __name__ == '__main__':
    main()

И, в общем, хотелось бы еще, чтобы можно всё это в любой момент можно было бы остановить. Но поскольку мы выполнение программы блокируем с помощью queue.join(), то скрипт наши Ctr+C банально «не услышит». Я начал искать, что делать в таком случае, нашел вот такое решение, но у меня оно работало абы как. А потом, поразмыслив, я сам придумал решение, которое оказалось ужасно простым.

# -*- coding: utf-8 -*-
import threading, Queue, time
import traceback

class Worker(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.__queue = queue

        self.kill_received = False # флаг прекращения работы
    def run(self):
        while not self.kill_received:

            try: item = self.__queue.get_nowait() # ждём данные
            except Queue.Empty: break

            try: self.work(item)
            except Exception: traceback.print_exc()

            time.sleep(0.5)
            self.__queue.task_done() # задача завершена
            self.__queue.put(item) # зациклим
        return
    def work(self,item):
        print item

def main():
    queue = Queue.Queue()
    num_threads = 5 # 5 потоков

    threads = []
    for x in xrange(100):
        queue.put(x) # заносим данные в очередь
    for i in xrange(num_threads):
        t = Worker(queue) # создаем нить
        threads.append(t)
        t.start() # стартуем
        time.sleep(0.1)

    #Пока в "живых" не останется только главный поток, ждем.
    while threading.activeCount()>1:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            print "Ctrl-c received! Sending kill to threads..."
            for t in threads:
                t.kill_received = True # даем сигнал о завершении всем потокам
    print "Done!"
if __name__ == '__main__':
    main()

Блог python на хабрахабре

Python / [Ссылка] Еще раз о GIL

Подробное и вдумчивое изложение того, как работает GIL, и чем отличается реализация, появившаяся в Python 3.2.

Python for SEO

Пул потоков. Модуль Threading.

Я думаю,  пример из SEO-софта на тему пула потоков долго искать не придется. Взять тот же Хрумер. Указал количество потоков, например 20, запустил – все 20 работают, как только один из них завершит работу – запускается следующий и так, пока не закончатся ссылки. Наверное в 9 из 10 случаев я пишу скрипт с использованием пула потоков.  Рассмотрим некторые примеры.

Пример 1. «В лоб».

    threads = []
    for x in range(THREADS):
        new_thread = threading.Thread(target=step_one)
        threads.append(new_thread)
        new_thread.start()
        time.sleep(1)
    while True: # какое-то условие
        for y in threads[:]:
            if not y.isAlive():
                threads.remove(y)
                new_thread = threading.Thread(target=step_one)
                threads.append(new_thread)
                new_thread.start()
        time.sleep(0.1)
    while threading.activeCount()>0:
        time.sleep(5)

Функцию step_one запускаем в несколько потоков, все созданные потоки храним в списке. Затем провереяем в цикле какие потоки завершили работу и, если такие есть, создаем новые и так, пока не достигнем какого-либо результата. Криво, неудобно, зато сразу понятно. Достаточно долго пользовался такой реализацией, а всё из-за лени.

Пример 2. Семафоры.

import time
import threading
max_potokov = 20
maxconnections = 5
akkiQueue = threading.BoundedSemaphore(value=maxconnections)

def doform():
    akkiQueue.acquire()
    # код функции
    akkiQueue.release()

for i in xrange(max_potokov):
    time.sleep(1)
    p = threading.Thread(target=doform)
    p.setDaemon(True)
    p.start()
while threading.activeCount()>0:
    time.sleep(5)

Уже лучше, количество потоков контролируется, но надо заранее знать сколько будет потоков, в общем как-то изворачиваться если тебе надо будет прервать все эти потоки по условию.

Пример 3. Очереди.

queue = Queue.Queue() # создаем очередь

def register():
    '''Функция с основным кодом'''
    return
def repeat():
    while True:
        try:
            item = queue.get_nowait() # ждём данные
        except Queue.Empty:
            break
        register(item) # передаем данные в нашу функцию
        time.sleep(0.5)
        queue.task_done() # задача завершена
for i in range(MAX_THREADS):
    queue.put(item) # заносим данные в очередь
for i in xrange(THREADS):
    t = threading.Thread(target=repeat) # создаем нить
    t.start() # стартуем
    time.sleep(0.5)
queue.join() # блокируем очередь до завершения

Здесь используется модуль Queue. На данный для меня самый удобный варинат – и выглядит красиво, и данные передавать можно.

Помимо тех вариантов, что я описал, можно погуглить на тему threadpool и найти какие-нибудь модули для организации пула потоков.

Метки

.net .NET C# .sort 1.2 2009 2010 404 error admin ajax amazon analytics and apache api archlinux asp.net async asynchronous autocomplete bash blender blog blogengine blogs book bootstrap bot bpython buildout byteflow bzr C c plus plus C++ cache cbv Chaco checkio chrome ci ckeditor class based views clojure closure cms cms с удобной админкой code coding style collectd COM comet competition conference ConfigParser contest Context continuous integration CouchDB coverage CppCMS cpyext cpython crud csrf CSS ctypes curl custom model fields cx_freeze cython database db dbm dbqueries debian debug debugging decorator decorators deploy deployment descriptor design dev devconf developers development diveintopython Django django 1.2 django 1.3 django advent django framework django template django trunk django weblog django-admin-tools django-cms django-compressor django-hosts django-piston django-registration django-sphinx django.admin djangoadvent djangocms djangodash doc documentation drupal e-legion eclipse EGit emacs encoding Enthought epoll erlang event exception ExtJS fabric facebook fastcgi finaloption fixtures fonts forms formset fp framework freebsd freeswitch fs2web ftp fun funcparserlib functional gae gamin gandi generic views gettext gevent gil git github gitosis Google Google App Engine google picasa Google Translate google wave Google Web Toolkit grab grablab greenlet gtd gui haskell hg hgshelve highlighter host hosting how-to howto html html5lib Hudson humor i18n icfpc ide idiomatic image-scripting improvements Internet interpreter ipython ironpython izmenimsya.ru jabber java javascript jenkins jetbrains JIT job jquery json jstree jython kde kiev kiyv kyivpy l10n ldap library libs Life Links linux Linux & Unix LLVM logging logs lxml Mac OS X magic mail markdown Matplotlib Mayavi maybe mediavirus meetup memcache Memcached memory messages metaclass middleware migration mikrotik mkd model models mod_python mod_wsgi mongodb monitoring mptt musicmans.ru musicx mvc my-projects mysql netCDF networkx newforms newforms-admin news nginx Nhibernate nix nose NoSQL numpy oop open source OpenID openoffice opster optimization oracle orm os pagination parsing path patterns pdf PDF-принтер PEP PEP8 performance performance optimization perl personality photo php picture-driven computing PIL pinax pingback pip plasma plone plugin plugins postgresql programming progress bar psycopg2 py2exe pybb pybbm pycamp pycharm pycon pycow pycurl pydev pygtk pylons PyNGL pypy pyqt PyQt4 pyrad pyramid PySide Python Python 2.5 python 2.7 python 3 python c api python speed python-mssql python3 pywinauto Qt Qt4 queue rabbitmq radius raw sql re redis redsolution redsolution cms regexp regular expressions release repoze.bfg RequestContext reusable apps robokassa rss ru ruby ruby-on-rails sample satchmo scalability SciPy scraping screencast search selenium self.error seo server setattr settings setuptools shell sikuli sms snippet socket.io software sorting south sphinx spider sql sqlalchemy sqlite ssh startup step-by-step subdomain subversion svn SyntaxHighlighter system tags tdd tddspry teh drama template templates templatetags test testing thinkpad threading threads tips tips and tricks tools tornadio tornado tornado server tricks tutorial tweepy twisted twitter typography uapycon Ubuntu ucsvlog uml Uncategorized unicode unit test unit testing UnitTest Unladen Swallow upload urllib urls utf-8 uwsgi validation vcs versioning video vim virtualenv Visual Studio vkontakte voip wave web web-devel web-services web-разработка webdev webfaction webkit webpy websockets webtest widget widgets Win API windows Wirbel work wrapper wsgi wxPython wxWidgets wysiwyg xapian xml xmonad xmpp xpath yandex youtube zip zomg zope [cdata[cbv]] [cdata[ci]] [cdata[class based views]] [cdata[continuous integration]] [cdata[django framework]] [cdata[django-sphinx]] [cdata[django]] [cdata[nginx]] [cdata[python]] [cdata[virtualenv]] [cdata[программирование]] автоматизация администрирование администрирование django админка алгоритмы архитектура атрибуты базы данных Без рубрики безопасность библиотеки блоге бот веб-разработка видео Визуализация данных вконтакте Все записи гвидо ван россум граббер графика графы декоратор декораторы дескриптор дескрипторы документация заметки игра жизнь идея интересное киев Клиентам книги конференция личное математика метаклассы модели модули монады морфология мысли невозможное новости о облачные вычисления обо мне Обработка данных оптимизация оптимизация кода Основная лента основы парсинг парсинг сайтов перевод песочница Питон поебень поиск правила кодирования программирование Проектирование производительность работа рабочее размышлизмы Разное разработка разработка приложений разработки регулярные выражения сайт событие события ссылки статьи тестирование тесты Тюмень убунтариум фигня философия формы форум Хабрахабр хакинг хостинг шаблоны шаблоны проектирования эксперимент Эксперименты юмор я пиарюсь Яндекс