Filed Under (Multiprocessing, Python) by Marcin Kuźmiński on July-17-2009

I’ve started to play with the multiprocessing module that shipped with Python 2.6. The multiprocessing module is very similar to threading (it has almost the same functions and classes as threading).

Here are, in my opinion, three advantages over threading.

  • Multiprocessing runs on processes, not threads.
  • It overcomes the GIL (global interpreter lock) that limits threading, because it uses subprocesses.
  • Processes can be synchronized even remotely, so we could write concurrent calculations that run over the network.

I made a simple example class that uses the multiprocessing module to scan ports; just for testing, I did the same thing that Łukasz did using threading. This example is not more efficient than the one presented in http://www.python-blog.com/2009/07/01/python-threaded/, because port scanning mostly waits on the network, but if we replaced the function check_port with a more CPU-consuming function we could end up with performance greater by up to the number of CPUs/cores we have. For example, in one of my recent projects I made calculations for MultiMulti, a popular gambling game in Poland: I calculated the most frequently repeating numbers in the last 50 games. Calculating the combinations of 9 out of 20, 50 times, took around 1200 s with threading; with multiprocessing I was able to do it in around 700 s on my Core2Duo CPU. I wish I had a Core 2 Quad to check the performance :) So if you need to do some heavy calculations, multiprocessing can give you that performance.
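
To illustrate the CPU-bound case, here is a minimal sketch written for Python 3 (the post itself targets Python 2.6). The MultiMulti calculation is not shown in the post, so the function count_even_sums and the generated draws are hypothetical stand-ins; the only point is that multiprocessing.Pool spreads independent CPU-heavy calls over the available cores.

```python
from itertools import combinations
from multiprocessing import Pool, cpu_count


def count_even_sums(draw, k=9):
    """Hypothetical CPU-heavy job: count the k-number combinations
    of a draw whose sum is even."""
    return sum(1 for combo in combinations(draw, k) if sum(combo) % 2 == 0)


if __name__ == "__main__":
    # Eight made-up draws of 20 numbers each (not real game data).
    draws = [tuple(range(start, start + 20)) for start in range(1, 9)]

    # Each draw is handled by a separate worker process, so the GIL of one
    # interpreter does not serialize the calculations.
    with Pool(processes=cpu_count()) as pool:
        totals = pool.map(count_even_sums, draws)

    print(totals)
```

The same function run in threads would still compute the right totals, but for pure-Python number crunching like this the threads would take turns holding the GIL instead of running in parallel.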

Here’s the port scanner code. You can download the port descriptions file port_list to match each port with its description.

from multiprocessing import Process, Queue, cpu_count, Lock
import socket, sys

class PortScanner(object):
    ''' multiprocessing port scanner with port description'''

    def __init__(self, host = '' , port_range = (1, 100), nr_processes = cpu_count(), port_list_file = ''):
        '''
        port_range=(start,stop) default 1,100
        nr_processes = int default cpu_count() '''
        q = Queue()
        l = Lock()
        port_list = []

        try:
            for i in open(port_list_file).readlines():
                port_list.append([x.strip() for x in i.split('\t')])

        except IOError:
            print 'no port list file specified'
            pass

        for _ in xrange(port_range[0], port_range[1]):
            q.put((host, _))

        #to stop all processes we have to put STOP to queue and break the loop for each process
        for _ in xrange(nr_processes):
            q.put('STOP')

        for _ in xrange(nr_processes):
            p = Process(target = self.check_port, args = (q, l, port_list))
            p.start()

    def check_port(self, q, l, port_list):
        ''' worker method invoked by each process '''
        while True:
            queue_ret = q.get()

            if queue_ret == 'STOP':
                break

            s = socket.socket()

            try:
                s.connect(queue_ret)

                #lock for uncorrupted printing to console
                l.acquire()
                try:
                    print "[INFO] %s on port %s is open" % (queue_ret)

                    for i in port_list:
                        if int(i[0]) == int(queue_ret[1]):
                            for _ in i:sys.stdout.write(_ + " ")
                            print "\n\n"
                finally:
                    #release the lock even if printing fails
                    l.release()
            except socket.error:
                #print "[WARNING] %s on port %s is closed" % (queue_ret)
                pass
            finally:
                #always close the socket
                s.close()

if __name__ == "__main__":

    PortScanner(host = 'example.com', port_range = (1, 60), nr_processes = 40, port_list_file = 'port_list.data')
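
The scanner above starts its workers but never joins them, and each worker prints its own findings. As a minimal sketch for Python 3, the same STOP-sentinel pattern can also send results back to the parent through a second queue. The names worker, run_jobs and square are hypothetical, and a trivial squaring function stands in for the socket check so that the sketch runs without network access.

```python
from multiprocessing import Process, Queue

STOP = 'STOP'  # sentinel telling a worker to leave its loop


def square(n):
    """Stand-in for the real job (e.g. a port check)."""
    return n * n


def worker(tasks, results):
    """Consume tasks until STOP arrives, then report that this worker is done."""
    while True:
        item = tasks.get()
        if item == STOP:
            results.put(STOP)
            break
        results.put((item, square(item)))


def run_jobs(items, nr_processes=2):
    tasks, results = Queue(), Queue()
    for item in items:
        tasks.put(item)
    for _ in range(nr_processes):
        tasks.put(STOP)  # one sentinel per worker

    procs = [Process(target=worker, args=(tasks, results))
             for _ in range(nr_processes)]
    for p in procs:
        p.start()

    # Drain the results before joining, so no worker blocks on a full pipe.
    collected, finished = {}, 0
    while finished < nr_processes:
        out = results.get()
        if out == STOP:
            finished += 1
        else:
            collected[out[0]] = out[1]

    for p in procs:
        p.join()
    return collected


if __name__ == "__main__":
    print(run_jobs(range(1, 6)))
```

Each worker echoes the sentinel on the results queue, so the parent knows when every worker has finished without having to count the jobs.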


Filed Under (Python, Threads) by Marcin Kuźmiński on July-7-2009

If you’re looking for a way to get started with threads in Python, I found a great PDF to begin with.

Download the Python-Threads PDF, which includes basic and some more advanced information and tutorials about threads in Python.

In the PDF you can find useful information about:

  • thread and threading modules
  • queue
  • locks
  • GIL
  • events
  • debugging threads


Filed Under (Python, Threads) by Łukasz Balcerzak on July-1-2009

Recently I ran into an optimization problem which I had hoped I would never have to face. After some research on the subject I decided to split some of the processing using threads. I didn’t have much experience in that area except for some classes at PJIIT (Polish-Japanese Institute of Information Technology), so the start was painful… but not for long (as with everything in Python).

So I decided to share my thoughts with everyone who would ever try to use threads in Python programming.

Let’s imagine a simple case: you want to check whether some host has any open port in the range 1-999. We create a simple script (use whatever host you like):

import socket
import time

def is_port_open(host, port):
    """
    Takes host param as string and port param as int.
    Returns true if port is open and false otherwise.
    """
    #print "[DEBUG] Checking host %s:%s" % (host, port)
    s = socket.socket()
    is_open = True
    try:
        s.connect( (host, port) )
    except socket.error:
        is_open = False
    s.close()
    return is_open

def main():
    start = time.time()

    host = 'some.host'
    ports = range(1,1000) 

    print "[INFO] Will now check host %s for ports between %s and %s" % \
        (host, ports[0], ports[-1]) 

    for port in ports:
        if is_port_open(host, port):
            print "[INFO] Port %s on host %s is open." % (port, host)

    print "Elapsed Time: %s" % (time.time() - start)

if __name__ == '__main__':
    main()

How long did it take? Well, try this then:

import threading
import socket
import time
import Queue

THREAD_NUMBER = 20

def is_port_open(host, port):
    """
    Takes host param as string and port param as int.
    Returns true if port is open and false otherwise.
    """
    #print "[DEBUG] Checking host %s:%s" % (host, port)
    s = socket.socket()
    is_open = True
    try:
        s.connect( (host, port) )
    except socket.error:
        is_open = False
    s.close()
    return is_open

class PortChecker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            host, port = self.queue.get()
            if is_port_open(host, port):
                print "[INFO] Port %s on host %s is open." % (port, host)
            self.queue.task_done()

def main():
    start = time.time()

    host = 'some.host'
    ports = range(1,1000)
    queue = Queue.Queue()

    for i in xrange(THREAD_NUMBER):
        pc = PortChecker(queue)
        pc.setDaemon(True)
        pc.start()

    print "[INFO] Will now check host %s for ports between %s and %s" % \
        (host, ports[0], ports[-1]) 

    for port in ports:
        queue.put( (host, port) )

    queue.join()
    print "Elapsed Time: %s" % (time.time() - start)

if __name__ == '__main__':
    main()

Try different port ranges and thread numbers. OK, this is just an example, but it should give you an idea of how to handle a process that is done inside a loop. You have to define your “handler” (a thread that will process some objects), create an empty queue and fill it with objects. Try using only 1 thread (set THREAD_NUMBER to 1) and look at the time it takes to finish the whole process.

I know that a programmer’s time is much more expensive than a processor’s time, and I try not to break this rule. But sometimes (e.g. when your script runs for over 12 hours) it is very nice if you can cut the time needed to generate results. Just remember to catch exceptions inside threads. It is very important: if an exception is raised inside a thread, the thread will just die and the main thread will not be notified.
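
To make the point about exceptions concrete: in the threaded script above, an unexpected exception inside run would kill that thread before task_done() is called, and queue.join() would then wait forever. Below is a minimal sketch for Python 3 of a worker that catches and records errors instead. The names SafeWorker, process_all, handler and reciprocal are hypothetical and not part of the original script.

```python
import queue
import threading


class SafeWorker(threading.Thread):
    """Worker thread that records errors instead of dying on them."""

    def __init__(self, tasks, handler, errors):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.handler = handler
        self.errors = errors  # shared list; list.append is thread-safe in CPython

    def run(self):
        while True:
            item = self.tasks.get()
            try:
                self.handler(item)
            except Exception as exc:
                self.errors.append((item, exc))
            finally:
                # Always mark the task as done, otherwise tasks.join() hangs.
                self.tasks.task_done()


def process_all(items, handler, thread_number=4):
    """Run handler on every item and return the list of (item, exception)."""
    tasks, errors = queue.Queue(), []
    for _ in range(thread_number):
        SafeWorker(tasks, handler, errors).start()
    for item in items:
        tasks.put(item)
    tasks.join()
    return errors


def reciprocal(n):
    return 1 / n  # raises ZeroDivisionError for n == 0


if __name__ == "__main__":
    for item, exc in process_all([1, 0, 2], reciprocal):
        print("[ERROR] item %r failed: %r" % (item, exc))
```

The main thread decides what to do with the collected errors once join() returns; the workers themselves keep running no matter what a single item does.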

Well, that’s it. Tomorrow I will continue with an example using SQLAlchemy. Good night, folks.



Filed Under (Python) by Marcin Kuźmiński on June-27-2009

UPDATE: I added two small fixes. When the decorated function returns a value, the wrapper now passes it on, and an exception is raised when the decorator is used without parameters.

I began studying decorators in more depth. I made a simple one for now: it can be used to measure the execution time of the decorated function in milliseconds, seconds, minutes or hours. It uses a closure with a wrapper function to handle parameters. Next time I’ll post a decorator for running a function in the background using threads.

Here’s the code:

''' @author: marcink '''

import time
from types import FunctionType

class GetExecutionTime:
    '''
    This generic class is meant to be used as a decorator for measuring
    simple method execution time; it works on class methods as well
    as regular non-class functions.
    Available parameters:
        'ms' - milliseconds
        's' - seconds
        'm' - minutes
        'h' - hours
    usage: @GetExecutionTime(param)

    samples:
    @GetExecutionTime('s')
    def samplemethod(self,params):
        function body
    will print execution time at end in seconds

    @GetExecutionTime('m')
    def samplemethod(params):
        function body
    will print execution time at end in minutes

     '''

    def __init__(self, *args):
        #when the decorator is used without parameters, the first argument
        #is the decorated function itself; raise an exception in that case

        if len(args) == 0 or type(args[0]) is FunctionType:
            raise Exception("Calling decorator without parameters is not possible")
        else:
            self.fn = None
            self.d_as = args[0]

        #divisors that convert the measured time (in milliseconds) to each unit
        def miliseconds():return 1
        def seconds(): return 1000
        def minutes():return seconds() * 60
        def hours(): return minutes() * 60
        def default_():return seconds()

        #this is a cool way to replace switch case with default parameter
        commands = {'ms':miliseconds,
                    's':seconds,
                    'm':minutes,
                    'h':hours}
        self.display_as = commands.get(self.d_as, default_)()

    def __call__(self,fn):

        display_as = self.display_as
        d_as = self.d_as

        def wrapper_function(*args, **kwargs):
            start = time.time()

            ret = fn(*args,**kwargs)

            end = time.time()
            #elapsed time in milliseconds
            milisec = (end - start) * 1000.0

            print "function %s execution time: %s %s" % (fn.__name__, milisec / display_as, d_as)
            #always pass the return value on, even if it is falsy (0, '', [])
            return ret
        return wrapper_function

#EXAMPLES:

from time import sleep

@GetExecutionTime('s')
def short_func(max, min):
    for x in range(3):
        sleep(1)
    print max,min
    return (max, min)

short_func(100, 10)

''' prints: function short_func execution time: 3.00342392921 s '''
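
For comparison, the same idea can be sketched with nested closures instead of a class. This is a minimal variant for Python 3, not the code from the post: the name timed and the UNITS table are hypothetical, functools.wraps keeps the decorated function's name and docstring, and time.perf_counter is used because it is meant for measuring short durations.

```python
import functools
import time

# Seconds per display unit (hypothetical table mirroring the 'ms'/'s'/'m'/'h' keys).
UNITS = {'ms': 0.001, 's': 1.0, 'm': 60.0, 'h': 3600.0}


def timed(unit='s'):
    """Decorator factory: report how long the decorated function ran."""
    if unit not in UNITS:
        # Also triggers when @timed is used without parentheses, because
        # the decorated function itself then arrives here as `unit`.
        raise ValueError("unknown unit: %r" % (unit,))

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                # The return value is passed on unchanged, falsy or not.
                return fn(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                print("function %s execution time: %s %s"
                      % (fn.__name__, elapsed / UNITS[unit], unit))
        return wrapper
    return decorator


@timed('ms')
def add(a, b):
    return a + b


if __name__ == "__main__":
    print(add(2, 3))
```

Putting the print in a finally block means the time is reported even when the decorated function raises, which the class-based version above does not do.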