Testing of async tornado RequestHandler method in a complex environment

I am trying to write unit testing code for a child of tornado.web.RequestHandler that runs an aggregate query to the database. I have already wasted several days trying to get the tests to work. The tests are using pytest and factoryboy. A lot of the important tornado class have factories for the tests. This is the class that is being tested: class AggregateRequestHandler(StreamlyneRequestHandler): ''' '''

    "GET", "POST", "OPTIONS")

def get(self):

def aggregate(self):
    self.logger.info('api aggregate')
    data = self.data
    print("Data: {0}".format(data))

    pipeline = data['pipeline']

    self.logger.debug('pipeline : {0}'.format(pipeline))
    self.logger.debug('utc tz : {0}'.format(tz_util.utc))

    # execute pipeline query
        cursor_future = self.collection.aggregate(pipeline, cursor={})
        cursor = yield cursor_future
        print("Cursor: {0}".format(cursor))
    except Exception as e:
    documents = yield cursor.to_list(length=None)

    self.logger.debug('results : {0}'.format(documents))

    # process MongoDB JSON extended
    results = json.loads(json_util.dumps(documents))
    pipeline = json.loads(json_util.dumps(pipeline))

    response_data = {
        'pipeline': pipeline,
        'results': results


The method used to test it is here:

def test_time_inside(self):
    current_time = gen_time()
    past_time =  gen_time() - datetime.timedelta(minutes=20)

    test_query = copy.deepcopy(QUERY)
    oid = ObjectId("53a72de12fb05c0788545ed6")
    test_query[0]['$match']['attribute'] = oid
    test_query[0]['$match']['date_created']['$gte'] = past_time
    test_query[0]['$match']['date_created']['$lte'] = current_time

    request = produce.HTTPRequest(
            kwargs = {
                "Content-Type": "application/json",
                "Accept": "application/json",
                "X-Sl-Organization": "test",
                "Hmac": "83275edec557e2a339e0ec624201db604645e1e1",
                "X-Sl-Username": "[email protected]",
                "X-Sl-Expires": 1602011725
            "pipeline": test_query

    self.ARH = produce.AggregateRequestHandler(request=request)

    #io_loop = tornado.ioloop.IOLoop.instance()

    #def stop_test():


    output = self.ARH.get_written_output()

    assert output == ""

This is the way I set up the factory for the Request Handler:

class OutputTestAggregateRequestHandler(slapi.rest.AggregateRequestHandler, tornado.testing.AsyncTestCase):

    _written_output = []

    def write(self, chunk):
        print("Previously written: {0}".format(self._written_output))
        print("Len: {0}".format(len(self._written_output)))
        if self._finished:
            raise RuntimeError("Cannot write() after finish().  May be caused "
                               "by using async operations without the "
                               "@asynchronous decorator.")
        if isinstance(chunk, dict):
            print("Going to encode a chunk")
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        chunk = escape.utf8(chunk)
        self._written_output = []

    def flush(self, include_footers=False, callback=None):

    def get_written_output(self):
        for_return = self._written_output
        self._written_output = []
        return for_return

class AggregateRequestHandler(StreamlyneRequestHandler):

    class Meta:
        model = OutputTestAggregateRequestHandler

    model = slapi.model.AttributeDatum

When running the tests, the test simply stops in def aggregate(self): somewhere between print(cursor_future) and print("Cursor: {0}".format(cursor)).

The in the stdout you see

MotorCollection(Collection(Database(MongoClient([]), u'test'), u'attribute_datum'))
<tornado.concurrent.Future object at 0x7fbc737993d0>

and nothing else comes out of the test with it failing on

>       assert output == ""
E       AssertionError: assert [] == ''

After a lot of time looking at documentation and examples and stack overflow I managed to get a functioning test by adding the following code to OutputTestAggregateRequestHandler:

def set_io_loop(self):
    self.io_loop = tornado.ioloop.IOLoop.instance()

def ioloop(f):
    def wrapper(self, *args, **kwargs):
        return f(self, *args, **kwargs)
    return wrapper

def runTest(self):

Then copying all of the code from AggregateRequestHandler.aggregate into OutputTestAggregateRequestHandler but with different decorators:

def _aggregate(self):

I then received the output:

assert output == ""
E       AssertionError: assert ['{\n    "pipeline": [\n        {\n            "$match": {\n                "attribute": {\n                    "$oid"...                "$oid": "53cec0e72dc9832c4c4185f2"\n            }, \n            "quality": 9001\n        }\n    ]\n}'] == ''

which is actually a success, but I was just triggering an assertion error on purpose to see the output.

The big problem that I have, is how do I achieve the desired outcome, which is the output received by adding the extra code, and copying the aggregate method. Obviously when copying the code out of the aggregate method the test is no longer useful after I make changes to the actual method. How can I get the actual aggregate method to function properly in the tests instead of stopping seemingly when it encounters asynchronous code? Thanks for any help, Cheers! -Liam