Testing of async tornado RequestHandler method in a complex environment

I am trying to write unit testing code for a child of tornado.web.RequestHandler that runs an aggregate query to the database. I have already wasted several days trying to get the tests to work. The tests are using pytest and factoryboy. A lot of the important tornado class have factories for the tests. This is the class that is being tested: class AggregateRequestHandler(StreamlyneRequestHandler): ''' '''

SUPPORTED_METHODS = (
    "GET", "POST", "OPTIONS")


def get(self):
    self.aggregate()


@auth.hmac_auth
#@tornado.web.asynchronous
@tornado.web.removeslash
@tornado.gen.coroutine
def aggregate(self):
    '''
    '''
    self.logger.info('api aggregate')
    data = self.data
    print("Data: {0}".format(data))


    pipeline = data['pipeline']


    self.logger.debug('pipeline : {0}'.format(pipeline))
    self.logger.debug('utc tz : {0}'.format(tz_util.utc))


    # execute pipeline query
    print(self.collection)
    try:
        cursor_future = self.collection.aggregate(pipeline, cursor={})
        print(cursor_future)
        cursor = yield cursor_future
        print("Cursor: {0}".format(cursor))
    except Exception as e:
        print(e)
    documents = yield cursor.to_list(length=None)


    self.logger.debug('results : {0}'.format(documents))


    # process MongoDB JSON extended
    results = json.loads(json_util.dumps(documents))
    pipeline = json.loads(json_util.dumps(pipeline))


    response_data = {
        'pipeline': pipeline,
        'results': results
    }


    self.respond(response_data)

The method used to test it is here:

#@tornado.testing.gen_test
def test_time_inside(self):
    current_time = gen_time()
    past_time =  gen_time() - datetime.timedelta(minutes=20)


    test_query = copy.deepcopy(QUERY)
    oid = ObjectId("53a72de12fb05c0788545ed6")
    test_query[0]['$match']['attribute'] = oid
    test_query[0]['$match']['date_created']['$gte'] = past_time
    test_query[0]['$match']['date_created']['$lte'] = current_time


    request = produce.HTTPRequest(
        method="GET",
        headers=produce.HTTPHeaders(
            kwargs = {
                "Content-Type": "application/json",
                "Accept": "application/json",
                "X-Sl-Organization": "test",
                "Hmac": "83275edec557e2a339e0ec624201db604645e1e1",
                "X-Sl-Username": "[email protected]",
                "X-Sl-Expires": 1602011725
            }
        ),
        uri="/api/v1/attribute-data/aggregate?{0}".format(json_util.dumps({
            "pipeline": test_query
        }))
    )


    self.ARH = produce.AggregateRequestHandler(request=request)


    #io_loop = tornado.ioloop.IOLoop.instance()
    self.io_loop.run_sync(self.ARH.get)


    #def stop_test():
        #self.stop()


    #self.ARH.test_get(stop_test)
    #self.wait()


    output = self.ARH.get_written_output()


    assert output == ""

This is the way I set up the factory for the Request Handler:

class OutputTestAggregateRequestHandler(slapi.rest.AggregateRequestHandler, tornado.testing.AsyncTestCase):
    '''
    '''


    _written_output = []




    def write(self, chunk):
        print("Previously written: {0}".format(self._written_output))
        print("Len: {0}".format(len(self._written_output)))
        if self._finished:
            raise RuntimeError("Cannot write() after finish().  May be caused "
                               "by using async operations without the "
                               "@asynchronous decorator.")
        if isinstance(chunk, dict):
            print("Going to encode a chunk")
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        chunk = escape.utf8(chunk)
        print("Writing")
        self._written_output = []
        self._written_output.append(chunk)
        print(chunk)




    def flush(self, include_footers=False, callback=None):
        pass




    def get_written_output(self):
        for_return = self._written_output
        self._written_output = []
        return for_return




class AggregateRequestHandler(StreamlyneRequestHandler):
    '''
    '''


    class Meta:
        model = OutputTestAggregateRequestHandler


    model = slapi.model.AttributeDatum


When running the tests, the test simply stops in def aggregate(self): somewhere between print(cursor_future) and print("Cursor: {0}".format(cursor)).

The in the stdout you see

MotorCollection(Collection(Database(MongoClient([]), u'test'), u'attribute_datum'))
<tornado.concurrent.Future object at 0x7fbc737993d0>

and nothing else comes out of the test with it failing on

>       assert output == ""
E       AssertionError: assert [] == ''

After a lot of time looking at documentation and examples and stack overflow I managed to get a functioning test by adding the following code to OutputTestAggregateRequestHandler:

def set_io_loop(self):
    self.io_loop = tornado.ioloop.IOLoop.instance()


def ioloop(f):
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        print(args)
        self.set_io_loop()
        return f(self, *args, **kwargs)
    return wrapper


def runTest(self):
    pass



Then copying all of the code from AggregateRequestHandler.aggregate into OutputTestAggregateRequestHandler but with different decorators:

@ioloop
@tornado.testing.gen_test
def _aggregate(self):
    ......

I then received the output:

assert output == ""
E       AssertionError: assert ['{\n    "pipeline": [\n        {\n            "$match": {\n                "attribute": {\n                    "$oid"...                "$oid": "53cec0e72dc9832c4c4185f2"\n            }, \n            "quality": 9001\n        }\n    ]\n}'] == ''

which is actually a success, but I was just triggering an assertion error on purpose to see the output.

The big problem that I have, is how do I achieve the desired outcome, which is the output received by adding the extra code, and copying the aggregate method. Obviously when copying the code out of the aggregate method the test is no longer useful after I make changes to the actual method. How can I get the actual aggregate method to function properly in the tests instead of stopping seemingly when it encounters asynchronous code? Thanks for any help, Cheers! -Liam