
How to Use the Python asyncio Library

Python 3 has an excellent library called asyncio that can be used for writing concurrent code for anything from web servers to databases and even distributed task queues.  Asyncio is built on a non-blocking programming model that lets your program handle data when it becomes available instead of sitting idle while it waits for that data to arrive.  This keeps your code from getting bottlenecked and allows I/O-bound tasks to run concurrently, which can improve the throughput of your program's I/O.  One question that I often hear when describing asyncio is, "How does writing a program using asyncio look different from writing normal Python 3 code?"  That is why I wanted to write this tutorial: to describe what writing a Python program using asyncio looks like and to provide a few code samples of how I used asyncio in a Python module I was writing.  Let's jump in!

NOTE:  The code in this tutorial was written and tested on Ubuntu Linux 16.04 and 18.04 with the Python 3.8 development branch.  Python 3.7.* should work as well, but it has not been tested.

 

The Network Capture Module 🚀

A few weeks ago I was architecting Network Capture, a Python module written as a wrapper around TCPDump.  Network Capture allows you to orchestrate a packet capture and analyze the network traffic with advanced text-based filtering capabilities.  During the architecture and planning phase I needed a fast way to display and analyze the data coming from STDOUT on my packet capture.  My first thought was to use `subprocess.Popen` to read lines coming in from the capture like so:

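capture = subprocess.Popen(self.capture_command,
					shell=True,
					stdout=subprocess.PIPE)
 
# readline() blocks until a full line (or EOF) is available
captured_line = capture.stdout.readline()
 
# Compare by value: "is not" tests identity, not equality
if captured_line != b'':
	# Analyze and filter the captured line here
	...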
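
To make the blocking behavior of this first approach concrete, here is a minimal, self-contained sketch of a Popen read loop.  It is an illustration rather than the Network Capture code: read_lines_blocking and demo_command are hypothetical names, and a short child Python process stands in for TCPDump so the example runs without root privileges.

```python
import subprocess
import sys


def read_lines_blocking(command):
    """Run command and return its STDOUT lines, reading one line at a time."""
    lines = []
    # Passing a list (without shell=True) avoids shell quoting issues.
    with subprocess.Popen(command, stdout=subprocess.PIPE) as capture:
        while True:
            # readline() blocks this thread until a line or EOF arrives.
            captured_line = capture.stdout.readline()
            if captured_line == b'':
                break  # EOF: the child closed STDOUT
            lines.append(captured_line.decode("utf-8").rstrip("\r\n"))
    return lines


# Hypothetical stand-in for tcpdump: a child process that prints three lines.
demo_command = [sys.executable, "-c", "for i in range(3): print('packet', i)"]
print(read_lines_blocking(demo_command))
```

Each call to readline() parks the calling thread until the child writes a line, so nothing else in the program can run while it waits.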

Using the Python asyncio Library 🐍

This approach worked, but it did not perform as well as I would have liked given the potentially high volume of data passing through STDOUT.  It also made the TCPDump process hard to terminate and shut down cleanly from Python.  So that's when I turned to Python's asyncio library to help me concurrently process data coming in from STDOUT and shut down TCPDump safely with Python.  Let's take a look at how I did this.

To get started, the first thing I needed was an event loop.  In the code sample below, I call asyncio.get_event_loop(), which returns the event loop associated with the current thread, in this case the main thread Network Capture is running on.  The event loop schedules coroutines on that thread, so while one coroutine is waiting on I/O the loop is free to run other work instead of blocking.  So now that Network Capture has an event loop, let's take a look at the life cycle of that loop, and to do that, we need to examine how the loop is started and finished.  The user experience for the Network Capture module is that a user executes a capture from the command line and lets it run until they force a KeyboardInterrupt exception that breaks execution and closes the event loop.  So with this experience in mind, once the event loop is obtained, the try/except wrapped around run_until_complete catches the KeyboardInterrupt exception and then, in finally, closes the event loop and terminates the program.

if __name__ == '__main__':
	# Create a new network capture object and pass in the system arguments
	nc = network_capture(sys.argv)
	# Create a new run loop
	eventloop = asyncio.get_event_loop()
	try:
		# Execute capture until complete.
		eventloop.run_until_complete(nc.dispatch_capture())
	except KeyboardInterrupt as e:
		print("-- Exiting due to keyboard interrupt --")
	finally:
		eventloop.close()
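
On Python 3.7 and later, the same create, run, and close life cycle can also be written with asyncio.run().  The sketch below is only an illustration of that alternative: dispatch_capture here is a hypothetical stand-in that sleeps briefly, not the real nc.dispatch_capture() coroutine.

```python
import asyncio


async def dispatch_capture():
    """Hypothetical stand-in for nc.dispatch_capture()."""
    await asyncio.sleep(0.01)
    return "capture finished"


def main():
    try:
        # asyncio.run() creates a new event loop, runs the coroutine
        # until it completes, and closes the loop afterwards.
        return asyncio.run(dispatch_capture())
    except KeyboardInterrupt:
        print("-- Exiting due to keyboard interrupt --")
        return None


result = main()
print(result)
```

The explicit get_event_loop() and close() calls disappear, but the shape of the program stays the same: one top-level coroutine is run until it finishes or the user interrupts it.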

Next, let's look at the logic in the event loop's run_until_complete method call.  To this method I am passing the coroutine returned by the dispatch_capture() method found in the Network Capture class.  This tells the event loop to run this coroutine until it completes or, in this case, until an exception is raised.  Notice in the network_capture class below that the dispatch_capture, capture_process, capture_read_bytes, and kill_process methods are all defined with the async keyword.  This means that these methods are all coroutines, and calling one of them does not run it right away; it returns a coroutine object that you hand to the event loop by using the await keyword.  Keep in mind that coroutines and the futures they wait on are not thread safe by default, so they should only be driven from the thread that runs the event loop.  Notice that the calls to capture_process, capture_read_bytes, and kill_process are all prefixed with the await keyword.  Awaiting a coroutine suspends the caller until the result is available, and while the caller is suspended the event loop is free to run other tasks, so waiting for data does not hold up the rest of the program's execution.
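
The difference between calling a coroutine and awaiting it can be seen in a few lines.  This is a minimal sketch with made-up names (read_value and main are not part of Network Capture):

```python
import asyncio
import inspect


async def read_value():
    # Yield control to the event loop once, as a real I/O wait would.
    await asyncio.sleep(0)
    return 42


async def main():
    pending = read_value()  # creates a coroutine object; nothing has run yet
    is_coroutine = inspect.iscoroutine(pending)
    value = await pending   # the body of read_value runs here
    return is_coroutine, value


outcome = asyncio.run(main())
print(outcome)
```

Calling read_value() only builds the coroutine object; the result is produced when that object is awaited.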

The call to capture_process and the while loop that calls capture_read_bytes in dispatch_capture are good examples of asynchronous input/output because this is the part of the program that actually receives the data from the TCPDump capture.  A good test would be to measure the throughput of capture_read_bytes using asyncio against a plain serial Python loop.
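
One way to sketch that kind of measurement is shown here.  It is not a benchmark of capture_read_bytes: fake_read is a hypothetical stand-in that simply sleeps, and the delays are invented, so the numbers only illustrate how overlapping several waits compares with taking them one after another.

```python
import asyncio
import time


async def fake_read(delay):
    """Hypothetical stand-in for an I/O wait such as reading a line."""
    await asyncio.sleep(delay)
    return delay


async def run_serial(delays):
    start = time.perf_counter()
    for delay in delays:
        await fake_read(delay)  # each wait finishes before the next starts
    return time.perf_counter() - start


async def run_concurrent(delays):
    start = time.perf_counter()
    # gather() schedules all the waits at once so they overlap.
    await asyncio.gather(*(fake_read(d) for d in delays))
    return time.perf_counter() - start


async def compare():
    delays = [0.1, 0.1, 0.1]
    return await run_serial(delays), await run_concurrent(delays)


serial_time, concurrent_time = asyncio.run(compare())
print("serial: {0:.2f}s, concurrent: {1:.2f}s".format(serial_time,
                                                      concurrent_time))
```

With a single STDOUT stream there is only one wait at a time, so the gain from asyncio shows up when the program has other work, such as filtering or writing, that can proceed while a read is pending.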

import asyncio
import os
import sys
 
class network_capture(object):
 
	__slots__ = ('keywords', 
		     'capture_cmd',
		     'pcap_file', 
		     'txt_file',
		     'validation_values',
		     'validation_map')
 
	# Constructor
	def __init__(self, *args):
		...
 
	# Async method to start the capture.
	async def dispatch_capture(self):
 
		print("Capturing command: {0}".format(self.capture_cmd))
		print("Capturing keywords: {0}".format(self.keywords))
		line_count = 0
		capture_pid = None
		capture_stdout = None
		keyword_filtering = False
 
 
		if len(self.keywords) > 0:
			keyword_filtering = True
 
		# Open a file and start a read/write context
		with open(self.txt_file, 'w') as txt_file_obj:
 
			try:
				# Just to make sure you do not need to enter your password,
				# set the mode for the file to full read/write.
				os.chmod(self.txt_file, 0o777)
 
				print("-- Start Capturing Network Traffic --")
 
				capture_pid = await self.capture_process()
				while True:
					captured_line = await self.capture_read_bytes(capture_pid)
 
					# readline() returns b'' at EOF: tcpdump closed STDOUT
					if captured_line == b'':
						break
 
					if captured_line is not None and captured_line != b'':
 
						captured_line = captured_line.decode("utf-8")
						print("{0} \n".format(captured_line))
 
						if keyword_filtering:
							if any(key in captured_line for key in self.keywords):
								print("** Keyword found. Writing to log **")
								txt_file_obj.write(captured_line + "\n")
								# Count each line written to the log
								line_count += 1
						else:
							txt_file_obj.write(captured_line + "\n")
							line_count += 1
 
			except OSError as err:
				print("-- Exiting due to an operating system failure --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
				sys.exit(0)
			except AttributeError as err:
				print("-- Exiting due to an AttributeError --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
			except Exception:
				print("-- Unexpected exception received --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Errors: {0}".format(sys.exc_info()[0]))
				sys.exit(0)
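			finally:
				# The with block closes txt_file_obj on exit.  Only stop
				# tcpdump if it was actually started.
				if capture_pid is not None:
					await self.kill_process(capture_pid)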
 
	# Async method to start tcpdump through the shell and return the
	# asyncio Process object, with stdout and stderr piped back.
	async def capture_process(self):
		return await asyncio.create_subprocess_shell(
			self.capture_cmd,
			stdout=asyncio.subprocess.PIPE,
			stderr=asyncio.subprocess.PIPE)
 
	# Async method to read a line from stdout and return it to the awaiting
	# caller.  The line is formatted, printed, and evaluated.
	async def capture_read_bytes(self, capture_pid):
		return await capture_pid.stdout.readline()
 
	# Async method to kill the process and wait for it to terminate
	# once finally is executed from dispatch_capture.
	async def kill_process(self, capture_pid):
		try:
			capture_pid.kill()
		except ProcessLookupError:
			# tcpdump has already exited; nothing left to kill.
			pass
		await capture_pid.wait()
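
The read path of the class above can be reduced to a small, self-contained sketch.  This is an illustration under assumptions, not the module's code: read_lines and demo_command are hypothetical names, a child Python process stands in for TCPDump, and asyncio.create_subprocess_exec is used in place of the shell variant so that no shell quoting is involved.

```python
import asyncio
import sys


async def read_lines(command):
    """Start command and collect its STDOUT lines until EOF."""
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE)
    lines = []
    try:
        while True:
            # Suspends this coroutine, not the thread, until a line arrives.
            captured_line = await process.stdout.readline()
            if captured_line == b'':
                break  # EOF: the child closed STDOUT
            lines.append(captured_line.decode("utf-8").rstrip("\r\n"))
    finally:
        # Wait for the child to exit so it is reaped properly.
        await process.wait()
    return lines, process.returncode


# Hypothetical stand-in for tcpdump: a child process that prints three lines.
demo_command = [sys.executable, "-c", "for i in range(3): print('packet', i)"]
lines, returncode = asyncio.run(read_lines(demo_command))
print(lines, returncode)
```

The loop has the same shape as the blocking Popen version, but every readline() is awaited, so the event loop can run other coroutines while no data is available.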

Now that I have discussed a very basic example of how coroutines and futures work, let's talk about how to stop using an event loop cleanly.  As previously mentioned, attempting to shut down the TCPDump process safely had proved to be challenging under the hood.  I found that using asyncio to kill the process and awaiting confirmation that it had exited was a much safer and cleaner option.  For example, in the code below the program catches any error raised by the capturing logic and, in finally, awaits kill_process.  In kill_process a kill signal is sent to the process, and the coroutine then waits until the process has exited.  From there the KeyboardInterrupt exception is caught in the outer try/except, and its finally block closes the event loop that was running your asyncio scheduling.

class network_capture(object):
	# Async method to start the capture.
	async def dispatch_capture(self):
		# Open a file and start a read/write context
		with open(self.txt_file, 'w') as txt_file_obj:
 
			try:
				...
			except OSError as err:
				print("-- Exiting due to an operating system failure --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
				sys.exit(0)
			except AttributeError as err:
				print("-- Exiting due to an AttributeError --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
			except Exception:
				print("-- Unexpected exception received --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Errors: {0}".format(sys.exc_info()[0]))
				sys.exit(0)
			finally:
				# 1) kill_process is awaited if tcpdump was started
				if capture_pid is not None:
					await self.kill_process(capture_pid)
 
	# Async method to kill the process and wait for it to terminate
	# once finally is executed from dispatch_capture.
	async def kill_process(self, capture_pid):
		# Signal is sent to kill the pid
		try:
			capture_pid.kill()
		except ProcessLookupError:
			# tcpdump has already exited; nothing left to kill.
			pass
		await capture_pid.wait()
 
if __name__ == '__main__':
	# Create a new network capture object and pass in the system arguments
	nc = network_capture(sys.argv)
	# Create a new run loop
	eventloop = asyncio.get_event_loop()
	try:
		# Execute capture until complete.
		eventloop.run_until_complete(nc.dispatch_capture())
	except KeyboardInterrupt as e:
		print("-- Exiting due to keyboard interrupt --")
	finally:
		# Finally the event loop is closed
		eventloop.close()
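
The kill-then-wait pattern can be exercised without TCPDump or a real Ctrl-C.  In this minimal sketch, which uses hypothetical names throughout, cancelling the task plays the role of the interrupt, and a child Python process that sleeps stands in for the capture.  On Python 3.11 and later, asyncio.run() is documented to cancel the main task when Ctrl-C arrives, which is the situation this sketch imitates.

```python
import asyncio
import sys


async def capture_until_cancelled(command, state):
    """Read from command until cancelled, then kill and reap the child."""
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE)
    state["process"] = process
    try:
        while True:
            line = await process.stdout.readline()
            if line == b'':
                break  # EOF: the child exited on its own
    finally:
        try:
            process.kill()
        except ProcessLookupError:
            pass  # the child already exited
        # Wait until the operating system confirms the child is gone.
        await process.wait()


async def main():
    state = {}
    # Hypothetical stand-in for tcpdump: a child that stays silent.
    command = [sys.executable, "-c", "import time; time.sleep(60)"]
    task = asyncio.create_task(capture_until_cancelled(command, state))
    await asyncio.sleep(0.5)  # give the child time to start
    task.cancel()             # stands in for the keyboard interrupt
    try:
        await task
    except asyncio.CancelledError:
        pass
    return state["process"].returncode


returncode = asyncio.run(main())
print("child exit status:", returncode)
```

Because the finally block awaits process.wait(), the coroutine does not finish until the child has really exited, so a return code is always available afterwards.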

In Summary ⌛️

In summary, working with asyncio can be a very simple way to add concurrency to a program without a lot of overhead.  Python's asyncio library has gained a lot of ground over the last year or so, and by the looks of it things will only get stronger.  Hopefully you have now learned a bit more about Python's asyncio and how to incorporate it into your program.  Please let me know if you have any questions, comments, or concerns; I'd love to hear from you.  The complete code for this tutorial is up on my GitHub (see the Network Capture link in the references).

References:

  1. Futures: https://docs.python.org/3/library/asyncio-future.html#asyncio.Future
  2. asyncio: https://docs.python.org/3/library/asyncio.html
  3. Coroutines: https://docs.python.org/3/library/asyncio-task.html
  4. Event Loops: https://docs.python.org/3/library/asyncio-eventloops.html
  5. Network Capture: https://github.com/agnosticdev/Network-Capture

Matt Eaton

Long time mobile team lead with a love for network engineering, security, IoT, oss, writing, wireless, and mobile.  Avid runner and determined health nut living in the greater Chicagoland area.