Arcpy, Parallelisation, Python

Using Arcpy with multiprocessing – Part 3

The following post builds upon the script and methods developed in Part 1 and Part 2, so read them first!

The purpose of this series is not to give you a one-line example that you can copy and paste into your code, but to step through the process and make the underlying principles clear. Please note that Esri also has a blog post describing the use of Python's Multiprocessing library with Arcpy (in particular with Spatial Analyst); it did not work for me in a complex situation involving Network Analyst, but it is worth checking out if you are doing intensive tasks with Spatial Analyst.

The following Arcpy scripts refer to two GDBs, which I have zipped into an archive; click here to download the archive. If using these scripts from the command line or a Python IDE, remember to change the paths to the Feature Classes!

Now that the most calculation-intensive part of the process has been made into a parallelisable function, we can actually implement the multiprocessing bit. Here I will describe basic use cases for two multiprocessing libraries: the inbuilt Python Multiprocessing, and Parallel Python. I include both because neither seems to work in all situations: Parallel Python has caused problems when ArcGIS extensions are checked out, and I cannot get Multiprocessing to work when geocoding addresses. If you have odd problems with one library, I suggest trying the other.

Both libraries can use a Python list as the data structure to which jobs are appended, and from which the results are extracted. Each job is processed in turn by the first available worker (CPU/core). So, if there are six jobs in the list on a system with two CPUs, the first two jobs start near-simultaneously; then, as each CPU finishes its task and becomes free, it starts working on the next job in the list. This is known as asynchronous processing, as the order and timing of completion are unimportant. Both methods can use all available CPUs on the system, they can be limited to a predefined number (see below), or they may be limited by the task itself – in this example, as there are only six Polygon types, a maximum of only six CPUs would be used.
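
To make that pattern concrete before we get into the Arcpy version, here is a minimal sketch using the Multiprocessing library (illustrative only – square() is a stand-in for real work, and nothing here touches Arcpy): six jobs are appended to a list and handed to a pool of two workers, then the results are collected once each job finishes.

import multiprocessing

def square(x): # a stand-in for a real calculation
	return x * x

if __name__ == '__main__':
	pool = multiprocessing.Pool(2) # a pool of two workers
	jobs = []
	for x in [1, 2, 3, 4, 5, 6]: # six jobs, like the six polygon types below
		jobs.append(pool.apply_async(square, (x,)))
	# each get() blocks until that job has finished,
	# so this loop waits for all the workers to complete
	results = [job.get() for job in jobs]
	print results # prints [1, 4, 9, 16, 25, 36]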

'''
Step 4 of 4: Example of using Multiprocessing/Parallel Python with Arcpy...

Can be run either:
 1. from the command line/a Python IDE (adjust paths to feature classes, as necessary)
 2. as a Script tool within ArcGIS (ensure ‘Run Python script in Process’ is NOT checked when importing)

 The Parallel Python library must be installed before it can be used.
'''

import arcpy
import multiprocessing
import time
try:
	import pp
	forceMP = False
except ImportError:
	forceMP = True

def performCalculation(points_fC, polygons_fC, searchDist, typeList, calcPlatform_input=None):
	'''performCalculation(pointsFeatureClass, polygonsFeatureClass, searchDistance, typeList, calcPlatform)

	All inputs are required except calcPlatform, which defaults to 'mp'.

	calcPlatform is either 'mp', to use the inbuilt Multiprocessing library, or 'pp', to use Parallel Python (if it is installed; otherwise defaults to 'mp')

	'''

	# ---------------------------------------------------------------------------
	## Pre-calculation checks
	# ---------------------------------------------------------------------------

	# Check calculation platform is valid, or resort to default...
	defaultCalcType = 'mp'
	calcTypeExplainDict = {'pp':'Parallel Python', 'mp':'Multiprocessing'}
	if forceMP: # unable to import Parallel Python library (it has to be installed separately)
		arcpy.AddMessage("   WARNING: Cannot load Parallel Python library, forcing Multiprocessing library...")
		calcPlatform = 'mp'
	elif calcPlatform_input not in ('mp', 'pp'): # invalid or no input; resort to default
		arcpy.AddMessage("   WARNING: Input calculation platform '%s' invalid; should be either 'mp' or 'pp'. Defaulting to %s..." % (calcPlatform_input, calcTypeExplainDict[defaultCalcType]))
		calcPlatform = defaultCalcType
	else:
		calcPlatform = calcPlatform_input

	# ---------------------------------------------------------------------------
	## Data extraction (parallel execution)
	# ---------------------------------------------------------------------------

	searchDist = int(searchDist) # convert search distance to integer...

	# check all datasets are OK; if not, provide some useful output and terminate
	valid_points = arcpy.Exists(points_fC)
	arcpy.AddMessage("Points Feature Class: "+points_fC)
	valid_polygons = arcpy.Exists(polygons_fC)
	arcpy.AddMessage("Polygons Feature Class: "+polygons_fC)
	dataCheck = valid_points and valid_polygons

	# Check if Inputs are valid and print message if bad, else start calculation...
	if not dataCheck:
		if not valid_points:
			arcpy.AddError("Points database or feature class, %s,  is invalid..." % (points_fC))
		if not valid_polygons:
			arcpy.AddError("Polygons database or feature class, %s,  is invalid..." % (polygons_fC))

	else:
		for type in typeList: # add fields to Points file
			arcpy.AddField_management(points_fC, "polygon_type%s_Sum" % type, "DOUBLE")
			arcpy.CalculateField_management(points_fC, "polygon_type%s_Sum" % type, 0, "PYTHON")
			arcpy.AddField_management(points_fC, "polygon_type%s_Count" % type, "DOUBLE")
			arcpy.CalculateField_management(points_fC, "polygon_type%s_Count" % type, 0, "PYTHON")
			arcpy.AddMessage("    Added polygon_type%s_Sum and polygon_type%s_Count fields to Points." % (type,type))

		# dictionary: pointsDataDict[pointID][Type] = [sum of Polygon type weightings, count of Polygons of type]
		pointsDataDict = {}
		# jobs are added to this list, then processed by the available workers (CPUs)
		jobs = [] 

		if calcPlatform == 'mp':
			arcpy.AddMessage("    Utilising Python Multiprocessing library:")
			# initiate the processing pool - use all available resources
			pool = multiprocessing.Pool()
			## pool = multiprocessing.Pool(2) # Example: limit the processing pool to 2 CPUs...
			for type in typeList:
				arcpy.AddMessage("      Passing %s to processing pool..." % type)
				# send jobs to be processed
				jobs.append(pool.apply_async(findPolygons, (points_fC, polygons_fC, type, searchDist))) 

			# collect the results (waits until all the processing is complete)
			for job in jobs:
				result = job.get()
				# first output becomes the new dictionary
				if not pointsDataDict:
					pointsDataDict.update(result)
				else:
					# later outputs should be added per key...
					for key in result:
						pointsDataDict[key].update(result[key])
			del jobs

		elif calcPlatform == 'pp':
			ppservers=()
			# initiate the job server - use all available resources
			job_server = pp.Server(ppservers=ppservers)
			## job_server = pp.Server(2, ppservers=ppservers) # Example: limit the processing pool to 2 CPUs...
			arcpy.AddMessage("    Utilising Parallel Python library:")
			for type in typeList: # for this specific example, only six jobs (one per polygon type) will be submitted anyway...
				arcpy.AddMessage("      Passing %s to processing pool..." % type)
				# send jobs to be processed
				jobs.append(job_server.submit(findPolygons, (points_fC, polygons_fC, type, searchDist), (), ("arcpy",))) 

			# collect results from the job server (waits until all the processing is complete)
			for job in jobs:
				result = job() # calling the job waits for completion and returns its result
				if not pointsDataDict: # first output becomes the new dictionary
					pointsDataDict.update(result)
				else:
					for key in result:
						pointsDataDict[key].update(result[key]) # later outputs should be added per key...
			del jobs

		# ---------------------------------------------------------------------------
		## Writing data back to points file
		# ---------------------------------------------------------------------------

		arcpy.AddMessage("    Values extracted; writing results to Points...")
		pointsRowList = arcpy.UpdateCursor(points_fC)
		for pointsRow in pointsRowList:
			pointID = pointsRow.getValue("PointID")
			for type in typeList:
				pointsRow.setValue("polygon_type%s_Sum" % type, pointsRow.getValue("polygon_type%s_Sum" % type) + pointsDataDict[pointID][type][0])
				pointsRow.setValue("polygon_type%s_Count" % type, pointsRow.getValue("polygon_type%s_Count" % type) + pointsDataDict[pointID][type][1])

			pointsRowList.updateRow(pointsRow)
			del pointsRow

		del pointsRowList
		# just make sure any locks are cleared...
		del calcPlatform, calcPlatform_input, calcTypeExplainDict, dataCheck, defaultCalcType, job, key, pointID, pointsDataDict, points_fC, polygons_fC, result, type, valid_points, valid_polygons, searchDist, typeList

def findPolygons(points_FC, polygons_FC, Type, search_dist):
	funcTempDict = {}
	arcpy.MakeFeatureLayer_management(polygons_FC, '%s_%s' % (polygons_FC, Type))
	arcpy.MakeFeatureLayer_management(points_FC, '%s_%s' % (points_FC, Type))
	pointsRowList = arcpy.SearchCursor('%s_%s' % (points_FC, Type))
	for pointRow in pointsRowList: # for every origin
		pointID = pointRow.getValue("PointID")

		try:
			funcTempDict[pointID][Type] = [0,0]
		except KeyError: # first time within row
			funcTempDict[pointID] = {}
			funcTempDict[pointID][Type] = [0,0]

		arcpy.SelectLayerByAttribute_management('%s_%s' % (points_FC, Type), 'NEW_SELECTION', '"PointID" = \'%s\'' % pointID)
		arcpy.SelectLayerByLocation_management('%s_%s' % (polygons_FC, Type), 'INTERSECT', '%s_%s' % (points_FC, Type), search_dist, 'NEW_SELECTION')
		arcpy.SelectLayerByAttribute_management('%s_%s' % (polygons_FC, Type), 'SUBSET_SELECTION', '"polyType" = %s' % Type)
		polygonsRowList = arcpy.SearchCursor('%s_%s' % (polygons_FC, Type))
		for polygonsRow in polygonsRowList:
			funcTempDict[pointID][Type][0] += polygonsRow.getValue("polyWeighting")
			funcTempDict[pointID][Type][1] += 1

	return funcTempDict

if __name__ == '__main__':
	# Read the parameter values:
	#  1: points feature class (string to location, e.g. c:\GIS\Data\points.gdb\points01)
	#  2: polygons feature class (string to location)
	#  3: search distance for polygons, integer, e.g. 500
	#  4: calculation type ('mp' to use Multiprocessing library, 'pp' to use Parallel Python library (if available, otherwise defaults to mp))
	pointsFC = arcpy.GetParameterAsText(0) # required
	polygonsFC = arcpy.GetParameterAsText(1) # required
	search_Distance = arcpy.GetParameterAsText(2) # required
	calcType = arcpy.GetParameterAsText(3) # optional (will default to MP)

	t_start = time.clock()

	type_list = [1,2,3,4,5,6] # polygon types to search for...

	# run calculation
	if '' in [pointsFC, polygonsFC, search_Distance]: # if not running from Arc, the input parameters all come out as ''
		pointsFC = "c:\\Work\\GIS\\Data\\Points.gdb\\points01"
		polygonsFC = "c:\\Work\\GIS\\Data\\Polygons.gdb\\polygons01"
		search_Distance = 1000
		performCalculation(pointsFC, polygonsFC, search_Distance, type_list)
	else:
		performCalculation(pointsFC, polygonsFC, search_Distance, type_list, calcType)

	arcpy.AddMessage("      ... complete in %s seconds." % (time.clock() - t_start))

When importing the script for use as an Arc script tool, ‘Run Python script in Process’ must be unchecked. If you don’t do this, the script will try to open new instances of ArcMap/ArcCatalog and fail.

Execution times:
On my computer, the scripts posted in Part 2 all take about 300 seconds to execute when run from the command line, and about 400 seconds when run as a tool within ArcMap or ArcCatalog. The script above takes about 170 seconds from the command line, and 340 seconds as a tool within ArcMap or ArcCatalog. The first set of jobs through the queue (i.e. two if two CPUs are being used, four if four CPUs are being used) runs slowly, but later iterations run a lot faster. This start-up cost is known as overhead, and it often means that parallelised operations will not run as fast as expected (i.e. going from one process to four processes does not speed up the total execution time by a factor of four). The overall effect of overhead diminishes as the number of iterations increases.
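
If you want to observe this warm-up for yourself, a rough sketch like the following (illustrative only; dummy() and the job count are arbitrary stand-ins, not part of the scripts above) prints the elapsed time as each job's result arrives. With a trivial task like this the overhead is small; in an Arcpy worker it is much larger, as each new process also has to load arcpy and its DLLs.

import multiprocessing, time

def dummy(n): # an arbitrary CPU-bound stand-in task
	return sum(i * i for i in xrange(n))

if __name__ == '__main__':
	pool = multiprocessing.Pool(2) # two workers, as in the two-CPU example above
	t_start = time.clock()
	jobs = [pool.apply_async(dummy, (10 ** 6,)) for j in range(6)]
	for i, job in enumerate(jobs):
		job.get() # blocks until this job's result is ready
		print "job %s finished at %.2f seconds" % (i, time.clock() - t_start)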

Feel free to ask questions or point out improvements with regard to any of these posts. The following is a list of tips/issues I have noticed with regard to Arcpy scripting, and multiprocessed applications in particular (note: multiprocessing refers to the general concept, while Multiprocessing refers to the inbuilt Python library).

Notes on using multiprocessing:

  1. Creating and debugging complex Arcpy scripts is a real pain in the ass, and it is even worse when you are using multiprocessing. I wouldn’t recommend doing it unless it is really necessary and you will be able to use the code extensively in the future. Errors are a little harder to debug, as error message output isn’t produced until the job result is accessed.
  2. Python Multiprocessing vs. Parallel Python: there doesn’t seem to be much speed difference between the two; however:
    1. Parallel Python requires an extra package to be installed (Multiprocessing is included with Python)
    2. Parallel Python will fail if you use arcpy.AddMessage() or arcpy.AddError() within the parallelised function (but you can just pass back a string and print it from the result – see the sketch after this list)
    3. Parallel Python has issues if you are using tools that must be ‘Checked Out’ before execution (such as Network Analyst)
    4. I have not been able to get the Multiprocessing library to work with Geocoding
  3. arcpy.Exists() is practically essential – it should be used to check every input dataset: not just because it will catch input errors, but because without it Arc will regularly fail to do basic operations for no apparent reason (I assume this is related to data locks). It may be that checking for existence forces a refresh that clears locks, but it is certainly not foolproof either.
  4. Try…Except – I have found that these statements, used as ESRI seems to like doing in the Arcpy help/tutorials (wrapping the entire script in one block), will often cause the script to hang indefinitely when run from Arc. The best thing to do is test rigorously in Python (passing the paths as strings and printing them), where you get useful output, then try the script out in Arc. If it ran fine directly but has problems at this stage, make sure to check that the paths coming out of Arc are what you were actually expecting by using print.
  5. Cancelling a multiprocessing script running in Arc does not kill all the Python processes – unlike hitting Ctrl-C when running from the command line. Use Task Manager to find them all and end them.
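
To illustrate the workaround in point 2.2, here is a sketch of passing messages back instead of printing them inside the parallelised function (the names here are illustrative, not from the script above; it is shown with Multiprocessing, but the same pattern applies to Parallel Python's job() results): the worker builds its log text, returns it alongside its result, and the parent process prints it after collecting the job.

import multiprocessing

def worker(item):
	result = item * 2 # a stand-in for the real geoprocessing
	message = "item %s processed OK" % item # build the message; don't print it here
	return result, message

if __name__ == '__main__':
	pool = multiprocessing.Pool()
	jobs = [pool.apply_async(worker, (i,)) for i in range(4)]
	for job in jobs:
		result, message = job.get()
		print message # or arcpy.AddMessage(message) when running as a script tool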

If you are doing Python development, you may be interested in my Windows Dev Stack, which describes my development environment from high level technologies down to specific apps, and how they all work together.

13 thoughts on “Using Arcpy with multiprocessing – Part 3”

  1. A very useful article, will definitely be using the multiprocessing aspect of Python in ArcGIS in future number crunching projects, thanks!

  2. StacyR, great blog post. I have just started doing this myself and came across your blog. I’m using PP for the parallel tool, as I am interested in running across multiple machines. For some reason, when submitting a job to the server, PP ends early (i.e. the work is not carried out) but without any error when used on the local machine only. When using another PC as a node, it results in a threading error – I intend to post that on the forum.

    ArcGIS and all the developer stuff is installed and tested for each node. Importing arcpy in the script is not the problem; rather, it is passing “arcpy” as a module dependency when submitting a job. I am using PP exactly the way you have, so it’s a bit perplexing. Did you encounter any similar problems along the way?

    import pp, time, math, arcpy

    print "test code for parallel python"

    def factorialpp(n):
        f_ans = math.factorial(n)
        return f_ans

    ppservers = ("172.22.0.251",)
    #ppservers = ()
    job_server = pp.Server(ppservers=ppservers, secret="squirrel")

    start_time = time.time()

    i = 1
    inputs_list = []
    while i <= 10000:
        inputs_list.append(i)
        i = i + 1

    inputs_tuple = tuple(inputs_list)
    jobs = [(input, job_server.submit(factorialpp, (input,), (), ("math", "time", "arcpy"))) for input in inputs_tuple]

    duration = time.time() - start_time

    print "done"
    print "time taken:", duration, "seconds"

    job_server.print_stats()

    1. Hi Matt,

      I had a bit of trouble with PP and Arc, which is why I use Multiprocessing. However, I have only ever had to do stuff on one machine! The trouble I had was with extensions (i.e. Network Analyst).

      Not sure exactly what would be causing the problems, or why, but Arc is a big messy thing. When importing arcpy it loads a bunch of DLLs, etc. As you are importing Arcpy on one node then passing the processing to another, the Arcpy package is probably not being passed the way the other imports are…

      My only idea would be to take the import arcpy out of the import statement, and put it within your function, like so:

      import pp, time, math

      print "test code for parallel python"

      def factorialpp(n):
          import arcpy
          f_ans = math.factorial(n)
          return f_ans

      # etc., etc.

      This is so that it is actually being imported on the target machine, not just attempting to pass the imported library…

      If you need arcpy prior to/after the function as well, there is no reason you could not leave it where it was, thus having it in both places. I believe that if a library has already been imported on a machine, importing it again doesn’t actually occur (i.e. it takes no time)…

      See what happens if you just make that change. If it still has problems, you could try not passing arcpy when submitting to the job server, i.e.:

      jobs = [(input, job_server.submit(factorialpp, (input,), (), ("math", "time"))) for input in inputs_tuple]

      Let me know how you get on!

  3. Hi StacyR, great blog and it gave me a lot to think about… I’m rather new to Python scripting but I have been trying to implement multiprocessing in my scripts, unfortunately without success (see http://stackoverflow.com/questions/8419489/multiprocessing-attribute-error).

    I did see that you chose to use the pool statement before def findPolygons, which is interesting, as I applied it in the other order, as suggested by ESRI at http://blogs.esri.com/Dev/blogs/geoprocessing/archive/2011/08/29/Multiprocessing.aspx.

    Nonetheless I appear to be at a dead end with my problem, as it will not resolve itself – I work with a lot of raster datasets, and if I could use multiple CPUs that would most definitely speed things up.

    Sincerely,
    Bjorn

    1. Bjorn,

      Your code looks a little more complicated than it needs to be; however, it ran OK when I simply copied what you had (and changed the contents of project() to some generic math).

      When I tested it with actual ProjectRaster I realised that you don’t specify an output location, and if you are using pool.map() you can only pass a single argument to the function. Consequently, I had to change the code around to allow two things (input and output paths) to be passed to project()… To do this I used the job server notation that was in my multiprocessing post.

      I also took out the unnecessary function main() that you had – scripted things will run just fine from within the if __name__ == '__main__': statement. This code works perfectly for me (tested with 5 reasonably large rasters):

      import os, multiprocessing, arcpy

      def project(rasterin, rasterout):
          arcpy.ProjectRaster_management(rasterin, rasterout, #…
          # just return something; will print once done
          return 'converted: %s and made output: %s' % (rasterin, rasterout)

      if __name__ == '__main__':
          workspace = r'd:\Temp\dems'
          arcpy.env.workspace = workspace

          fcs = arcpy.ListRasters('*')

          pool = multiprocessing.Pool()
          jobs = []

          # iterate through features, add input and output paths
          for fc in fcs:
              rIn = os.path.join(workspace, fc)
              rOut = os.path.join(r'd:\Temp\demsOut', fc)

              jobs.append(pool.apply_async(project, (rIn, rOut)))

          for job in jobs:
              print job.get()

    2. Thanks StacyR for your advice!

      I actually did get my original example to work – strangely, by using the same code but running it through IDLE (the Python GUI) instead of the Python shell execution in Wing IDE!?!?

      I have however, as suggested, modified it to implement the pool.apply_async code as opposed to pool.map, so that I can utilize several arguments. I still need to do some tweaking to my script, but it is utilizing all cores, fluctuating between 10 and 99% CPU.

      Bjorn

  4. Hi StacyR, just came across your blog. Thanks for all the detail and effort. There’s very little material out there on multiprocessing and arcpy. I have an arcpy script which I’ve successfully set up to run with multiprocessing. However, it only runs successfully from a command prompt. I can’t get it to run from a script tool. I’ve unchecked “run in process”. It runs but then hangs or crashes at odd places. One of my processes just won’t run from the script tool but runs fine from the cmd prompt. When I comment that out, it tells me an SA tool is not licensed when I know it is. Have you had success running multiprocessing scripts from a script tool within an ArcGIS toolbox?

    1. Hi Nils,

      I found that getting complex scripts to run in Arc as tools was not really worth the effort, so I don’t bother with that any more!

      I have got multiprocessing scripts to run within ArcGIS (i.e. where you click on them, enter the inputs and click run), but I have never tried them as tools within models.

      If you really need to get it to run as a tool, I suggest going through the inputs with a fine-tooth comb: the inputs you enter into the Arc dialog box when running the script often don’t come through exactly as you might expect. So, what you might need to do is use arcpy.AddMessage(input) for all of your inputs, run it from the command line and use printscreen so you know exactly what they are. Then run it from Arc as a normal tool and check that they are identical. Hopefully this will help you locate the problem!

      If not, try posting in the ArcGIS Python forum (http://forums.arcgis.com/forums/117-Python), there are some really knowledgeable people there.

    2. Thanks Stacy, I’ve got my script stripped down to 3 commands. I’ve totally removed the multiprocessing functionality for testing. Basically, the script works when “run in process” is checked and fails on the first Spatial Analyst function when “run in process” is unchecked. I’m beginning to think Spatial Analyst functions will not work at all with “run in process” unchecked.

  5. I’m sorry, but this example is needlessly convoluted. You spend most of the time going over your vector analysis rather than actually demonstrating how to use the multiprocessing capabilities. Then you muddle the picture with two different approaches to accomplish the same thing. ESRI’s 21-line example is much more concise and easier to follow: http://blogs.esri.com/esri/arcgis/2011/08/29/multiprocessing/.

    1. Hi Fakey Fakerton, thanks for the comment.

      The purpose of this post is to describe how to use Multiprocessing with a practical example. Expanding upon the 21-line example provided by ESRI did not work for me when doing complex analysis (involving Network Analyst). This method does. I watered it down to do a simple vector analysis to protect IP contained in the original code. The three posts together also describe the process of developing a given analysis to be parallelisable. Presumably you are using Spatial Analyst in your work? The ESRI post describes multiprocessing with Spatial Analyst, and if their method actually works for your application, then I suggest you use it.

    As described in the post, Multiprocessing works in some applications but not in others, and Parallel Python works in some but not others. I have included basic, equivalent use cases for both libraries using asynchronous multiprocessing.

      I have added a link to the ESRI post in my posts, as well as some improved description as to why I include both Parallel Python and Multiprocessing.

  6. Hi StacyR, first I would like to thank you for your posts. They are of great help. And I have a question concerning multiprocessing. Does it need more ArcGIS licenses (one license for each process)? Previously I opened two ArcGIS applications on the same computer and tried to run the same tool; however, it doesn’t allow me to run the same tool simultaneously. The other application only gets to work on the tool when the first application finishes with it. Will multiprocessing face the same problem?
