// Author: Ricardas Stoma // Company: Kolmisoft // Year: 2021 // About: Script handles fraud protection feature #define _GNU_SOURCE #define SCRIPT_VERSION "1.1" #define SCRIPT_NAME "m2_fraud_protection" #define LIMIT_BEFORE_AC_CHECK 10 #include "m2_functions.c" #include #include "cJSON.h" #include "cJSON.c" void get_users(); void get_call_prices(); void get_active_call_prices(); void check_limits(); void get_server_gmt_offset(); void report_warnings(); void report_limits(); void convert_time_by_tz_offset(char *date, char *buffer, float offset); void format_calldate_to_es(char *calldate, char *new_calldate); size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp); struct MemoryStruct { char *memory; size_t size; }; typedef struct users_struct { int id; double daily_spend_limit; double daily_spend_warning; double spent_today; int kill_calls_in_progress; int limit_reached; int limit_reported; int warning_reached; int warning_reported; char tz[50]; int tz_offset; } users_t; users_t *users = NULL; int users_count = 0; // GLOBAL VARIABLES char current_date[20] = ""; double server_offset = 0; int main(int argc, char *argv[]) { int i = 0; // Starting sript m2_init("Starting M2 Fraud Protection script\n"); m2_get_current_date(current_date); get_server_gmt_offset(); get_elasticsearch_host(); // Get users with fraud protection feature enabled get_users(); if (!users_count) { m2_log("No users with fraud protection enabled, exiting...\n"); exit(0); } // Get total call prices for today get_call_prices(); // Get active call prices get_active_call_prices(); // Check limits check_limits(); m2_log("Users with fraud protection enabled:\n"); for (i = 0; i < users_count; i++) { char notice[256] = "- STATUS: OK"; if (users[i].warning_reached) { strcpy(notice, " - STATUS: WARNING REACHED"); } if (users[i].limit_reached) { strcpy(notice, " - STATUS: LIMIT REACHED"); } if (users[i].limit_reported) { strcpy(notice, " - STATUS: LIMIT ALREADY REPORTED"); } m2_log(" id: %d, daily spend limit: %f, daily spend warning: %f, spent today: %f, timezone: %s (offset %d)%s\n", users[i].id, users[i].daily_spend_limit, users[i].daily_spend_warning, users[i].spent_today, users[i].tz, users[i].tz_offset, notice); } report_warnings(); report_limits(); m2_log("Script completed\n"); return 0; } /* Get users with fraud protection feature enabled */ void get_users() { MYSQL_RES *result; MYSQL_ROW row; char query[2048] = ""; sprintf(query, "SELECT users.id, daily_spend_limit, daily_spend_warning, kill_calls_in_progress, time_zone, offset, daily_spend_limit_reached, daily_spend_warning_reached " "FROM users " "JOIN timezones ON timezones.zone = users.time_zone " "WHERE enforce_daily_limit = 1 AND (daily_spend_limit > 0 || daily_spend_warning > 0)" "ORDER BY users.id"); if (m2_mysql_query(query)) { exit(1); } result = mysql_store_result(&mysql); if (result) { while (( row = mysql_fetch_row(result) )) { users = realloc(users, (users_count + 1) * sizeof(users_t)); memset(&users[users_count], 0, sizeof(users_t)); if (row[0]) users[users_count].id = atoi(row[0]); if (row[1]) users[users_count].daily_spend_limit = atof(row[1]); if (row[2]) users[users_count].daily_spend_warning = atof(row[2]); if (row[3]) users[users_count].kill_calls_in_progress = atof(row[3]); if (row[4]) strcpy(users[users_count].tz, row[4]); if (row[5]) users[users_count].tz_offset = atoi(row[5]); if (row[6]) users[users_count].limit_reported = atoi(row[6]); if (row[7]) users[users_count].warning_reported = atoi(row[7]); users_count++; } mysql_free_result(result); } } /* Get this day call prices for each user */ void get_call_prices() { int i = 0; m2_log("Fetching user call prices\n"); for (i = 0; i < users_count; i++) { CURL *curl_handle; CURLcode res; char current_date_in_user_tz[20] = ""; char json_query[2048] = ""; char http_request_url[400] = ""; struct MemoryStruct chunk; char server_period_start_es[256] = ""; char server_period_end_es[256] = ""; int j = 0; chunk.memory = malloc(1); chunk.size = 0; convert_time_by_tz_offset(current_date, current_date_in_user_tz, users[i].tz_offset); strcpy(server_period_start_es, current_date_in_user_tz); strcpy(server_period_end_es, current_date_in_user_tz); strcpy(server_period_start_es + 10, "T00:00:00"); strcpy(server_period_end_es + 10, "T23:59:59"); if (global_debug) { m2_log("Current user (id: %d) time converted to server time: %s\n", users[i].id, current_date_in_user_tz); m2_log("Fetching calls for user (id: %d) from period %s - %s\n", users[i].id, server_period_start_es, server_period_end_es); } // set url sprintf(http_request_url, "http://%s:9200/m2/calls/_search?search_type=count", elasticsearch_host); // format query sprintf(json_query, "{\"size\":0,\"query\":{\"filtered\":{\"filter\":{\"bool\":{\"must\":[{\"range\":{\"calldate\":{\"gte\":\"%s\",\"lte\":\"%s\"}}},{\"range\":{\"provider_id\":{\"gt\":\"0\"}}},{\"term\":{\"disposition\":\"ANSWERED\"}},{\"term\":{\"user_id\":\"%d\"}}]}}}},\"aggregations\":{\"group_by_user_id\":{\"terms\":{\"field\":\"user_id\",\"size\":0},\"aggregations\":{\"total_billsec\":{\"sum\":{\"field\":\"user_billsec\"}},\"total_price\":{\"sum\":{\"field\":\"user_price\"}}}}}}", server_period_start_es, server_period_end_es, users[i].id); if (global_debug) { m2_log("Elasticsearch query:\n"); m2_log("%s\n", json_query); } curl_global_init(CURL_GLOBAL_ALL); // init the curl session curl_handle = curl_easy_init(); // specify URL to get curl_easy_setopt(curl_handle, CURLOPT_URL, http_request_url); // set body length curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, strlen(json_query)); // send json query curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, json_query); // send all data to this function curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, WriteMemoryCallback); // we pass our 'chunk' struct to the callback function curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&chunk); // some servers don't like requests that are made without a user-agent field, so we provide one curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); // get it! res = curl_easy_perform(curl_handle); // check for errors if (res != CURLE_OK) { m2_log("curl_easy_perform() failed: %s\n", curl_easy_strerror(res)); return; } else { if (global_debug) { m2_log("%lu bytes retrieved from Elasticsearch\n", (long)chunk.size); m2_log("%s\n", chunk.memory); } if (!strlen(chunk.memory)) { m2_log("Elasticsearch response is empty\n"); goto curl_cleanup; } cJSON *root = cJSON_Parse(chunk.memory); if (root == NULL) { m2_log("JSON root is NULL\n"); goto curl_cleanup; } cJSON *hits = cJSON_GetObjectItem(root, "hits"); if (hits == NULL) { m2_log("JSON hits is NULL\n"); cJSON_Delete(root); goto curl_cleanup; } if (cJSON_GetObjectItem(hits, "total") == NULL && cJSON_GetObjectItem(hits, "total")->valueint == 0) { m2_log("Zero hits\n"); cJSON_Delete(root); goto curl_cleanup; } cJSON *aggregations = cJSON_GetObjectItem(root, "aggregations"); if (aggregations == NULL) { m2_log("JSON aggregations is NULL\n"); cJSON_Delete(root); goto curl_cleanup; } cJSON *group_by_user = cJSON_GetObjectItem(aggregations, "group_by_user_id"); if (group_by_user == NULL) { m2_log("JSON group_by_user_id is NULL\n"); cJSON_Delete(root); goto curl_cleanup; } cJSON *user_buckets = cJSON_GetObjectItem(group_by_user, "buckets"); if (user_buckets == NULL) { m2_log("JSON user_buckets is NULL\n"); cJSON_Delete(root); goto curl_cleanup; } for (j = 0; j < cJSON_GetArraySize(user_buckets); j++) { cJSON *subitem = cJSON_GetArrayItem(user_buckets, j); if (subitem) { cJSON *price = cJSON_GetObjectItem(subitem, "total_price"); cJSON *price_value = NULL; if (price) { price_value = cJSON_GetObjectItem(price, "value"); } users[i].spent_today = price_value->valuedouble; } } if (root) cJSON_Delete(root); } curl_cleanup: // cleanup curl stuff curl_easy_cleanup(curl_handle); // free received data free(chunk.memory); // we're done with libcurl, so clean it up curl_global_cleanup(); } } /* Get call prices for active calls */ void get_active_call_prices() { MYSQL_RES *result; MYSQL_ROW row; int i = 0; char query[1024] = ""; for (i = 0; i < users_count; i++) { if (users[i].limit_reported) continue; if (users[i].spent_today + LIMIT_BEFORE_AC_CHECK > users[i].daily_spend_limit) { if (global_debug) { m2_log("Checking additional active calls prices for user: %d\n", users[i].id); } sprintf(query, "SELECT SUM((user_rate * billsec) / 60) AS price " "FROM (SELECT user_rate, TIME_TO_SEC(TIMEDIFF(NOW(), answer_time)) AS billsec FROM activecalls WHERE active = 1 AND user_id = %d) p", users[i].id); if (m2_mysql_query(query)) { exit(1); } result = mysql_store_result(&mysql); if (result) { while (( row = mysql_fetch_row(result) )) { if (row[0]) { if (global_debug) { m2_log("Adding additonal price %f from active calls for user: %d\n", atof(row[0]), users[i].id); } users[i].spent_today += atof(row[0]); } } mysql_free_result(result); } } } } /* Function from http://curl.haxx.se/libcurl/c/getinmemory.html */ size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) { size_t realsize = size * nmemb; struct MemoryStruct *mem = (struct MemoryStruct *)userp; mem->memory = realloc(mem->memory, mem->size + realsize + 1); if (mem->memory == NULL) { // out of memory! m2_log("Not enough memory (realloc returned NULL)\n"); return 0; } memcpy(&(mem->memory[mem->size]), contents, realsize); mem->size += realsize; mem->memory[mem->size] = 0; return realsize; } /* Check limits */ void check_limits() { int i = 0; for (i = 0; i < users_count; i++) { if (users[i].daily_spend_limit > 0 && users[i].spent_today >= users[i].daily_spend_limit) { users[i].limit_reached = 1; } if (users[i].daily_spend_warning > 0 && users[i].spent_today >= users[i].daily_spend_warning) { users[i].warning_reached = 1; } } } /* Convert time by timezone offset */ void convert_time_by_tz_offset(char *date, char *buffer, float offset) { time_t converted_time; struct tm converted_time_tm, server_ptm; // adjust period_start according to server time memset(&converted_time_tm, 0, sizeof(struct tm)); strptime(date, DATE_FORMAT, &converted_time_tm); converted_time = mktime(&converted_time_tm) - (time_t)round((server_offset - offset)); gmtime_r(&converted_time, &server_ptm); strftime(buffer, 20, DATE_FORMAT, &server_ptm); } /* Get server GMT offset in decimal value */ void get_server_gmt_offset() { // get server gmt offset time_t t = time(NULL); struct tm lt = {0}; localtime_r(&t, <); server_offset = lt.tm_gmtoff / 60.0 / 60.0; m2_log("Server GMT offset: %0.2f\n", server_offset); } /* Report warnings */ void report_warnings() { int i = 0; char query[1024] = ""; for (i = 0; i < users_count; i++) { if (users[i].warning_reached && users[i].warning_reported == 0) { sprintf(query, "UPDATE users SET daily_spend_warning_reached = 1, changes_present = 1 WHERE id = %d", users[i].id); if (m2_mysql_query(query)) { exit(1); } } } } /* Report limits */ void report_limits() { int i = 0; char query[1024] = ""; for (i = 0; i < users_count; i++) { if (users[i].limit_reached && users[i].limit_reported == 0) { sprintf(query, "UPDATE users SET daily_spend_limit_reached = 1, changes_present = 1 WHERE id = %d", users[i].id); if (m2_mysql_query(query)) { exit(1); } } } }