Arcpy, Geocoding, Parallelisation

Parallelised Composite Geocoding script

The code below is an implementation of a Composite Address Locator for geocoding that has multiprocessor parallelisation capabilities and can be resumed if interrupted. I initially developed it because Composite Locators run from Arcpy scripts would crash in ArcGIS 10.1. Documentation of the inputs is provided in the docstring of the parallelCompositeGeocode module. Inputs are modified within the if __name__ == ‘__main__’: indented block at the bottom of the script.

Notes:

  • the script will not modify or destroy inputs, but will overwrite output locations and delete some of its own temporary files
  • Parallel Python is required if you wish to use multiple CPUs (parallelising)
  • if parallelising, the number of processes should ideally be set equal to or slightly less than the number of logical CPU cores (note that a CPU with two physical cores and HyperThreading has four logical cores)
  • the script makes use of the Data Access module, and consequently will only work with ArcGIS 10.1 and above
  • the script can accept any number of input address fields, but these fields must correspond to fields accepted by the address locators
  • all address locators must accept the same input fields
  • there is no type or existence checking on the inputs, so any errors will raise an Exception
  • a WindowsError: [Error 32] exception indicates that a temporary or output GDB is open in ArcMap or ArcCatalog; it may be necessary to exit these programs and use Task Manager to kill any rogue Arc processes, including AppROT.exe

Please leave a comment if you find this script useful, if you have any issues with it (it has not been thoroughly tested), or you think the documentation could be improved.

import arcpy
import time
import os
import shutil

def parallelCompositeGeocode(inputGDBpath, inputFCname, inputAddressFields, addressLocators, locatorFields, completedLocators, outputFolder, outputGDBname, outputFCname, outputType, outputSRS, tempFolder, nProcessesTxtPath, nProcessesDefault, numSplits):

	'''
	parallelCompositeGeocode(inputGDBpath, inputFCname, inputAddressFields, addressLocators, locatorFields, completedLocators, outputFolder, outputGDBname, outputFCname, outputType, outputSRS, tempFolder, nProcessesTxtPath, nProcessesDefault, numSplits)

	Inputs:
		- inputGDBpath <str>
			Full or relative path to the input GDB, includes the GDB name.
			Example: 'C:\\test\\Addresses.gdb'

		- inputFCname <str>
			Name of the input addresses Feature Class.
			Example: 'addresses'

		- inputAddressFields <tuple> of field names <str>
			Field names of address fields in the input feature class.
			!! These fields MUST correspond to the fields accepted by the locators.
			Example: ('streetAddress', 'locality', 'town')

		- addressLocators <tuple> of <tuple> of <str>
			Chained address locators, will be run in the order provided.
			The first part of sub-tuple specifies path to locator, second part defines file name of the locator.
			Example: (('C:\\locators', 'locator1'), ('C:\\locators', 'locator2'))

		- locatorFields <tuple> of field names <str>
			Fields used by the address locators. Can be adjusted in the locator properties under Input Address Fields.
			!! These MUST correspond to (same number of, and referring to the same information as) the inputAddressFields input.
			!! All locators MUST accept at least this set of fields.
			Example: ('Address', 'Suburb', 'Zone')

		- completedLocators <tuple>
			Names of completed locators that should not be re-run (useful if adding an additional locator or previous process was interrupted).
			Example (empty): ()
			Example: ('locator1')

		- outputFolder <str>
			Full or relative path of folder for output GDB.
			!! If this folder does not exist it will be created.
			Example: 'C:\\Data\\geocodedAddresses'

		- outputGDBname <str>
			Name of GDB for output.
			!! If this GDB already exists (in the outputFolder) it will be deleted.
			Example: 'GeocodedAddresses.gdb'

		- outputFCname <str>
			Name of the output addresses Feature Class.
			Example: 'geocodedAddresses'

		- outputType <str>; one of TABLE or POINT
			Type of output features, can only be table (includes x,y coordinate fields) or points (creates point features, rather than writing x,y attribute field).
			Example: 'TABLE'

		- outputSRS <arcpy Spatial Reference>
			Required if output type is set to POINTS, otherwise can be blank or None
			Example: arcpy.SpatialReference(2193)

		- tempFolder <str>
			Full or relative path to a temporary folder where some GDBs are stored and written to/from.
			Best placed on an SSD if available. This folder can be manually deleted after use.
			!! If this folder does not exist it will be created.
			Example: 'D:\\Temp\\geocodingTemp'

		- nProcessesTxtPath <str>
			Full or relative path to a text file containing only a single integer value representing the desired number of processes to be used.
			This allows the number of processes being used to be changed while the program is running.
			!! Updates to this file are only propagated when the next locator is initialised.
			Example: '_nCPUs.txt'

		- nProcessesDefault <int>
			Number of processes to use initially, will be used if the text file specified by nProcessesTxtPath does not exist or cannot be read.
			Example: 4

		- numSplits <int>
			Number of parts to split the input addresses into for geocoding; the value should be a multiple of the maximum number of desired processes.
			Increasing the number of splits will decrease maximum memory consumption for the same set of addresses.
			Example: 16

	Notes:
		No existence or type checking is carried out
		Requires ArcGIS 10.1

	Author: StacyR
			https://pythongisandstuff.wordpress.com/

	Version: 310713
    '''

	nProcesses = nProcessesDefault

	infoDict = {'nInputRows': 0, 'nOutputRows': 0}

	t0 = time.time()

	inputFC = os.path.join(inputGDBpath, inputFCname)

	# paths/names for temp GDBs, FCs and layers
	address_tableName = 'address'
	addressGeocoded_tableName = 'addresses_geocoded'
	tempAddresses_layerName = 'addresses_temp'
	splitGDBNames = ['Split_%s.gdb' % i for i in range(numSplits)]
	masterAddresses_gdbName = 'masterAddresses.gdb'
	masterAddresses_gdbPath = os.path.join(tempFolder, masterAddresses_gdbName)
	masterAddress_table = os.path.join(masterAddresses_gdbPath, address_tableName)

	# output path and FC name
	outputGDB = os.path.join(outputFolder, outputGDBname)
	outputFC = os.path.join(outputGDB, outputFCname)

	# create tempFolder if it does not currently exist
	if not os.path.exists(tempFolder):
		os.makedirs(tempFolder)

	# create outputFolder if it does not currently exist
	if not os.path.exists(outputFolder):
		os.makedirs(outputFolder)

	if len(completedLocators) == 0:
		# delete/create master address GDB, add fields
		# delete output files for writing
		if os.path.exists(masterAddresses_gdbPath):
			shutil.rmtree(masterAddresses_gdbPath)
			arcpy.AddMessage('  Deleted %s.' % masterAddresses_gdbPath)
		arcpy.CreateFileGDB_management(tempFolder, masterAddresses_gdbName, "CURRENT")
		arcpy.AddMessage('    Created GDB: %s' % masterAddresses_gdbPath)
		arcpy.CreateTable_management(masterAddresses_gdbPath, address_tableName)
		for field in locatorFields:
			arcpy.AddField_management(masterAddress_table, field, 'TEXT')
		arcpy.AddField_management(masterAddress_table, 'matchAddress', 'TEXT')
		arcpy.AddField_management(masterAddress_table, 'x', 'FLOAT')
		arcpy.AddField_management(masterAddress_table, 'y', 'FLOAT')
		arcpy.AddField_management(masterAddress_table, 'locatorNum', 'SHORT')

		arcpy.AddMessage('  Master address table created and fields added in %s.' % timeDisp(time.time() - t0))
		t1 = time.time()
		arcpy.AddMessage('  Extracting data from input address database (%s), adding to master table.' % inputFC)

		inputFields = [field for field in inputAddressFields]

		with arcpy.da.SearchCursor(inputFC, inputFields) as sC:

			masterAddressFields = [field for field in locatorFields]
			masterAddressFields.append('locatorNum')

			with arcpy.da.InsertCursor(masterAddress_table, masterAddressFields) as iC:
				for values in sC:
					listValues = list(values)
					listValues.append(0)
					iC.insertRow(listValues)
					infoDict['nInputRows'] += 1

		arcpy.AddMessage('    %i addresses extracted and added to master table in %s.' % (infoDict['nInputRows'], timeDisp(time.time() - t1)))

	else:
		# overwrite locatorNum in master table (for locators that are not defined as completed) with value 0
		## this means that some addresses previously geocoded may be run again (if their locator is listed as completed)
		with arcpy.da.SearchCursor(masterAddress_table, ('OID@', )) as sC:
			for row in sC:
				infoDict['nInputRows'] += 1
		arcpy.AddMessage('    %i input addresses' % infoDict['nInputRows'])

		for addressLocator in addressLocators:
			if addressLocator[1] not in completedLocators:
				locatorNum = addressLocators.index(addressLocator) + 1

				with arcpy.da.UpdateCursor(masterAddress_table, 'locatorNum', '"locatorNum" = %i' % locatorNum) as uC:
					for row in uC:
						row[0] = 0
						uC.updateRow(row)
		arcpy.AddMessage('    Partially completed locators set to run again in %s.' % timeDisp(time.time() - t0))

	locatorNum = 1
	t2 = time.time()

	for addressLocator in addressLocators:

		if addressLocator[1] not in completedLocators:

			tG0 = time.time()

			arcpy.AddMessage('  Starting geocode with address locator (%i of %i): %s' % (locatorNum, len(addressLocators), os.path.join(*addressLocator)))

			# make view of the non-geocoded addresses from Master table to Temp table
			arcpy.MakeTableView_management(masterAddress_table, tempAddresses_layerName, ' "locatorNum" = 0')

			arcpy.AddField_management(tempAddresses_layerName, 'SplitID', 'LONG')
			with arcpy.da.UpdateCursor(tempAddresses_layerName, ('SplitID', )) as uC:
				splitID = 0
				for row in uC:
					row[0] = splitID
					uC.updateRow(row)
					splitID += 1
					if splitID == numSplits:
						splitID = 0
				del splitID

			# copy splits to split GDBs
			for i in range(numSplits):
				splitGDB_path = os.path.join(tempFolder, splitGDBNames[i])
				if os.path.exists(splitGDB_path):
					shutil.rmtree(splitGDB_path)
				arcpy.CreateFileGDB_management(tempFolder, splitGDBNames[i], "CURRENT")
				splitOutTablePath = os.path.join(splitGDB_path, address_tableName)
				arcpy.SelectLayerByAttribute_management(tempAddresses_layerName, "NEW_SELECTION", ' "SplitID" = %s' % i)
				arcpy.CopyRows_management(tempAddresses_layerName, splitOutTablePath)
			arcpy.Delete_management(tempAddresses_layerName)
			arcpy.AddMessage('    %i split GDBs created and data added in %s' % (numSplits, timeDisp(time.time() - tG0)))

			tG1 = time.time()

			nProcesses, nProcInfo = updProcesses(nProcesses, nProcessesTxtPath)
			if len(nProcInfo) > 0: arcpy.AddMessage('  ' + nProcInfo)
			# set up PP
			if nProcesses > 1:
				import pp
				jobs = []
				job_server = pp.Server(nProcesses, ppservers=())
				arcpy.AddMessage('    Job server started with %i processes.' % nProcesses)

			resultsDict = {}

			# run geocoding
			for i in range(numSplits):
				inputAddresses = os.path.join(tempFolder, splitGDBNames[i], address_tableName)
				outputAddresses = os.path.join(tempFolder, splitGDBNames[i], addressGeocoded_tableName)
				tempSplitAddresses_layerName = 'addresses_temp_%i' % i

				arcpy.MakeTableView_management(inputAddresses, tempSplitAddresses_layerName)
				inputAddresses_count = int(arcpy.GetCount_management(tempSplitAddresses_layerName).getOutput(0))
				arcpy.Delete_management(tempSplitAddresses_layerName)

				if nProcesses > 1:
					arcpy.AddMessage('      Submitting %i addresses in %s to job server' % (inputAddresses_count, inputAddresses))
					jobs.append(job_server.submit(geocodeAddresses, (inputAddresses, inputAddresses_count, outputAddresses, os.path.join(*addressLocator)), (), ('arcpy', 'time')))
				else:
					arcpy.AddMessage('      Geocoding %i addresses in %s' % (inputAddresses_count, inputAddresses))
					resultsDict_part, resultStr, resultTime = geocodeAddresses(inputAddresses, inputAddresses_count, outputAddresses, os.path.join(*addressLocator))
					arcpy.AddMessage('  ' + resultStr + timeDisp(resultTime) + '.')
					try: resultsDict.update(resultsDict_part)
					except TypeError: arcpy.AddMessage('      No output from geocode; %s' % resultStr)
					del resultsDict_part, resultStr, resultTime 

			# get result from PP...
			if nProcesses > 1:
				for job in jobs:
					resultsDict_part, resultStr, resultTime = job()
					arcpy.AddMessage('  ' + resultStr + timeDisp(resultTime) + '.')
					try: resultsDict.update(resultsDict_part)
					except TypeError: arcpy.AddMessage('      No output from geocode; %s' % resultStr)
					del resultsDict_part, resultStr, resultTime
				job_server.destroy()
				del job, jobs

			# write results to master address table...
			count_thisLocator = len(resultsDict)
			with arcpy.da.UpdateCursor(masterAddress_table, ('OID@', 'matchAddress', 'x', 'y', 'locatorNum')) as uC:
				for row in uC:
					rowID = row[0]
					if rowID in resultsDict:
						row[1] = resultsDict[rowID][2]
						row[2] = resultsDict[rowID][0]
						row[3] = resultsDict[rowID][1]
						row[4] = locatorNum
						uC.updateRow(row)
				del resultsDict
			arcpy.AddMessage('    %i addresses (%.2f percent of all addresses) geocoded by this locator in %s.' % (count_thisLocator, 100.0*(float(count_thisLocator)/float(infoDict['nInputRows'])), timeDisp(time.time() - tG1)))
		else:
			arcpy.AddMessage('    !! Skipping locator (inputs state it has already completed) (%i of %i): %s' % (locatorNum, len(addressLocators), addressLocator[1]))

		arcpy.MakeTableView_management(masterAddress_table, tempAddresses_layerName, ' "locatorNum" > 0')
		allGeocodedAddresses_count = int(arcpy.GetCount_management(tempAddresses_layerName).getOutput(0))
		arcpy.Delete_management(tempAddresses_layerName)
		arcpy.MakeTableView_management(masterAddress_table, tempAddresses_layerName, ' "locatorNum" = 0')
		nonGeocodedAddresses_count = int(arcpy.GetCount_management(tempAddresses_layerName).getOutput(0))
		arcpy.Delete_management(tempAddresses_layerName)

		arcpy.AddMessage('    Total addresses geocoded thus far (all locators): %i; %.2f percent of all addresses.' % (allGeocodedAddresses_count, 100.0*(float(allGeocodedAddresses_count)/float(infoDict['nInputRows']))) )

		locatorNum += 1

	# delete split GDBs
	for i in range(numSplits):
		splitGDB_path = os.path.join(tempFolder, splitGDBNames[i])
		if os.path.exists(splitGDB_path):
			shutil.rmtree(splitGDB_path)

	arcpy.AddMessage('  Completed geocoding in %s' % (timeDisp(time.time() - t2)))
	arcpy.AddMessage('    Input addresses: %i' % infoDict['nInputRows'])
	arcpy.AddMessage('    Final addresses geocoded (all locators): %i (%.2f percent of all addresses)' % (allGeocodedAddresses_count, 100.0*(float(allGeocodedAddresses_count)/float(infoDict['nInputRows']))) )
	arcpy.AddMessage('    Non-geocoded addresses: %i' % nonGeocodedAddresses_count)

	t3 = time.time()
	arcpy.AddMessage('  Starting export of geocoded addresses to %s' % outputGDB)

	# delete output files for writing
	if os.path.exists(outputGDB):
		shutil.rmtree(outputGDB)
		arcpy.AddMessage('  Deleted %s.' % outputGDB)

	# export geocoded addresses
	arcpy.CreateFileGDB_management(outputFolder, outputGDBname, "CURRENT")
	arcpy.AddMessage('    Created GDB: %s' % outputGDB)

	if outputType == 'POINTS':
		arcpy.CreateFeatureclass_management(outputGDB, outputFCname, 'POINT', '', '', '', outputSRS)
	else:
		arcpy.CreateTable_management(outputGDB, outputFCname)

	for field in inputAddressFields:
		arcpy.AddField_management(outputFC, field, 'TEXT')
	arcpy.AddField_management(outputFC, 'matchAddress', 'TEXT')
	arcpy.AddField_management(outputFC, 'locator', 'TEXT')

	outputFields = [field for field in inputAddressFields]
	outputFields.append('matchAddress')

	if outputType == 'POINTS':
		outputFields.extend(['SHAPE@X', 'SHAPE@Y'])
	else:
		arcpy.AddField_management(outputFC, 'x', 'FLOAT')
		arcpy.AddField_management(outputFC, 'y', 'FLOAT')
		outputFields.extend(['x', 'y'])

	outputFields.append('locator')

	copyFields = [field for field in locatorFields]
	copyFields.extend(['matchAddress', 'x', 'y', 'locatorNum'])

	with arcpy.da.InsertCursor(outputFC, outputFields) as iC:
		with arcpy.da.SearchCursor(masterAddress_table, copyFields) as sC:

			for values in sC:

				if values[-1] == 0:
					locatorName = 'None'
				else:
					locatorName = addressLocators[values[-1] -1][1]

				listValues = list(values)
				listValues[-1] = locatorName
				iC.insertRow(listValues)
				del locatorName
				infoDict['nOutputRows'] += 1

	arcpy.AddMessage('    Exported %i addresses in %s.' % (infoDict['nOutputRows'], timeDisp(time.time() - t3)))

def geocodeAddresses(input_Addresses, input_Addresses_count, output_Addresses, address_Locator):
	'''geocodes a table of addresses and returns it as a Python dictionary
	suitable for parallelisation'''
	t_G0 = time.time()

	arcpy.GeocodeAddresses_geocoding(input_Addresses, address_Locator, '', output_Addresses)

	# select geocoded features, searchcursor through and write to a dictionary
	results_Dict = {}
	with arcpy.da.SearchCursor(output_Addresses, ('OID@', 'X', 'Y', 'Match_addr'), ' "X" > 0') as sC:
		for row in sC:
			results_Dict[row[0]] = (row[1], row[2], row[3])

	output_Addresses_count = len(results_Dict)
	geocodeStr = '      Geocoded addresses: %i of %i (%.2f percent) addresses in ' % (output_Addresses_count, input_Addresses_count, 100*(float(output_Addresses_count)/float(input_Addresses_count)))

	del input_Addresses, input_Addresses_count, output_Addresses, address_Locator
	return results_Dict, geocodeStr, time.time() - t_G0

def timeDisp(t):
	'''timeDisp(t)

	Converts seconds to minutes, hours or days, if appropriate

	Inputs:
		- t: time difference <float>
	Returns:
		- converted string <str>

	Usage example:
		print 'Program complete in %s.' % timeDisp(time.clock() - t0)
	'''
	L = [[24*60.0*60, 'days'], [60.0*60.0, 'hours'], [60.0, 'minutes']]
	for l in L:
		if t > 2*l[0]: return '%.2f %s' % (t/l[0], l[1])
	return '%.2f seconds' % t

def updProcesses(nProc, fPath):
	'''nCPUs_update(nProcesses, fPath)

	Checks a text file for a number, returns if error, number changed or not

	Inputs:
		- nProc: current number of Processes <int>
		- fPath: path to file containing number <str>
	Returns:
		- new processes number <int>
		- text explaining change or error <str>
	'''
	try:
		with open(fPath, 'r') as fObj:
			nProcesses_upd = int(fObj.read())
	except:
		return nProc, ' !! WARNING: cannot read from %s, number of Processes (currently %s) unchanged.' % (fPath, nProc)

	if nProcesses_upd != nProc:
		return nProcesses_upd, ' !! WARNING: number of Processes changed from %s to %s.' % (nProc, nProcesses_upd)
	else:
		return nProc, ''

if __name__ == '__main__':

	tMain = time.time()

	inputGDBpath = 'D:\\_TEMP\\inputAddresses.gdb'
	inputFCname = 'addresses'
	inputAddressFields = ('adds', 'zoneSub', 'Zone')
	addressLocators = (('C:\\_TEMP_data extracts\\Geocoders\\NZ_Geocoder\\', 'USSty_NZAddsSDC_City'), ('C:\\_TEMP_data extracts\\Geocoders\\NZ_Geocoder\\', 'USSty_NZAdds_City'), ('C:\\_TEMP_data extracts\\Geocoders\\NZ_Geocoder\\', 'NZSty_NZAddsSDC_City'))
	locatorFields = ('Address', 'Suburb', 'City')

	# completedLocators = ('USSty_NZAddsSDC_City', 'USSty_NZAdds_City', 'NZSty_NZAddsSDC_City')
	completedLocators = ()

	outputFolder = 'D:\\_TEMP\\output\\'
	outputGDBname = 'Geoadds.gdb'
	outputFCname = 'geoAdds'
	outputType = 'TABLE'

	outputSRS = arcpy.SpatialReference(2193)
	# outputSRS = None

	tempFolder = 'D:\\_TEMP\\geoTemp\\'
	nProcessesTxtPath =  'D:\\_TEMP\\_nCPUs.txt'
	nProcessesDefault = 1
	numSplits = 4

	arcpy.AddMessage('Starting parallel composite geocode...')
	parallelCompositeGeocode(inputGDBpath, inputFCname, inputAddressFields, addressLocators, locatorFields, completedLocators, outputFolder, outputGDBname, outputFCname, outputType, outputSRS, tempFolder, nProcessesTxtPath, nProcessesDefault, numSplits)
	arcpy.AddMessage('Geocode complete and written to output in: %s' % timeDisp(time.time() - tMain))
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s