Multithreading with external Python Analysis

I have been looking at options for calling an external script in a non-blocking way.

In my case, I have a script that post-processes sensor data after it is parsed and placed in a bucket. However, since I have many devices, processing one at a time can fall behind when there is a lot of traffic. Most of that time is spent waiting on tago.io API requests, so my CPU utilization is very low.

I plan to use multithreading so that a pool of worker threads can run the analysis.

@Andrés used async to run two different functions from one trigger. This is similar to what I want to do, though I plan to use threads instead.
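
For reference, here is a minimal sketch of how I understand that async approach (this is my reading of it, not @Andrés's exact code; funcA and funcB are placeholder names):

    import asyncio
    from tagoio_sdk import Analysis

    # Placeholder coroutines standing in for the two functions run from one trigger.
    async def funcA(context, scope):
        context.log("funcA ran")

    async def funcB(context, scope):
        context.log("funcB ran")

    def combinedAnalysis(context, scope):
        # Run both coroutines concurrently within the single trigger callback.
        async def run_both():
            await asyncio.gather(funcA(context, scope), funcB(context, scope))
        asyncio.run(run_both())

    Analysis({"token": "MY_TOKEN", "region": "usa-1"}).init(combinedAnalysis)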

I plan on using ThreadPoolExecutor() so I don't have to manage any queues myself.

Here is a minimal working example of what I propose. Run sequentially, each analysis would block for 30 seconds before the next could start; with this example, up to 4 instances of the analysis can run at the same time.


    from tagoio_sdk import Analysis
    import concurrent.futures
    import time

    # The actual work for one trigger; the sleep stands in for slow API calls.
    def myAnalysis(context, scope):
        context.log("I did something")
        time.sleep(30)

    # A pool of up to 4 worker threads; no queue management needed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

        def parallelAnalysis(context, scope):
            # submit() hands the work to the pool and returns immediately,
            # so the trigger callback never blocks.
            executor.submit(myAnalysis, context, scope)

        # init() blocks listening for triggers, which keeps the executor alive.
        Analysis({"token": "MY_TOKEN", "region": "usa-1"}).init(parallelAnalysis)

Of course, this means there is potential for race conditions if the same device updates quickly enough. Thankfully, my devices update only once an hour, so it is not an issue for me, but I think you could build a locking structure keyed on the device ID to prevent the same device from being modified at the same time; a rough sketch follows.
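
Here is a minimal sketch of that idea (untested; I'm assuming the device ID can be read from the first item in the scope, so adjust that lookup to however your trigger actually delivers it):

    import threading
    import time

    device_locks = {}
    locks_guard = threading.Lock()  # protects the device_locks dict itself

    def get_device_lock(device_id):
        # Create this device's lock on first use, under the guard lock.
        with locks_guard:
            if device_id not in device_locks:
                device_locks[device_id] = threading.Lock()
            return device_locks[device_id]

    def myAnalysis(context, scope):
        # Assumption: each scope item carries the device ID under "device".
        device_id = scope[0]["device"]
        with get_device_lock(device_id):  # serializes work per device
            context.log("I did something")
            time.sleep(30)

Threads working on different devices still run in parallel; only two updates for the same device get serialized.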

Let me know if you see any pitfalls or errors in this approach. I haven't deployed this code and have only tested it in limited circumstances.