Testing Dynamic Interaction Deployments

Introduction

There are a number of ways to test your Dynamic Interaction deployments before putting them live in production. These include:

  1. Simulation using assumed take-up rates or historical logs
  2. Running the deployment in parallel in production
  3. Using a network runtime to route a portion of traffic to the new deployment

In this lesson we will outline how to run a deployment in parallel in production.

Configure your Dynamic Interaction deployment

Configure your Dynamic Interaction and Deployment as you normally would. Push the deployment to an endpoint in the environment in which you want to run the test, ideally the production environment.

To test the Deployment, we will call the /invocations API of the Dynamic Interaction Deployment whenever /invocations is called for the existing Deployment, and we will simulate /response calls based on the responses received by the existing deployment.

Calling the invocations API

The /invocations API of the Dynamic Interaction Deployment will be called from the post scoring logic of the existing deployment using the callParallelDeployment method, as shown in this code snippet:

        /** Get top scores and test for explore/exploit randomization */
        predictModelMojoResult = getTopScores(params, predictModelMojoResult);
 
        /** Call parallel deployment if specified in in_params or corpora */
        callParallelDeployment(params);

callParallelDeployment is a method in the PostScoreSuper class, so your post scoring logic will need to extend PostScoreSuper in order to use it.

In order to successfully call the parallel deployment, two parameters need to be provided: the URL of the parallel deployment endpoint and the campaign. These parameters can be provided in two ways:

  1. As part of the params JSONObject in the /invocations API call, by adding parallel_url and parallel_campaign to params.
  2. As an Additional Corpora named parallel_deployment with fields url and campaign (see the example below).

An example params:

{
    "parallel_url":"http://ecosystem-runtime:8091",
    "parallel_campaign":"recommender_dynamic_bayes"
}
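
If the Additional Corpora option is used instead, the parallel_deployment corpora should contain a document with the equivalent fields. A minimal sketch of that document, reusing the values from the params example above (how the corpora is created and loaded depends on your deployment configuration):

{
    "url":"http://ecosystem-runtime:8091",
    "campaign":"recommender_dynamic_bayes"
}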

If both methods are used, the values in the params JSONObject take precedence. By default, the remainder of the /invocations call to the parallel deployment will be the same as for the existing deployment. These defaults can be overridden using the same structure: for example, to set the subcampaign, either add parallel_subcampaign to params or add a subcampaign field to the parallel_deployment Additional Corpora, as shown below.
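
For example, a params JSONObject that also overrides the subcampaign of the parallel call might look like the following (the subcampaign value here is purely illustrative):

{
    "parallel_url":"http://ecosystem-runtime:8091",
    "parallel_campaign":"recommender_dynamic_bayes",
    "parallel_subcampaign":"subcampaign_to_test"
}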

To mitigate the increase in latency from the additional API call, callParallelDeployment is asynchronous and will not delay the execution of the post scoring logic. The additional calls will increase the load on the databases used for logging and lookups to the same degree as a standard Dynamic Interaction deployment.

🔥
Note

As the callParallelDeployment method is asynchronous, it cannot be included in a static post scoring logic class. If your existing deployment uses static post scoring logic, you will need to adjust this when including the callParallelDeployment method.

🔥
Note

callParallelDeployment is available from version 0.9.6.0 of the runtime onwards. If you are using an earlier version, you will need to implement your own asynchronous method to call the parallel deployment.

Calling the response API

As the offers recommended by the Dynamic Interaction deployment are not actually presented to customers, its learning will be an approximation of the learning that would occur in live use. There are a number of ways to simulate offer take-up in the form of /response calls. Here we will use an approach based on the take-up rate of the offers generated by the existing deployment, but other methods may be more appropriate for your use case.

To simulate responses we will set up logic to determine which /response calls to make in one-minute intervals. This scheduling could be done using cron or a pipelining tool; here we use Python both to schedule the check and to make the required calls.

#Import the required packages
from prediction.apis import data_management_engine as dme
from prediction import jwt_access
from runtime.apis import predictor_engine as o
from runtime import access
import time
import getpass
from datetime import datetime, timedelta
 
# Connect to the server
ecosystem_password = getpass.getpass("Enter your ecosystem password")
auth = jwt_access.Authenticate("http://ecosystem-server:3001/api", "user@ecosystem.ai", ecosystem_password)
 
# Connect to the runtime configured with the parallel deployment
auth_runtime = access.Authenticate("http://ecosystem-runtime:8091")
 
# Set up the initial schedule interval
now = datetime.now()
# Set the difference between the notebook and runtime time zones in minutes
time_zone_shift = 120
# Set the scheduling interval in minutes
interval = 1
# The starting point for the interval
now_less_schedule = datetime.strftime(now - timedelta(minutes = time_zone_shift+interval),"%Y-%m-%d %H:%M:%S")
# Loop through the response call check until interrupted
while True:
    # Get an acceptance count for each offer in the interval
    offer_count_list = dme.post_mongo_db_aggregate_pipeline(
        auth,
        {
        # Assumes the standard logging configuration, adjust if required
        "database":"logging","collection":"ecosystemruntime_response"
        ,"pipeline":[
            {"$match":{"$expr":{"$and":[
                {"$eq":["$predictor","offer_recommend_single_model"]}
                ,{"$ne":["$response_log.final_result",[]]}
                ,{"$gte":["$date_log",{"$toDate":now_less_schedule}]}
            ]}}}
            ,{"$unwind":"$response_log.final_result"}
            ,{"$project":{"offer":"$response_log.final_result.result.offer","_id":0}}
            ,{"$group":{"_id":"$offer","count":{"$sum":1}}}
        ]
        }
    )
 
    # Loop through offers accepted and make the corresponding response calls
    for offer_iter in offer_count_list:
        offers_for_response = dme.post_mongo_db_aggregate_pipeline(
            auth,
            {
            # Assumes the standard logging configuration, adjust if required
            "database":"logging","collection":"ecosystemruntime"
            ,"pipeline":[
                # Get the offers made by the dynamic interaction deployment
                {"$match":{"$expr":{"$and":[
                    {"$eq":["$predictor","offer_recommend_dynamic"]}
                    ,{"$ne":["$final_result",[]]}
                    ,{"$gte":["$date_log",{"$toDate":now_less_schedule}]}
                ]}}}
                # Unwind final result to cater for multiple offers in a single response
                ,{"$unwind":"$final_result"}
                # Match the offer being simulated
                ,{"$match":{"$expr":{"$eq":["$final_result.result.offer",offer_iter["_id"]]}}}
                # Sort the offers based on the modified offer score. Here we have added some random noise, this can be removed if not required
                ,{"$addFields":{"randomised_score":{"$add":["$final_result.result.modified_offer_score",{"$divide":[{"$rand":{}},20]}]}}}
                # Sort based on the score and limit to the number of offers accepted. Note that if the number of times the offer has been recommended by the dynamic interaction deployment is less than the number made by the existing deployment, then the result will be less than offer_iter["count"]
                ,{"$sort":{"randomised_score":-1}}
                ,{"$limit":offer_iter["count"]}
                # Structure the output as required by the response API
                ,{"$project":{
                    "uuid":1
                    ,"offer":"$final_result.result.offer"
                    ,"offers_accepted":[{"offer_name":"$final_result.result.offer"}]
                    ,"channel":"simulation"
                    ,"_id":0
                }}
            ]
            }
        )
        # Call the response API for each offer with simulated take up
        for response_iter in offers_for_response:
            o.put_offer_recommendations(auth_runtime, response_iter, " ")
 
    #Set the starting point for the next interval
    now_less_schedule = datetime.strftime(datetime.now() - timedelta(minutes = time_zone_shift),"%Y-%m-%d %H:%M:%S")
    #Wait for the interval duration before checking again
    time.sleep(interval*60)
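
Each document in offers_for_response, as structured by the $project stage above, will be passed to put_offer_recommendations and will look roughly as follows (values are illustrative):

{
    "uuid":"d4c1e2f0-1234-5678-90ab-cdef12345678",
    "offer":"offer_a",
    "offers_accepted":[{"offer_name":"offer_a"}],
    "channel":"simulation"
}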

Explore the results

The calls to the /invocations and /response APIs should now have produced a reasonable approximation of the behaviour of the Dynamic Interaction deployment in the logging collections configured for it. This behaviour can then be explored using the standard Grafana dashboards or by running any other analysis on the logged data, as in the sketch below.
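
As a starting point for such an analysis, the aggregation approach used above can be reused to summarise the simulated take-up per offer for the Dynamic Interaction deployment. The snippet below is a minimal sketch; it assumes the parallel deployment logs to the same logging database and that its predictor is named offer_recommend_dynamic, as in the examples above. Adjust the database, collection and predictor names to match your configuration.

# Summarise simulated take-up per offer for the parallel deployment
from prediction.apis import data_management_engine as dme
from prediction import jwt_access
import getpass
 
# Connect to the server
ecosystem_password = getpass.getpass("Enter your ecosystem password")
auth = jwt_access.Authenticate("http://ecosystem-server:3001/api", "user@ecosystem.ai", ecosystem_password)
 
# Count the simulated /response calls per offer for the dynamic deployment
simulated_take_up = dme.post_mongo_db_aggregate_pipeline(
    auth,
    {
    "database":"logging","collection":"ecosystemruntime_response"
    ,"pipeline":[
        {"$match":{"$expr":{"$and":[
            {"$eq":["$predictor","offer_recommend_dynamic"]}
            ,{"$ne":["$response_log.final_result",[]]}
        ]}}}
        ,{"$unwind":"$response_log.final_result"}
        ,{"$group":{"_id":"$response_log.final_result.result.offer","count":{"$sum":1}}}
    ]
    }
)
print(simulated_take_up)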