Filed Under (Multiprocessing, Python) by Marcin Kuźmiński on July-17-2009

I’d started to play with multiprocessing module that came up with python 2.6. Multiprocessing module is very similar to threading (it has almost the same functions/classes that threading).

Here are by my opinion three advantages over threading.

  • Multiprocessing runs on processes not threads.
  • Overcomes the GIL (global interpreter lock) that threading is using by using sub processes.
  • Processes can be synchronized even remotely so we could write a concurrent calculations over the network

I made a simple example class that uses multiprocessing module to scan ports, just for testing i made the same thing that Lukasz made using threading. This example however is not more efficient than the one presented in http://www.python-blog.com/2009/07/01/python-threaded/ but when we could replace the function check_port with more CPU consuming function we could end up with performance grater by the number of cpus/cores we have. For example in one of my projects recently i made a calculations for popular gambling game in Poland, MultiMulti. I’d made up a calculations of most repeating number in last 50 games. To calculate combination of 9’s over 20’s 50 times with threading i’d got around 1200s with multiprocessing i was able to calculate it in around 700 s with my Core2Duo CPU. I wish i had core2 quad to check the performance :) So i’f you need to make some heavy calculations multiprocessing can give you that performance.

Here’s the code and you can download the port descriptions file port_list to match port with description.

from multiprocessing import Process, Queue, cpu_count, Lock
import socket, sys

class PortScanner(object):
    ''' multiprocessing port scanner with port description'''

    def __init__(self, host = '' , port_range = (1, 100), nr_processes = cpu_count(), port_list_file = ''):
        '''
        port_range=(start,stop) default 1,100
        nr_processes = int default cpu_count() '''
        q = Queue()
        l = Lock()
        port_list = []

        try:
            for i in open(port_list_file).readlines():
                port_list.append([x.strip() for x in i.split('\t')])

        except IOError:
            print 'no port list file specified'
            pass

        for _ in xrange(port_range[0], port_range[1]):
            q.put((host, _))

        #to stop all processes we have to put STOP to queue and break the loop for each process
        for _ in xrange(nr_processes):
            q.put('STOP')

        for _ in xrange(nr_processes):
            p = Process(target = self.check_port, args = (q, l, port_list))
            p.start()

    def check_port(self, q, l, port_list):
        ''' worker class invoked by process '''
        while True:
            queue_ret = q.get()

            if queue_ret == 'STOP':
                break

            s = socket.socket()

            try:

                s.connect((queue_ret))

                #lock for uncorrupted printing to console
                l.acquire()
                print "[INFO] %s on port %s is open" % (queue_ret)

                if len(port_list) > 1:
                    for i in port_list:
                        if int(i[0]) == int(queue_ret[1]):
                            for _ in i:sys.stdout.write(_ + " ")
                            print "\n\n"

                l.release()
            except socket.error:
                #print "[WARNING] %s on port %s is closed" % (queue_ret)
                pass
            s.close()

if __name__ == "__main__":

    PortScanner(host = 'example.com', port_range = (1, 60), nr_processes = 40, port_list_file = 'port_list.data')


Filed Under (Python, Threads) by Łukasz Balcerzak on July-1-2009

Recently I’ve run into some optimizations problem which I once hoped I wouldn’t had to face. After some research on the subject I decided to split some process using threads. I hadn’t have much experience in that area except some classes at PJIIT (Polish-Japanese Institute of Information Technology) so start was painful… but not for long (as with everything in Python).

So I decided to share my thoughts with everyone who would ever try to use threads in Python programming.

Let’s imagine simple case: you want to check some host if it has opened any port from range 1-1000. We create simple script (use whatever host you like)

import socket
import time

def is_port_open(host, port):
    """
    Takes host param as string and port param as int.
    Returns true if port is open and false otherwise.
    """
    #print "[DEBUG] Checking host %s:%s" % (host, port)
    s = socket.socket()
    is_open = True
    try:
        s.connect( (host, port) )
    except socket.error:
        is_open = False
    s.close()
    return is_open

def main():
    start = time.time()

    host = 'some.host'
    ports = range(1,1000) 

    print "[INFO] Will now check host %s for ports between %s and %s" % \
        (host, ports[0], ports[-1]) 

    for port in ports:
        if is_port_open(host, port):
            print "[INFO] Port %s on host %s is open." % (port, host)

    print "Elapsed Time: %s" % (time.time() - start)

if __name__ == '__main__':
    main()

How long was it? Well, try this then:

import threading
import socket
import time
import Queue

THREAD_NUMBER = 20

def is_port_open(host, port):
    """
    Takes host param as string and port param as int.
    Returns true if port is open and false otherwise.
    """
    #print "[DEBUG] Checking host %s:%s" % (host, port)
    s = socket.socket()
    is_open = True
    try:
        s.connect( (host, port) )
    except socket.error:
        is_open = False
    s.close()
    return is_open

class PortChecker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            host, port = self.queue.get()
            if is_port_open(host, port):
                print "[INFO] Port %s on host %s is open." % (port, host)
            self.queue.task_done()

def main():
    start = time.time()

    host = 'some.host'
    ports = range(1,1000)
    queue = Queue.Queue()

    for i in xrange(THREAD_NUMBER):
        pc = PortChecker(queue)
        pc.setDaemon(True)
        pc.start()

    print "[INFO] Will now check host %s for ports between %s and %s" % \
        (host, ports[0], ports[-1]) 

    for port in ports:
        queue.put( (host, port) )

    queue.join()
    print "Elapsed Time: %s" % (time.time() - start)

if __name__ == '__main__':
    main()

Try with different port range/thread number. Ok, this is just an example. But it should give you an idea how to handle with some process that is done inside a loop. You have to define your “handler” (thread that will process on some object(s)), create empy queue and fill it with objects. Try to use only 1 thread (set THREAD_NUMBER to 1) and look at the time it took to finish whole process.

I know that programmer’s time is much more expensive than processor’s time and I try not to break this rule. But sometimes (i.e. when your script runs for over 12 hours) it is very nice if you can cut the time you can generate results. Just remember to catch exceptions inside threads – it is very important as if exception is risen by thread, it will just die without any notification.

Well, thats it. I will proceed tomorrow with some example with SQLAlchemy. Good night folks.