Thread’ом java не испортишь: часть iii

Multithreaded Priority Queue

The Queue module allows you to create a new queue object that can hold a specific number of items. There are following methods to control the Queue −


  • get() − The get() removes and returns an item from the queue.

  • put() − The put adds item to a queue.

  • qsize() − The qsize() returns the number of items that are currently in the queue.

  • empty() − The empty( ) returns True if queue is empty; otherwise, False.

  • full() − the full() returns True if queue is full; otherwise, False.

Example

#!/usr/bin/python

import Queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q
   def run(self):
      print "Starting " + self.name
      process_data(self.name, self.q)
      print "Exiting " + self.name

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
         if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (threadName, data)
         else:
            queueLock.release()
         time.sleep(1)

threadList = 
nameList = 
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# Create new threads
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# Fill the queue
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# Wait for queue to empty
while not workQueue.empty():
   pass

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
   t.join()
print "Exiting Main Thread"

When the above code is executed, it produces the following result −

Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-1 processing One
Thread-2 processing Two
Thread-3 processing Three
Thread-1 processing Four
Thread-2 processing Five
Exiting Thread-3
Exiting Thread-1
Exiting Thread-2
Exiting Main Thread

Previous Page Print Page

Next Page  

Классы байтовых потоков

Класс Назначение
BufferedlnputStream Буферизированный входной поток.
BufferedOutputStream Буферизированный выходной поток.
ByteArraylnputStream Входной поток, читающий из массива байт.
ByteArrayOutputStream Выходной поток, записывающий в массив байт.
DatalnputStream Входной поток, включающий методы для чтения стандартных типов  данных Java.
DataOutputStream Выходной поток, включающий методы для записи стандартных типов данных Java.
FilelnputStream Входной поток, читающий из файла.
FileOutputStream Выходной поток, записывающий в файл.
FilterlnputStream Реализация InputStream.
FilterOutputStream Реализация OutputStream.
ObjectlnputStream Входной поток для объектов.
ObjectOutputStream Выходной поток для объектов.
OutputStream Абстрактный класс, описывающий поток вывода.
PipedlnputStream Входной канал (например, межпрограммный).
PipedOutputStream Выходной канал.
PrintStream Выходной поток, включающий print() и println().
PushbacklnputStream Входной поток, реализующий операцию pushback (вернуть назад).
RandomAccessFile Позволяет перемещаться по файлу, читать из него или писать в него.
SequencelnputStream Входной поток, представляющий собой комбинацию двух и более входных потоков, которые читаются совместно  один после другого.

Thread Methods

Following is the list of important methods available in the Thread class.

Sr.No. Method & Description
1

public void start()

Starts the thread in a separate path of execution, then invokes the run() method on this Thread object.

2

public void run()

If this Thread object was instantiated using a separate Runnable target, the run() method is invoked on that Runnable object.

3

public final void setName(String name)

Changes the name of the Thread object. There is also a getName() method for retrieving the name.

4

public final void setPriority(int priority)

Sets the priority of this Thread object. The possible values are between 1 and 10.

5

public final void setDaemon(boolean on)

A parameter of true denotes this Thread as a daemon thread.

6

public final void join(long millisec)

The current thread invokes this method on a second thread, causing the current thread to block until the second thread terminates or the specified number of milliseconds passes.

7

public void interrupt()

Interrupts this thread, causing it to continue execution if it was blocked for any reason.

8

public final boolean isAlive()

Returns true if the thread is alive, which is any time after the thread has been started but before it runs to completion.

The previous methods are invoked on a particular Thread object. The following methods in the Thread class are static. Invoking one of the static methods performs the operation on the currently running thread.

Sr.No. Method & Description
1

public static void yield()

Causes the currently running thread to yield to any other threads of the same priority that are waiting to be scheduled.

2

public static void sleep(long millisec)

Causes the currently running thread to block for at least the specified number of milliseconds.

3

public static boolean holdsLock(Object x)

Returns true if the current thread holds the lock on the given Object.

4

public static Thread currentThread()

Returns a reference to the currently running thread, which is the thread that invokes this method.

5

public static void dumpStack()

Prints the stack trace for the currently running thread, which is useful when debugging a multithreaded application.

Example

The following ThreadClassDemo program demonstrates some of these methods of the Thread class. Consider a class DisplayMessage which implements Runnable

// File Name : DisplayMessage.java
// Create a thread to implement Runnable

public class DisplayMessage implements Runnable {
   private String message;
   
   public DisplayMessage(String message) {
      this.message = message;
   }
   
   public void run() {
      while(true) {
         System.out.println(message);
      }
   }
}

Following is another class which extends the Thread class −

// File Name : GuessANumber.java
// Create a thread to extentd Thread

public class GuessANumber extends Thread {
   private int number;
   public GuessANumber(int number) {
      this.number = number;
   }
   
   public void run() {
      int counter = 0;
      int guess = 0;
      do {
         guess = (int) (Math.random() * 100 + 1);
         System.out.println(this.getName() + " guesses " + guess);
         counter++;
      } while(guess != number);
      System.out.println("** Correct!" + this.getName() + "in" + counter + "guesses.**");
   }
}

Following is the main program, which makes use of the above-defined classes −

// File Name : ThreadClassDemo.java
public class ThreadClassDemo {

   public static void main(String [] args) {
      Runnable hello = new DisplayMessage("Hello");
      Thread thread1 = new Thread(hello);
      thread1.setDaemon(true);
      thread1.setName("hello");
      System.out.println("Starting hello thread...");
      thread1.start();
      
      Runnable bye = new DisplayMessage("Goodbye");
      Thread thread2 = new Thread(bye);
      thread2.setPriority(Thread.MIN_PRIORITY);
      thread2.setDaemon(true);
      System.out.println("Starting goodbye thread...");
      thread2.start();

      System.out.println("Starting thread3...");
      Thread thread3 = new GuessANumber(27);
      thread3.start();
      try {
         thread3.join();
      } catch (InterruptedException e) {
         System.out.println("Thread interrupted.");
      }
      System.out.println("Starting thread4...");
      Thread thread4 = new GuessANumber(75);
      
      thread4.start();
      System.out.println("main() is ending...");
   }
}

This will produce the following result. You can try this example again and again and you will get a different result every time.

Output

Starting hello thread...
Starting goodbye thread...
Hello
Hello
Hello
Hello
Hello
Hello
Goodbye
Goodbye
Goodbye
Goodbye
Goodbye
.......

Starting a New Thread

To spawn another thread, you need to call following method available in thread module −

thread.start_new_thread ( function, args )

This method call enables a fast and efficient way to create new threads in both Linux and Windows.

The method call returns immediately and the child thread starts and calls function with the passed list of args. When function returns, the thread terminates.

Here, args is a tuple of arguments; use an empty tuple to call function without passing any arguments. kwargs is an optional dictionary of keyword arguments.

Example

#!/usr/bin/python

import thread
import time

# Define a function for the thread
def print_time( threadName, delay):
   count = 0
   while count < 5:
      time.sleep(delay)
      count += 1
      print "%s: %s" % ( threadName, time.ctime(time.time()) )

# Create two threads as follows
try:
   thread.start_new_thread( print_time, ("Thread-1", 2, ) )
   thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
   print "Error: unable to start thread"

while 1:
   pass

When the above code is executed, it produces the following result −

Thread-1: Thu Jan 22 15:42:17 2009
Thread-1: Thu Jan 22 15:42:19 2009
Thread-2: Thu Jan 22 15:42:19 2009
Thread-1: Thu Jan 22 15:42:21 2009
Thread-2: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:23 2009
Thread-1: Thu Jan 22 15:42:25 2009
Thread-2: Thu Jan 22 15:42:27 2009
Thread-2: Thu Jan 22 15:42:31 2009
Thread-2: Thu Jan 22 15:42:35 2009

Although it is very effective for low-level threading, but the thread module is very limited compared to the newer threading module.

Взаимодействие потоков

В многопоточной среде часто возникают задачи, требующие приостановки и возобновления работы одних потоков в зависимости от работы других. В частности это задачи, связанные с предотвращенем конфликтов доступа при использовании одних и тех же данных или устройств из параллельно исполняемых потоков. Для решения таких задач используются специальные объекты для взаимодействия потоков, такие как взаимоисключения (мьютексы), семафоры, критические секции, события и т.п. Многие из этих объектов являются объектами ядра и могут применяться не только между потоками одного процесса, но и для взаимодействия между потоками разных процессов.

  • Взаимоисключения (mutex, мьютекс) — это объект синхронизации, который устанавливается в особое сигнальное состояние, когда не занят каким-либо потоком. Только один поток владеет этим объектом в любой момент времени, отсюда и название таких объектов (от английского mutually exclusive access — взаимно исключающий доступ) — одновременный доступ к общему ресурсу исключается. После всех необходимых действий мьютекс освобождается, предоставляя другим потокам доступ к общему ресурсу. Объект может поддерживать рекурсивный захват второй раз тем же потоком, увеличивая счётчик, не блокируя поток, и требуя потом многократного освобождения. Такова, например, критическая секция в Win32. Тем не менее, есть и такие реализации, которые не поддерживают такое и приводят к Взаимная блокировка|взаимной блокировке потока при попытке рекурсивного захвата. Например, это FAST_MUTEX в ядре Windows.
  • Семафоры представляют собой доступные ресурсы, которые могут быть приобретены несколькими потоками в одно и то же время, пока пул ресурсов не опустеет. Тогда дополнительные потоки должны ждать, пока требуемое количество ресурсов не будет снова доступно.
  • ERESOURCE. Мьютекс, поддерживающий рекурсивный захват, с семантикой разделяемого или эксклюзивного захвата. Семантика: объект может быть либо свободен, либо захвачен произвольным числом потоков разделяемым образом, либо захвачен всего одним потоком эксклюзивным образом. Любые попытки осуществить захваты, нарушающее это правило, приводят к блокировке потока до тех пор, пока объект не освободится так, чтобы сделать захват разрешённым. Также есть операции вида TryToAcquire — никогда не блокирует поток, либо захватывает, либо (если нужна блокировка) возвращает FALSE, ничего не делая. Используется в ядре Windows, особенно в файловых системах — так, например, любому кем-то открытому дисковому файлу соответствует структура FCB, в которой есть 2 таких объекта для синхронизации доступа к размеру файла. Один из них — paging IO resource — захватывается эксклюзивно только в пути обрезания файла, и гарантирует, что в момент обрезания на файле нет активного ввода-вывода от кэша и от отображения в память.

Настройка размера пула

Настраивая размер пула потоков, важно избежать двух ошибок: слишком мало потоков или слишком много потоков. К счастью, для большинства приложений спектр между слишком большим и слишком малым количеством потоков довольно широк

Если вы помните, есть два основных преимущества в организации поточной обработки сообщений в приложениях: возможность продолжения процесса во время ожидания медленных операций, таких, как I/O (ввод — вывод), и использование возможностей нескольких процессоров. В приложениях с ограничением по скорости вычислений, функционирующих на N-процессорной машине, добавление дополнительных потоков может улучшить пропускную способность, по мере того как количество потоков подходит к N, но добавление дополнительных потоков свыше N не оправдано. Действительно, слишком много потоков разрушают качество функционирования из-за дополнительных издержек переключения процессов

Оптимальный размер пула потоков зависит от количества доступных процессоров и природы задач в рабочей очереди. На N-процессорной системе для рабочей очереди, которая будет выполнять исключительно задачи с ограничением по скорости вычислений, вы достигните максимального использования CPU с пулом потоков, в котором содержится N или N+1 поток.

Для задач, которые могут ждать осуществления I/O (ввода — вывода) — например, задачи, считывающей HTTP-запрос из сокета – вам может понадобиться увеличение размера пула свыше количества доступных процессоров, потому, что не все потоки будут работать все время. Используя профилирование, вы можете оценить отношение времени ожидания (WT) ко времени обработки (ST) для типичного запроса. Если назвать это соотношение WT/ST, для N-процессорной системе вам понадобится примерно N*(1+WT/ST) потоков для полной загруженности процессоров.

Использование процессора – не единственный фактор, важный при настройке размера пула потоков. По мере возрастания пула потоков, можно столкнуться с ограничениями планировщика, доступной памяти, или других системных ресурсов, таких, как количество сокетов, дескрипторы открытого файла, или каналы связи базы данных

Типы реализации потоков

  1. Отсутствие прерывания по таймеру внутри одного процесса
  2. При использовании блокирующего системного запроса для процесса все его потоки блокируются.
  3. Сложность реализации
  • Поток в пространстве ядра. Наряду с таблицей процессов в пространстве ядра имеется таблица потоков.
  • «Волокна» (англ. fibers). Несколько потоков режима пользователя, исполняющихся в одном потоке режима ядра. Поток пространства ядра потребляет заметные ресурсы, в первую очередь физическую память и диапазон адресов режима ядра для стека режима ядра. Поэтому было введено понятие «волокна» — облегчённого потока, выполняемого исключительно в режиме пользователя. У каждого потока может быть несколько «волокон».

Аппаратная реализация

На обычном процессоре управление потоками осуществляется операционной системой. Поток исполняется до тех пор, пока не произойдёт аппаратное прерывание, системный вызов или пока не истечёт отведённое для него операционной системой время. После этого процессор переключается на код операционной системы, который сохраняет состояние потока (его контекст) или переключается на состояние другого потока, которому тоже выделяется время на исполнение. При такой многопоточности достаточно большое количество тактов процессора тратится на код операционной системы, переключающий контексты. Если поддержку потоков реализовать аппаратно, то процессор сам сможет переключаться между потоками, а в идеальном случае — выполнять несколько потоков одновременно за каждый такт. Для операционной системы и пользователя один такой физический процессор будет виден как несколько логических процессоров.

Различают две формы многопоточности, которые могут быть реализованы в процессорах аппаратно:

  • Временная многопоточность(англ. Temporal multithreading).
  • Одновременная многопоточность (англ. Simultaneous multithreading).

2.1 Потоки данных

Любая программа редко существует сама по себе. Обычно она как-то взаимодействует с «внешним миром». Это может быть считывание данных с клавиатуры, отправка сообщений, загрузка страниц из интернета или, наоборот, загрузка файлов на удалённый сервер.

Все эти вещи мы можем назвать одним словом — процесс обмена данными между программой и внешним миром. Хотя это уже не одно слово.


Сам процесс обмена данными можно разделить на два типа: получение данных и отправка данных. Например, вы считываете данные с клавиатуры с помощью объекта — это получение данных. И выводите данные на экран с помощью команды — это отправка данных.

Для описания процесса обмена данными в программировании используется термин поток. Откуда вообще взялось такое название?

В реальной жизни им может быть поток воды или поток людей (людской поток). В программировании же под потоком подразумевают поток данных.

Потоки — это универсальный инструмент. Они позволяют программе получать данные откуда угодно (входящие потоки) и отправляют данные куда угодно (исходящие потоки). Делятся на два вида:

  • Входящий поток (Input): используется для получения данных
  • Исходящий поток (Output): используется для отправки данных

Чтобы потоки можно было «потрогать руками», разработчики Java написали два класса: и .

У класса есть метод , который позволяет читать из него данные. А у класса есть метод , который позволяет записывать в него данные. У них есть и другие методы, но об этом после.

Байтовые потоки

Что же это за данные и в каком виде их можно читать? Другими словами, какие типы данных поддерживаются этими классами?

О, это универсальные классы, и поэтому они поддерживают самый распространённый тип данных — byte. В можно записывать байты (и массивы байт), а из объекта можно читать байты (или массивы байт). Все — никакие другие типы данных они не поддерживают.

Поэтому такие потоки еще называют байтовыми потоками.

Особенность потоков в том, что данные из них можно читать (писать) только последовательно. Вы не можете прочитать данные из середины потока, не прочитав все данные перед ними.

Именно так работает чтение с клавиатуры через класс Scanner: вы читаете данные с клавиатуры последовательно: строка за строкой. Прочитали строку, прочитали следующую строку, прочитали следующую строку и т.д. Поэтому метод чтения строки и называется (дословно — «следующая срока»).

Запись данных в поток тоже происходит последовательно. Хороший пример — вывод на экран. Вы выводите строку, за ней еще одну и еще одну. Это последовательный вывод. Вы не можете вывести 1-ю строку, затем 10-ю, а затем вторую. Все данные записываются в поток вывода только последовательно.

Символьные потоки

Недавно вы изучали, что строки — второй по популярности тип данных, и это действительно так. Очень много информации передается в виде символов и целых строк. Компьютер отлично бы передавал все в виде байт, но люди не настолько идеальны.

Java-программисты учли этот факт и написали еще два класса: и . Класс — это аналог класса , только его метод читает не байты, а символы — . Класс соответствует классу , и так же, как и класс , работает с символами (), а не байтами.

Если сравнить эти четыре класса, мы получим такую картину:

Байты (byte) Символы (char)
Чтение данных
Запись данных

Практическое применение

Сами классы , , и в явном виде никто не использует: они не присоединены ни к каким внешним объектам, из которых можно читать данные (или в которые можно писать данные). Однако у этих четырех классов много классов-наследников, которые умеют очень многое.

Взамоисключающие блокировки

int pthread_mutex_init (pthread_mutex_t *mp, const pthread_mutex_attr_t *mattrp)

инициализирует взаимоисключающую блокировку, выделяя необходимую память. Если mattrp=NULL, то создается блокировка с атрибутами «по умолчанию». В настоящее время атрибут один — область действия блокировки, его умолчательное значение — PTHREAD_PROCESS_PRIVATE (а может быть еще PTHREAD_PROCESS_SHARED).

int pthread_mutex_destroy (pthread_mutex_t *mp)

разрушает блокировку, освобождая выделенную память.

int pthread_mutex_lock (pthread_mutex_t *mp)
int pthread_mutex_unlock (pthread_mutex_t *mp)
int pthread_mutex_trylock (pthread_mutex_t *mp)

С помощью pthread_mutex_lock() поток пытается захватить блокировку. Если же блокировка уже принадлежит другому потоку, то вызывающий поток ставится в очередь (с учетом приоритетов потоков) к блокировке. После возврата из функции pthread_mutex_lock() блокировка будет принадлежать вызывающему потоку.

Функция pthread_mutex_unlock() освобождает захваченную ранее блокировку. Освободить блокировку может только ее владелец.

Функция pthread_mutex_trylock() — неблокирующая версия функции pthread_mutex_lock(). Если на момент обращения к этой функции блокировка уже захвачена, то происходит немедленный возврат из функции со значением EBUSY.

Закрытие потоков

Последнее обновление: 25.04.2018

При завершении работы с потоком его надо закрыть с помощью метода close(), который определен в интерфейсе Closeable. Метод close имеет следующее определение:

void close() throws IOException

Этот интерфейс уже реализуется в классах InputStream и OutputStream, а через них и во всех классах потоков.

При закрытии потока освобождаются все выделенные для него ресурсы, например, файл. В случае, если поток окажется не закрыт, может происходить утечка памяти.

Есть два способа закрытия файла. Первый традиционный заключается в использовании блока . Например, считаем данные из файла:

import java.io.*;

public class Program {

    public static void main(String[] args) {
        
        FileInputStream fin=null;
        try
        {
            fin = new FileInputStream("C://SomeDir//notes.txt");
            
            int i=-1;
            while((i=fin.read())!=-1){
            
                System.out.print((char)i);
            }
        }
        catch(IOException ex){
            
            System.out.println(ex.getMessage());
        } 
        finally{
            
            try{
            
                if(fin!=null)
                    fin.close();
            }
            catch(IOException ex){
            
                System.out.println(ex.getMessage());
            }
        }  
    } 
}

Поскольку при открытии или считывании файла может произойти ошибка ввода-вывода, то код считывания помещается в блок try. И чтобы быть уверенным, что поток в любом случае закроется, даже если при работе с ним возникнет ошибка, вызов метода помещается в блок . И, так как метод также в случае ошибки может генерировать исключение IOException, то его вызов также помещается во вложенный блок

Начиная с Java 7 можно использовать еще один способ, который автоматически вызывает метод close. Этот способ заключается в использовании конструкции try-with-resources (try-с-ресурсами). Данная конструкция работает с объектами, которые реализуют интерфейс . Так как все классы потоков реализуют интерфейс , который в свою очередь наследуется от , то их также можно использовать в данной конструкции

Итак, перепишем предыдущий пример с использованием конструкции try-with-resources:

import java.io.*;

public class Program {

    public static void main(String[] args) {
        
        try(FileInputStream fin=new FileInputStream("C://SomeDir//notes.txt"))
        {
            int i=-1;
            while((i=fin.read())!=-1){
            
                System.out.print((char)i);
            }   
        }
        catch(IOException ex){
            
            System.out.println(ex.getMessage());
        } 
    } 
}

Синтаксис конструкции следующий: . Данная конструкция также не исключает использования блоков .

После окончания работы в блоке try у ресурса (в данном случае у объекта ) автоматически вызывается метод close().

Если нам надо использовать несколько потоков, которые после выполнения надо закрыть, то мы можем указать объекты потоков через точку с запятой:

try(FileInputStream fin=new FileInputStream("C://SomeDir//Hello.txt"); 
        FileOutputStream fos = new FileOutputStream("C://SomeDir//Hello2.txt"))
{
	//..................
}

НазадВперед

Использование Queues


Очередь(Queues Python) может быть использована для стековых реализаций «пришел первым – ушел первым» (first-in-first-out (FIFO)) или же «пришел последним – ушел последним» (last-in-last-out (LILO)) , если вы используете их правильно.

В данном разделе, мы смешаем потоки и создадим простой скрипт файлового загрузчика, чтобы продемонстрировать, как работает Queues Python со случаями, которые мы хотим паралеллизировать. Чтобы помочь объяснить, как работает Queues, мы перепишем загрузочный скрипт из предыдущей секции для использования Queues. Приступим!

Python

# -*- coding: utf-8 -*-

import os import threading import urllib.request from queue import Queue

class Downloader(threading.Thread): «»»Потоковый загрузчик файлов»»» def __init__(self, queue): «»»Инициализация потока»»» threading.Thread.__init__(self) self.queue = queue def run(self): «»»Запуск потока»»» while True: # Получаем url из очереди url = self.queue.get() # Скачиваем файл self.download_file(url) # Отправляем сигнал о том, что задача завершена self.queue.task_done()

def download_file(self, url): «»»Скачиваем файл»»» handle = urllib.request.urlopen(url) fname = os.path.basename(url) with open(fname, «wb») as f: while True: chunk = handle.read(1024) if not chunk: break f.write(chunk)

def main(urls): «»» Запускаем программу «»» queue = Queue() # Запускаем потом и очередь for i in range(5): t = Downloader(queue) t.setDaemon(True) t.start() # Даем очереди нужные нам ссылки для скачивания for url in urls: queue.put(url)

# Ждем завершения работы очереди queue.join()

if __name__ == «__main__»: urls = [«http://www.irs.gov/pub/irs-pdf/f1040.pdf», «http://www.irs.gov/pub/irs-pdf/f1040a.pdf», «http://www.irs.gov/pub/irs-pdf/f1040ez.pdf», «http://www.irs.gov/pub/irs-pdf/f1040es.pdf», «http://www.irs.gov/pub/irs-pdf/f1040sb.pdf»] main(urls)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67

# -*- coding: utf-8 -*-  

importos

importthreading

importurllib.request

fromqueueimportQueue

classDownloader(threading.Thread)

«»»Потоковый загрузчик файлов»»»

def__init__(self,queue)

«»»Инициализация потока»»»

threading.Thread.__init__(self)

self.queue=queue

defrun(self)

«»»Запуск потока»»»

whileTrue

# Получаем url из очереди

url=self.queue.get()

# Скачиваем файл

self.download_file(url)

# Отправляем сигнал о том, что задача завершена

self.queue.task_done()

defdownload_file(self,url)

«»»Скачиваем файл»»»

handle=urllib.request.urlopen(url)

fname=os.path.basename(url)

withopen(fname,»wb»)asf

whileTrue

chunk=handle.read(1024)

ifnotchunk

break

f.write(chunk)

defmain(urls)

«»»

    Запускаем программу     «»»

queue=Queue()

# Запускаем потом и очередь

foriinrange(5)

t=Downloader(queue)

t.setDaemon(True)

t.start()

# Даем очереди нужные нам ссылки для скачивания

forurl inurls

queue.put(url)

# Ждем завершения работы очереди

queue.join()

if__name__==»__main__»

urls=»http://www.irs.gov/pub/irs-pdf/f1040.pdf»,

«http://www.irs.gov/pub/irs-pdf/f1040a.pdf»,

«http://www.irs.gov/pub/irs-pdf/f1040ez.pdf»,

«http://www.irs.gov/pub/irs-pdf/f1040es.pdf»,

«http://www.irs.gov/pub/irs-pdf/f1040sb.pdf»

main(urls)

Давайте притормозим. В первую очередь, нам нужно взглянуть на определение главной функции для того, чтобы увидеть, как все протекает. Здесь мы видим, что она принимает список url адресов. Далее, функция main создаете экземпляр очереди, которая передана пяти демонизированным потокам. Основная разница между демонизированным и недемонизированным потоком в том, что вам нужно отслеживать недемонизированные потоки и закрывать их вручную, в то время как поток «демон» нужно только запустить и забыть о нем. Когда ваше приложение закроется, закроется и поток. Далее мы загрузили очередь (при помощи метода put) вместе с переданными url. Наконец, мы указываем очереди подождать, пока потоки выполнят свои процессы через метод join. В классе download у нас есть строчка self.queue.get(), которая выполняет функцию блока, пока очередь делает что-либо для возврата. Это значит, что потоки скромно будут дожидаться своей очереди. Также это значит, чтобы поток получал что-нибудь из очереди, он должен вызывать метод очереди под названием get. Таким образом, добавляя что-нибудь в очередь, пул потоков, поднимет или возьмет эти объекты и обработает их. Это также известно как dequeing. После того, как все объекты в очередь обработаны, скрипт заканчивается и закрывается. На моем компьютере были загружены первые 5 документов за секунду.

Блокировки (Lock)

В следующем примере будут созданы три потока, каждый из которых будет считывать стартовую страницу по указанному Web-адресу. В примере имеется глобальный ресурс – список урлов – – доступ к которому будет блокироваться с помощью блокировки . Объект имеет методы:

– делает запрос на запирание замка. Если параметр не указан или является истиной, то поток будет ожидать освобождения замка.

Если параметр не был задан, метод не возвратит значения.

Если был задан и истинен, метод возвратит True (после успешного овладения замком).

Если блокировка не требуется (т.е. задан ), метод вернет , если замок не был заперт и им успешно овладел данный поток. В противном случае будет возвращено .

– запрос на отпирание замка.

– возвращает текущее состояние замка ( – заперт, – открыт).

import threading
from urllib import urlopen

class WorkerThread(threading.Thread):
  def __init__(self,url_list,url_list_lock):
    super(WorkerThread,self).__init__()
    self.url_list=url_list
    self.url_list_lock=url_list_lock
    
  def run(self):
    while (1):
      nexturl = self.grab_next_url()
      if nexturl==None:break
      self.retrieve_url(nexturl)
        
  def grab_next_url(self):
    self.url_list_lock.acquire(1)
    if len(self.url_list)<1:
      nexturl=None
    else:
      nexturl = self.url_list
      del self.url_list
    self.url_list_lock.release()
    return nexturl  
        
        
  def retrieve_url(self,nexturl):
    text = urlopen(nexturl).read()
    print text
    print '################### %s #######################' % nexturl
    
url_list=['http://linux.org.ru','http://kernel.org','http://python.org']
url_list_lock = threading.Lock()
workerthreadlist=[]
for x in range(0,3):
  newthread = WorkerThread(url_list,url_list_lock)
  workerthreadlist.append(newthread)
  newthread.start()
for x in range(0,3):
  workerthreadlist.join()

С этим читают