Arcpy, Parallelisation, Python

Using Arcpy with multiprocessing – Part 2

The following Arcpy scripts refer to two GDBs which I have zipped into an archive; click here to download the archive (opens in new tab). If using these scripts from the command line or a Python IDE remember to change the paths to the Feature Classes!

Below is an Arcpy script of the pseudo-code from Part 1:

'''
Step 1 of 4: Example of using Multiprocessing/Parallel Python with Arcpy...

Can be run either:
 1. from the command line/a Python IDE (adjust paths to feature classes, as necessary)
 2. as a Script tool within ArcGIS (ensure 'Run Ptyhon script in Process' is checked when importing)
'''

import arcpy
import time

# Read the parameter values:
#  1: points feature class (string to location, e.g. c:\GIS\Data\points.gdb\points01)
#  2: polygons feature class (string to location)
#  3: search distance for polygons, integer, e.g 500
points_fC = arcpy.GetParameterAsText(0) # required
polygons_fC = arcpy.GetParameterAsText(1) # required
searchDist = arcpy.GetParameterAsText(2) # required

t_start = time.clock()

typeList = [1,2,3,4,5,6] # polygon types to search for...

# ---------------------------------------------------------------------------
## Data extraction
# ---------------------------------------------------------------------------

# run calculation
# if not running from Arc, the input parameters all come out as ''
if '' in [points_fC, polygons_fC, searchDist]:
	arcpy.AddMessage("Resorting to defaults...")
	points_fC = "c:\\Work\\GIS\\Data\\Points.gdb\\points01"
	polygons_fC = "c:\\Work\\GIS\\Data\\Polygons.gdb\\polygons01"
	searchDist = 1000

searchDist = int(searchDist) # convert search distance to integer...

# check all datasets are OK; if not, provide some useful output and terminate
valid_points = arcpy.Exists(points_fC)
arcpy.AddMessage("Points Feature Class: " + points_fC)
valid_polygons = arcpy.Exists(polygons_fC)
arcpy.AddMessage("Polygons Feature Class: " + polygons_fC)
dataCheck  = valid_points & valid_polygons
arcpy.AddMessage("Search Distance: "+str(searchDist) + ' '+str(type(searchDist)))

# Check if Inputs are valid and print message if bad, else start calculation...
if not dataCheck:
	if not valid_points:
		arcpy.AddError("Points database or feature class, %s,  is invalid..." % (points_fC))
	if not valid_polygons:
		arcpy.AddError("Polygons database or feature class, %s,  is invalid..." % (polygons_fC))

else:
	# add fields to Points file
	for polyType in typeList:
		arcpy.AddField_management(points_fC, "polygon_type%s_Sum" % polyType, "DOUBLE")
		arcpy.CalculateField_management(points_fC, "polygon_type%s_Sum" % polyType, 0, "PYTHON")
		arcpy.AddField_management(points_fC, "polygon_type%s_Count" % polyType, "DOUBLE")
		arcpy.CalculateField_management(points_fC, "polygon_type%s_Count" % polyType, 0, "PYTHON")
		arcpy.AddMessage("    Added polygon_type%s_Sum and polygon_type%s_Count fields to Points." % (polyType,polyType))

	# dictionary: pointsDataDict[pointID][Type]=[sum of Polygon type weightings, count of Ploygons of type]
	pointsDataDict = {} 

	for polyType in typeList: # For this specific example
		arcpy.AddMessage("      Calculating for type %s..." % polyType)

		arcpy.MakeFeatureLayer_management(polygons_fC, '%s_%s' % (polygons_fC, polyType))
		arcpy.MakeFeatureLayer_management(points_fC, '%s_%s' % (points_fC, polyType))
		pointsRowList = arcpy.SearchCursor('%s_%s' % (points_fC, polyType))
		
		# for every origin
		for pointRow in pointsRowList: 
			pointID = pointRow.getValue("PointID")

			try:
				pointsDataDict[pointID][polyType] = [0,0]
			except KeyError: # first time within row
				pointsDataDict[pointID] = {}
				pointsDataDict[pointID][polyType] = [0,0]

			# need to iterate Point here...
			arcpy.SelectLayerByAttribute_management('%s_%s' % (points_fC, polyType), 'NEW_SELECTION', '"PointID" = \'%s\'' % pointID)
			arcpy.SelectLayerByLocation_management('%s_%s' % (polygons_fC, polyType), 'INTERSECT', '%s_%s' % (points_fC, polyType), searchDist, 'NEW_SELECTION')
			arcpy.SelectLayerByAttribute_management('%s_%s' % (polygons_fC, polyType), 'SUBSET_SELECTION', '"polyType" = %s' % polyType)
			polygonsRowList = arcpy.SearchCursor('%s_%s' % (polygons_fC, polyType))
			for polygonsRow in polygonsRowList:
				pointsDataDict[pointID][polyType][0] += polygonsRow.getValue("polyWeighting")
				pointsDataDict[pointID][polyType][1] += 1

	# ---------------------------------------------------------------------------
	## Writing data back to points file
	# ---------------------------------------------------------------------------

	arcpy.AddMessage("    Values extracted; writing results to Points...")
	pointsRowList = arcpy.UpdateCursor(points_fC)
	for pointsRow in pointsRowList: # write the values for every point
		pointID = pointsRow.getValue("PointID")
		for polyType in typeList:
			pointsRow.setValue("polygon_type%s_Sum" % polyType, pointsRow.getValue("polygon_type%s_Sum" % polyType) + pointsDataDict[pointID][polyType][0])
			pointsRow.setValue("polygon_type%s_Count" % polyType, pointsRow.getValue("polygon_type%s_Count" % polyType) + pointsDataDict[pointID][polyType][1])

		pointsRowList.updateRow(pointsRow)
		del pointsRow

	del pointsRowList
	# just make sure any locks are cleared...
	del dataCheck, pointID, pointsDataDict, points_fC, polygons_fC, polyType, valid_points, valid_polygons, searchDist, typeList

	arcpy.AddMessage("      ... complete in %s seconds." % (time.clock() - t_start))

Handy tip: I have found that using the arcpy.Exists() function on every input dataset seems to (sometimes) prevent random unexplained failures when running the script within Arc (the kind where running the script repeatedly with identical inputs will mostly work fine, but fail occasionally for no apparent reason).

To be suitable for parallelisation the script must be made modular – that is where the calculation is run within functions, while input/output details are placed outside of the calculation. If you need to know more take a look around on the internet for some more advanced Python tutorials – here is one that has a bit of information on functions: http://www.tutorialspoint.com/python/python_functions.htm.

Brief explanation using an example script:

# someScript.py
n = 10
x = 0
for i in xrange(n):
    x += i
print x
n = 20
x = 0
for i in xrange(n):
    x += i
print x

The part that is run multiple times can be converted into a function. This has many advantages (particularly as things get more complex), primarily: shorter code, as things aren’t replicated, and if you make changes to the function you only have to change it in one place, not every time the piece of code appeared.

# someScript.py
def sumItem(N):
    '''sumItem(N)
    Python module that returns the sum of numbers, up to the specified input integer.
    '''
    sum = 0
    for i in xrange(N):
        sum += i
    return sum

if __name__ == '__main__':
    print sumItem(10)
    print sumItem(20)

The if __name__ == '__main__': block is used to signify parts of the code that are only run if the script is called directly, i.e. python someScript.py. Now our sumItem function can also be imported by other scripts, and (as it is not called directly) the part in the if __name__ == '__main__': block is not run, i.e.:

import someScript

print someScript.sumItem(15)

Now, we can modularise the Arcpy script from earlier giving:

'''
Step 2 of 4: Example of using Multiprocessing/Parallel Python with Arcpy...

Can be run either:
 1. from the command line/a Python IDE (adjust paths to feature classes, as necessary)
 2. as a Script tool within ArcGIS (ensure 'Run Ptyhon script in Process' is checked when importing)
'''

import arcpy
import time

def performCalculation(points_fC, polygons_fC, searchDist, typeList):
	'''performCalculation(pointsFeatureClass, polygonsFeatureClass, searchDistance, typeList, calcPlatform)

	Runs calculation
	'''
	# ---------------------------------------------------------------------------
	## Data extraction
	# ---------------------------------------------------------------------------

	searchDist = int(searchDist) # convert search distance to integer...

	# check all datasets are OK; if not, provide some useful output and terminate
	valid_points = arcpy.Exists(points_fC)
	arcpy.AddMessage("Points Feature Class: "+points_fC)
	valid_polygons = arcpy.Exists(polygons_fC)
	arcpy.AddMessage("Polygons Feature Class: "+polygons_fC)
	dataCheck  = valid_points & valid_polygons
	arcpy.AddMessage("Search Distance: "+str(searchDist)+' '+str(type(searchDist)))

	# Check if Inputs are valid and print message if bad, else start calculation...
	if not dataCheck:
		if not valid_points:
			arcpy.AddError("Points database or feature class, %s,  is invalid..." % (points_fC))
		if not valid_polygons:
			arcpy.AddError("Polygons database or feature class, %s,  is invalid..." % (polygons_fC))

	else:
		for polyType in typeList: # add fields to Points file
			arcpy.AddField_management(points_fC, "polygon_type%s_Sum" % polyType, "DOUBLE")
			arcpy.CalculateField_management(points_fC, "polygon_type%s_Sum" % polyType, 0, "PYTHON")
			arcpy.AddField_management(points_fC, "polygon_type%s_Count" % polyType, "DOUBLE")
			arcpy.CalculateField_management(points_fC, "polygon_type%s_Count" % polyType, 0, "PYTHON")
			arcpy.AddMessage("    Added polygon_type%s_Sum and polygon_type%s_Count fields to Points." % (polyType,polyType))

		# dictionary: pointsDataDict[pointID][Type]=[sum of Polygon type weightings, count of Ploygons of type]
		pointsDataDict = {} 

		for polyType in typeList: # For this specific example
			arcpy.AddMessage("      Calculating for type %s..." % polyType)

			######################################################################
			## START: Calculation intensive part of the script
			######################################################################

			arcpy.MakeFeatureLayer_management(polygons_fC, '%s_%s' % (polygons_fC, polyType))
			arcpy.MakeFeatureLayer_management(points_fC, '%s_%s' % (points_fC, polyType))
			pointsRowList = arcpy.SearchCursor('%s_%s' % (points_fC, polyType))
			for pointRow in pointsRowList:
				pointID = pointRow.getValue("PointID")
				try:
					pointsDataDict[pointID][polyType] = [0,0]
				except KeyError: # first time within row
					pointsDataDict[pointID] = {}
					pointsDataDict[pointID][polyType] = [0,0]
				# need to iterate Point here...
				arcpy.SelectLayerByAttribute_management('%s_%s' % (points_fC, polyType), 'NEW_SELECTION', '"PointID" = \'%s\'' % pointID)
				arcpy.SelectLayerByLocation_management('%s_%s' % (polygons_fC, polyType), 'INTERSECT', '%s_%s' % (points_fC, polyType), searchDist, 'NEW_SELECTION')
				arcpy.SelectLayerByAttribute_management('%s_%s' % (polygons_fC, polyType), 'SUBSET_SELECTION', '"polyType" = %s' % polyType)
				polygonsRowList = arcpy.SearchCursor('%s_%s' % (polygons_fC, polyType))
				for polygonsRow in polygonsRowList:
					pointsDataDict[pointID][polyType][0] += polygonsRow.getValue("polyWeighting")
					pointsDataDict[pointID][polyType][1] += 1

			######################################################################
			## END: Calculation intensive part of the script
			######################################################################

		# ---------------------------------------------------------------------------
		## Writing data back to points file
		# ---------------------------------------------------------------------------

		arcpy.AddMessage("    Values extracted; writing results to Points...")
		pointsRowList = arcpy.UpdateCursor(points_fC)
		for pointsRow in pointsRowList: # write the values for every point
			pointID = pointsRow.getValue("PointID")
			for polyType in typeList:
				pointsRow.setValue("polygon_type%s_Sum" % polyType, pointsRow.getValue("polygon_type%s_Sum" % polyType) + pointsDataDict[pointID][polyType][0])
				pointsRow.setValue("polygon_type%s_Count" % polyType, pointsRow.getValue("polygon_type%s_Count" % polyType) + pointsDataDict[pointID][polyType][1])

			pointsRowList.updateRow(pointsRow)
			del pointsRow

		del pointsRowList
		# just make sure any locks are cleared...
		del dataCheck, pointID, pointsDataDict, points_fC, polygons_fC, polyType, valid_points, valid_polygons, searchDist, typeList

if __name__ == '__main__':
	# Read the parameter values:
	#  1: points feature class (string to location, e.g. c:\GIS\Data\points.gdb\points01)
	#  2: polygons feature class (string to location)
	#  3: search distance for polygons, integer, e.g 500
	pointsFC = arcpy.GetParameterAsText(0) # required
	polygonsFC = arcpy.GetParameterAsText(1) # required
	search_Distance = arcpy.GetParameterAsText(2) # required

	t_start = time.clock()

	type_list = [1,2,3,4,5,6] # polygon types to search for...

	# run calculation
	# if not running from Arc, the input parameters all come out as ''
	if '' in [pointsFC, polygonsFC, search_Distance]:
		arcpy.AddMessage("Resorting to defaults...")
		pointsFC = "c:\\Work\\GIS\\Data\\Points.gdb\\points01"
		polygonsFC = "c:\\Work\\GIS\\Data\\Polygons.gdb\\polygons01"
		search_Distance = 1000
		performCalculation(pointsFC, polygonsFC, search_Distance, type_list)
	else:
		performCalculation(pointsFC, polygonsFC, search_Distance, type_list)

	arcpy.AddMessage("      ... complete in %s seconds." % (time.clock() - t_start))

This in itself doesn’t add much to the process, but the body of the calculation is kept separate from the part of the code that loads the variables from Arc, which means it could be imported by another script – with the variables passed in as the main function, performCalculation(), is called. The most calculation intensive part of the process can likewise be converted into a function, but it is important to be wary of the inputs and outputs of the process. Having multiple processes accessing the same *.GDB at the same time will cause the script to fail. Because of this, models that work in Model Builder or Arcpy cannot necessarily be parallelised; or a reworking of the method/outputs may be necessary…

Above I have highlighted the part of the script that is intensive. This is the bit that we want to pull out into its own function, and will be run in parallel. Note that the part is not just intensive, it is also run repeatedly (in a for loop) and independently (any given iteration does not depend on the result of a previous iteration, for example).

'''
Step 3 of 4: Example of using Multiprocessing/Parallel Python with Arcpy...

Can be run either:
 1. from the command line/a Python IDE (adjust paths to feature classes, as necessary)
 2. as a Script tool within ArcGIS (ensure 'Run Ptyhon script in Process' is checked when importing)
'''

import arcpy
import time

def performCalculation(points_fC, polygons_fC, searchDist, typeList):
	'''performCalculation(pointsFeatureClass, polygonsFeatureClass, searchDistance, typeList, calcPlatform)

	Runs calculation
	'''
	# ---------------------------------------------------------------------------
	## Data extraction
	# ---------------------------------------------------------------------------

	searchDist = int(searchDist) # convert search distance to integer...

	# check all datasets are OK; if not, provide some useful output and terminate
	valid_points = arcpy.Exists(points_fC)
	arcpy.AddMessage("Points Feature Class: "+points_fC)
	valid_polygons = arcpy.Exists(polygons_fC)
	arcpy.AddMessage("Polygons Feature Class: "+polygons_fC)
	dataCheck  = valid_points & valid_polygons
	arcpy.AddMessage("Search Distance: "+str(searchDist)+' '+str(type(searchDist)))

	# Check if Inputs are valid and print message if bad, else start calculation...
	if not dataCheck:
		if not valid_points:
			arcpy.AddError("Points database or feature class, %s,  is invalid..." % (points_fC))
		if not valid_polygons:
			arcpy.AddError("Polygons database or feature class, %s,  is invalid..." % (polygons_fC))

	else:
		for polyType in typeList: # add fields to Points file
			arcpy.AddField_management(points_fC, "polygon_type%s_Sum" % polyType, "DOUBLE")
			arcpy.CalculateField_management(points_fC, "polygon_type%s_Sum" % polyType, 0, "PYTHON")
			arcpy.AddField_management(points_fC, "polygon_type%s_Count" % polyType, "DOUBLE")
			arcpy.CalculateField_management(points_fC, "polygon_type%s_Count" % polyType, 0, "PYTHON")
			arcpy.AddMessage("    Added polygon_type%s_Sum and polygon_type%s_Count fields to Points." % (polyType,polyType))

		# dictionary: pointsDataDict[pointID][Type]=[sum of Polygon type weightings, count of Ploygons of type]
		pointsDataDict = {} 

		for polyType in typeList: # For this specific example
			arcpy.AddMessage("      Calculating for type %s..." % polyType)
			funcOutput = findPolygons(points_fC, polygons_fC, polyType, searchDist) # send jobs to be processed

			if len(pointsDataDict.keys()) == 0: # first output becomes the new dictionary
				pointsDataDict.update(funcOutput)
			else:
				for key in funcOutput: # later outputs should be added per key...
					pointsDataDict[key].update(funcOutput[key])
		del funcOutput

		# ---------------------------------------------------------------------------
		## Writing data back to points file
		# ---------------------------------------------------------------------------

		arcpy.AddMessage("    Values extracted; writing results to Points...")
		pointsRowList = arcpy.UpdateCursor(points_fC)
		for pointsRow in pointsRowList: # write the values for every point
			pointID = pointsRow.getValue("PointID")
			for polyType in typeList:
				pointsRow.setValue("polygon_type%s_Sum" % polyType, pointsRow.getValue("polygon_type%s_Sum" % polyType) + pointsDataDict[pointID][polyType][0])
				pointsRow.setValue("polygon_type%s_Count" % polyType, pointsRow.getValue("polygon_type%s_Count" % polyType) + pointsDataDict[pointID][polyType][1])

			pointsRowList.updateRow(pointsRow)
			del pointsRow

		del pointsRowList
		# just make sure any locks are cleared...
		del dataCheck, key, pointID, pointsDataDict, points_fC, polygons_fC, polyType, valid_points, valid_polygons, searchDist, typeList

############################################################
## New Module - the most calculation intensive part
############################################################
def findPolygons(points_FC, polygons_FC, Type, search_dist):
	funcTempDict = {}
	arcpy.MakeFeatureLayer_management(polygons_FC, '%s_%s' % (polygons_FC, Type))
	arcpy.MakeFeatureLayer_management(points_FC, '%s_%s' % (points_FC, Type))
	pointsRowList = arcpy.SearchCursor('%s_%s' % (points_FC, Type))
	for pointRow in pointsRowList: # for every origin
		pointID = pointRow.getValue("PointID")

		try:
			funcTempDict[pointID][Type] = [0,0]
		except KeyError: # first time within row
			funcTempDict[pointID] = {}
			funcTempDict[pointID][Type] = [0,0]
		# need to iterate Point here...
		arcpy.SelectLayerByAttribute_management('%s_%s' % (points_FC, Type), 'NEW_SELECTION', '"PointID" = \'%s\'' % pointID)
		arcpy.SelectLayerByLocation_management('%s_%s' % (polygons_FC, Type), 'INTERSECT', '%s_%s' % (points_FC, Type), search_dist, 'NEW_SELECTION')
		arcpy.SelectLayerByAttribute_management('%s_%s' % (polygons_FC, Type), 'SUBSET_SELECTION', '"polyType" = %s' % Type)
		polygonsRowList = arcpy.SearchCursor('%s_%s' % (polygons_FC, Type))
		for polygonsRow in polygonsRowList:
			funcTempDict[pointID][Type][0] += polygonsRow.getValue("polyWeighting")
			funcTempDict[pointID][Type][1] += 1

	return funcTempDict

if __name__ == '__main__':
	# Read the parameter values:
	#  1: points feature class (string to location, e.g. c:\GIS\Data\points.gdb\points01)
	#  2: polygons feature class (string to location)
	#  3: search distance for polygons, integer, e.g 500
	pointsFC = arcpy.GetParameterAsText(0) # required
	polygonsFC = arcpy.GetParameterAsText(1) # required
	search_Distance = arcpy.GetParameterAsText(2) # required

	t_start = time.clock()

	type_list = [1,2,3,4,5,6] # polygon types to search for...

	# run calculation
	# if not running from Arc, the input parameters all come out as ''
	if '' in [pointsFC, polygonsFC, search_Distance]:
		arcpy.AddMessage("Resorting to defaults...")
		pointsFC = "c:\\Work\\GIS\\Data\\Points.gdb\\points01"
		polygonsFC = "c:\\Work\\GIS\\Data\\Polygons.gdb\\polygons01"
		search_Distance = 1000
		performCalculation(pointsFC, polygonsFC, search_Distance, type_list)
	else:
		performCalculation(pointsFC, polygonsFC, search_Distance, type_list)

	arcpy.AddMessage("      ... complete in %s seconds." % (time.clock() - t_start))

To summarise: the most calculation intensive part has been converted into a function, findPolygons(), which takes the Points and Polygons Feature Classes, the Type of polygon it is searching for and the search distance. The function returns the data in a Python dictionary, and it is written back to the origins all at once, avoiding the case of multiple processes attempting to write to the same workspace. The update() dictionary method just adds new keys to the existing dictionary. Dictionaries have a lot of other useful features that can make intermediate data processing much easier, but the actual process can be very application-specific.

Advertisements

9 thoughts on “Using Arcpy with multiprocessing – Part 2

  1. Hello there,

    I stumbled across your blog while working through the ESRI Python Forum.

    I applaud your blog, as I think that many GIS specialists and professionals tentatively use Python for basic tasks, but do not get as involved as they should. Python in itself is an almost indescribably powerful language, and I find it unfortunate that more people don’t use some of the more interesting components of the language (OOP, dictionaries / sets / etc. and the methods associated with built-in types, the Standard Library, etc.)

    I’m working right now on a conflation tool to link the directions of OSM with an internally-produced street network, and I’m going to see if I can’t properly implement this in multiprocessor form using your structure here as a basis. Keep up the good work!

    Cheers,

  2. Hi Stacy,
    I came across your post “Using Arcpy with multiprocessing” yesterday. I am working on a walkability index calculation tool through automating network analysis workflow using python scripting. Your sample script seems to be something to start with, which is wonderful. Actually, I was working on a lidar data processing tool back in 2013 using open source modules instead of ESRI arcpy. In order to boost the performance, I had to break an individual .las file (with up to 6 million points in it possibly) into smaller chunks. I wish I read your articles back then.
    Anyway, where are those sample data? “The following Arcpy scripts refer to two GDBs which I have zipped into an archive; it can be downloaded from the right hand side of the page.” I would like to take a look at the data while I am going through your sample script. Many thanks!
    Alex

    1. Hey Alex, there used to be a sidebar on the right hand side that had the files for download. I am travelling at the moment and don’t have access to a computer, but can’t see the sidebar when accessing by phone, and can’t seem to check/change the settings from my phone! I will try again tonight, otherwise it might be a few weeks until I can get hold of a computer again, sorry!

      You should get a notification when I comment, so I will note here once it is done.

    2. Hey Alex, I have looked into this, and can’t do anything about it via mobile, sorry! I will try once I get hold of a computer, in a few weeks.

    3. Hey Alex, FYI – it is the white box on the right hand side of the Page that is used for the download. It should have an icon in it for the file that you can download, but the icon isn’t there! I don’t know why it isn’t working, it may be that the back end Web hosting service is having some temporary (or maybe permanent) issues! As I said, will look into it when I can!

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s