# This live demo was first used at the Berkeley MSRI workshop on Interactive Parallel Computing
# on January 29th. It requires IPython/IPython1 and its dependencies. Information about installing
# these things can be found at this URL:
#
# http://ipython.scipy.org/moin/Parallel_Computing
#
# Once installation is done:
#
# 1. Do ipcluster -n 2 to start 2 engines and a controller
# 2. Type the following commands into an interactive IPython session
# 3. You can also run the entire script by typing: irunner MSRIdemo.ipy
#
# please email question/comments to Brian Granger
#
# The tags are meant to be used with IPython's demo capabilities.
# silent
import numpy
%autoindent off
# --- stop ---
# Creating a RemoteController instance.
import ipython1.kernel.api as kernel
rc = kernel.RemoteController(('127.0.0.1',10105))
# --- stop ---
# Get the IDs of the connected engines
print rc.getIDs()
# --- stop ---
# Running python commands: blocking mode
rc.executeAll('a=5') # On all engines
rc.executeAll('import math')
rc.execute([0, 1], 'c=a*math.pi; print c') # Only on engines [0, 1]
# --- stop ---
# Running python commands: non-blocking mode
rc.executeAll('import time')
rc.executeAll('time.sleep(10)',block=False) # Submit and return immediately
# --- stop ---
# Another non-blocking example
# Don't block
rc.execute(0, 'y = sum(x**100 for x in xrange(100000))',block=False) # Submit this
print "Execute has been called in non-blocking mode." # Print this
rc.execute(0, 'print y', block=True) # Then block
# --- stop ---
# Parallel Magic Commands
# First make everything non-blocking
rc.block = False
# %px foo ~ rc.executeAll('foo')
%px import numpy
%px a = numpy.random.rand(4,4)
%px print numpy.linalg.eigvals(a)
# --- stop ---
# Get the last result
%result
# --- stop ---
# More magic
# %pn 0 foo ~ rc.execute(0, 'foo')
%pn 0 print numpy.linspace(0,1.0,10)
%pn 1 print numpy.random.rand(2,2)
# --- stop ---
# Get the last result
%result
# --- stop ---
# Auto Magic
# %autopx makes everything parallel
%autopx
max_evals = []
for i in range(100):
a = numpy.random.rand(10,10)
a = a + a.transpose()
evals = numpy.linalg.eigvals(a)
max_evals.append(evals[0].real)
%autopx
# --- stop ---
# See what the result was
rc.block=True
%px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)
# --- stop ---
# Moving Python objects around
a = 1.035345345 # Local a
rc.pushAll(a=a) # Send a as a to all engines
%px print a
local_a = rc.pullAll('a') # Pull a on all engines back
print local_a # Print local copy
# --- stop ---
# Push and pull can also take engine ids
rc.push(0, c=numpy.random.rand(2,2)) # Push 2x2 random matrix to 0 as c
# Take transpose
%pn 0 c = c.transpose()
local_c = rc.pull(0, 'c') # Pull back
print local_c # Print local copy
# --- stop ---
# Dictionary Interface
rc['d'] = 3564567 # Like rc.pushAll(d=3564567)
%px print d
print rc['d'] # Like rc.pullAll('d')
# --- stop ---
# Array/Dictionary Interface
# Remote controller objects have both a dictionary
# and array interface that can be combined:
for i in range(2):
rc[i]['id'] = i
%px print id
# --- stop ---
# Scatter/Gather
rc.scatterAll('a', range(8)) # Partition and distribute [0,...,8]
%px print a
local_a = rc.gatherAll('a') # Gather partitions back
print local_a
# --- stop ---
# Parallelized map
# Apply x**10 element by element
%time r1 = map(lambda x: x**10, range(100000))
# Do it in parallel (scatter/map/gather)
%time r2 = rc.mapAll('lambda x: x**10', range(100000))
print sum(r1), sum(r2)
# --- stop ---
# Let's compute pi
from __future__ import division
def pi(n):
"""Compute pi using n terms of Wallis' product.
pi(n) = 2 \prod_{i=1}^{n}\frac{4i^2}{4i^2-1}."""
num = 1
den = 1
for i in xrange(1,n+1):
tmp = 4*i*i
num *= tmp
den *= tmp-1
return 2.0*(num/den)
%time pi(5000)
# --- stop ---
# silent
rc.block = False
%autopx
def wpi_nd(range_spec):
n1,n2 = range_spec
num = 1
den = 1
for i in xrange(n1,n2):
tmp = 4*i*i
num *= tmp
den *= tmp-1
return num,den
%autopx
# Local functions
def wpi_nd(range_spec):
n1,n2 = range_spec
num = 1
den = 1
for i in xrange(n1,n2):
tmp = 4*i*i
num *= tmp
den *= tmp-1
return num,den
def part_range(n1,n2,nchunks):
"""Partition a range specification in nchunks"""
size,rem = divmod(n2-n1,nchunks)
sizes = [size]*nchunks
while rem > 0:
for i in range(nchunks):
sizes[i] += 1
rem -= 1
if rem == 0:
break
ranges = []
start=n1
for size in sizes:
ranges.append((start,start+size))
start += size
return ranges
rc.block = True
# --- stop ---
# Parallelized calculation of pi
def par_pi(n, rc):
"""Compute pi using n terms of Wallis' product.
pi(n) = 2 \prod_{i=1}^{n}\frac{4i^2}{4i^2-1}.
Parallel version."""
# How many partitions should we use
num_engines = len(rc.getIDs())
# Compute partial product on each engine
m = rc.mapAll('wpi_nd',part_range(1,n+1,num_engines))
# Compute total product of num, den
num,den = reduce(lambda x,y:(x[0]*y[0],x[1]*y[1]),m)
return 2.0*(num/den)
%time pi(5000)
%time par_pi(5000, rc)
# --- stop ---
# A larger calculation
%time pi(10000)
%time par_pi(10000, rc)
# --- stop ---
# Parallel list comprehensions
rc.scatterAll('a', range(100000)) # Scatter
%px result = [2*x for x in a]
local_result = rc.gatherAll('a') # Gather to assemble result
print local_result[0:100]