// Author: Ricardas Stoma // Company: Kolmisoft // Year: 2013 // About: Script recalculates user's cdr and balance #define SCRIPT_VERSION "1.27" #define SCRIPT_NAME "m2_cdr_rerate" #define SQL_BATCH_SIZE 100 #define BUFFER_SIZE SQL_BATCH_SIZE * 360 + 600 #define RERATE_BACTHES 10000 #define PROGRESS_TIMER 10 #define M2_SQL_CONNECTIONS 2 #define ES_UPDATE_FILE_PATH "/tmp/m2_elasticsearch_bulk_update_query" #include "m2_functions.c" #include "m2_cdr_rerate.h" /* Main function */ int main(int argc, char *argv[]) { // check if debug is ON if (argc > 1) { if (strcmp(argv[1], "--debug") == 0) { DEBUG_RERATE = 1; } } int i = 0; // mark start time gettimeofday(&t0, NULL); gettimeofday(&_t0, NULL); _t = _t0.tv_sec; _ut0 = _t0.tv_usec; t = _t0.tv_sec; ut0 = _t0.tv_usec; m2_init("Starting M2 CDR Rerate script\n"); // mark task as failed on segmentation fault struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); sigemptyset(&sa.sa_mask); sa.sa_sigaction = error_handle; sa.sa_flags = SA_SIGINFO; sigaction(SIGSEGV, &sa, NULL); sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL); atexit(error_handle); get_elasticsearch_host(); // create empty es bulk update file system("echo '' > " ES_UPDATE_FILE_PATH); // we mark task as failed in case script will be interrupted // at the end of the script we will mark it as completed task_failed = 1; if (m2_task_get(1, NULL, &user_id, date_from, date_till, rerate_supplier_str, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) return 1; if (strlen(rerate_supplier_str)) { rerate_supplier = atoi(rerate_supplier_str); } m2_log("Rerate for: %s\n", rerate_supplier == 1 ? "supplier" : "client"); if (task_id) { get_calls_count(); if (total_calls) { m2_log("Total Calls retrieved: %lli\n", total_calls); } else { m2_log("No calls found\n"); // task did not failed task_failed = 0; if (!DEBUG_RERATE) { m2_task_finish(); } return 0; } if (DEBUG_RERATE == 0) m2_task_lock(); // create threads as 'detached' pthread_attr_t tattr; pthread_attr_init(&tattr); pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); m2_log("Starting rerating\n"); // enable progress timer pthread_t timer; pthread_create(&timer, &tattr, set_timer, NULL); for (i = 0; i < rerate_batches; i++) { m2_log("Getting calls from id: %lld LIMIT %d\n", last_call_id, RERATE_BACTHES); if (calls_get(i)) { calls_rerate(); } call_list_free(); reset_globals(); } // calculate deltas calculate_user_balance_diff(); // update user balance update_user_balance(); m2_log("--------------------------------------------------------------------------------\n"); m2_log("TOTAL CALLS: %lld\n", total_calls); m2_log("RERATED CALLS: %lld\n\n", rerate_supplier == 1 ? provider_diff : user_diff); m2_log("-------------------------------------------------------\n"); m2_log(" PRICE | OLD | NEW \n"); m2_log("-------------------------------------------------------\n"); if (rerate_supplier != 1) { m2_log(" Originator | %9.3f | %9.3f \n", old_user_price, new_user_price); } else { m2_log(" Terminator | %9.3f | %9.3f \n", old_provider_price, new_provider_price); } m2_log("-------------------------------------------------------\n\n"); m2_log("-------------------------------------------------------\n"); m2_log(" BILLSEC | OLD | NEW \n"); m2_log("-------------------------------------------------------\n"); if (rerate_supplier != 1) { m2_log(" Originator | %9lld | %9lld \n", old_user_billsec, new_user_billsec); } else { m2_log(" Terminator | %9lld | %9lld \n", old_provider_billsec, new_provider_billsec); } m2_log("-------------------------------------------------------\n\n"); m2_log("-------------------------------------------------------\n"); m2_log(" DELTA | PRICE DELTA | BILLSEC DELTA \n"); m2_log("-------------------------------------------------------\n"); if (rerate_supplier != 1) { m2_log(" Originator | %9.3f | %7lld \n", user_delta_price, new_user_billsec - old_user_billsec); } else { m2_log(" Terminator | %9.3f | %7lld \n", provider_delta_price, new_provider_billsec - old_provider_billsec); } m2_log("-------------------------------------------------------\n\n"); // terminate progress timer pthread_cancel(timer); pthread_attr_destroy(&tattr); if (DEBUG_RERATE == 0) m2_task_finish(); } m2_log("Closed connection to DB\n"); // mark end time gettimeofday(&t1, NULL); tt = t1.tv_sec; ut1 = t1.tv_usec; // calculate script runtime and rerate speed if (ut0 > ut1) { ut1 += 1000000; tt -= 1; } double duration = (int)(tt-t) + (double)(ut1-ut0)/1000000; if (total_calls > 0) { m2_log("Script run time: %f sec, [%f s/call]\n", duration, duration/total_calls); if (DEBUG_RERATE) printf("Script run time: %f sec. [%f s/call]\n", duration, duration/total_calls); } if (user_balance) { free(user_balance); user_balance = NULL; } // script ended m2_log("M2 CDR Rerate script completed\n"); // task did not failed task_failed = 0; return 0; } /* Functions */ /* rerates all calls */ void calls_rerate() { int counter = 0; call_data *current = NULL; if (rerate_supplier == 1) { strcat(update_query, update_supplier_query_beginning); } else { strcat(update_query, update_client_query_beginning); } current = call_data_start; while (current != NULL) { get_rate_details(current); current = current->next; counter++; if (counter > RERATE_BACTHES) { m2_log("Loop detected while fetching rates!\n"); exit(1); } } // reset counter counter = 0; current = call_data_start; while (current != NULL) { // rerate user part of cdr if (current->user_rate_found) { calculate_call_price(current, 1); } // rerate provider part of cdr if (current->provider_rate_found) { calculate_call_price(current, 2); } current = current->next; counter++; if (counter > RERATE_BACTHES) { m2_log("Loop detected while fetching rates!\n"); exit(1); } } // reset counter counter = 0; // find rerated calls current = call_data_start; while (current != NULL) { int user_need_update = 0; int provider_need_update = 0; m2_round(¤t->user_new_price, current->digits_used_for_price, current->digits_rounding_method); m2_round(¤t->provider_new_price, current->provider_digits_used_for_price, current->provider_digits_rounding_method); // check user price // check difference, because there may be very small rouding errors if (fabs(current->user_price - current->user_new_price) > 0.000001 && !(current->user_price == 0 && current->user_new_price == 0)) { user_need_update = 1; } else { // there may be a very small error due to rounding // so we just assing one to another so that they would be equal current->user_new_price = current->user_price; } // check user rate if (fabs(current->user_rate - current->user_new_rate) > 0.000001 && !(current->user_rate == 0 && current->user_new_rate == 0)) { user_need_update = 1; } // check user billsec if (current->user_billsec != current->user_new_billsec && !(current->user_billsec == 0 && current->user_new_billsec == 0)) { user_need_update = 1; } // check provider price // check difference, because there may be very small rouding errors if (fabs(current->provider_price - current->provider_new_price) > 0.000001 && !(current->provider_price == 0 && current->provider_new_price == 0)) { provider_need_update = 1; } else { // there may be a very small error due to rounding // so we just assing one to another so that they would be equal current->provider_new_price = current->provider_price; } // check provider rate if (fabs(current->provider_rate - current->provider_new_rate) > 0.000001 && !(current->provider_rate == 0 && current->provider_new_rate == 0)) { provider_need_update = 1; } // check provider billsec if (current->provider_billsec != current->provider_new_billsec && !(current->provider_billsec == 0 && current->provider_new_billsec == 0)) { provider_need_update = 1; } // check prefix if (strcmp(current->prefix, current->new_prefix) != 0) { user_need_update = 1; provider_need_update = 1; } if (user_need_update) { user_diff++; } if (provider_need_update) { provider_diff++; } if (DEBUG_RERATE == 0 && (user_need_update || provider_need_update)) { double uprice = current->user_price; double urate = current->user_rate; int ubillsec = current->user_billsec; double pprice = current->provider_price; double prate = current->provider_rate; int pbillsec = current->provider_billsec; if (user_need_update) { uprice = current->user_new_price; ubillsec = current->user_new_billsec; urate = current->user_new_rate; } if (provider_need_update) { pprice = current->provider_new_price; pbillsec = current->provider_new_billsec; prate = current->provider_new_rate; } if (rerate_supplier == 1) { sprintf(buffer, "(%lld,'%s',%f,%f,%d,'%s'),", current->call_id, current->calldate, pprice, prate, pbillsec, current->new_prefix); } else { sprintf(buffer, "(%lld,'%s',%f,%f,%d,'%s'),", current->call_id, current->calldate, uprice, urate, ubillsec, current->new_prefix); } strcat(update_query, buffer); char es_buffer[1024] = ""; FILE *fp = fopen(ES_UPDATE_FILE_PATH, "a+"); if (fp) { // format elasticsearch update query if (rerate_supplier == 1) { sprintf(es_buffer, "{ \"update\" : {\"_id\" : \"%lld\", \"_type\" : \"calls\", \"_index\" : \"m2\"} }\n" "{ \"doc\" : {\"provider_price\" : %f, \"provider_billsec\" : %d, \"prefix\" : \"%s\", \"provider_rate\" : %f} }\n", current->call_id, pprice, pbillsec, current->new_prefix, prate); } else { sprintf(es_buffer, "{ \"update\" : {\"_id\" : \"%lld\", \"_type\" : \"calls\", \"_index\" : \"m2\"} }\n" "{ \"doc\" : {\"user_price\" : %f, \"user_billsec\" : %d, \"prefix\" : \"%s\", \"user_rate\" : %f} }\n", current->call_id, uprice, ubillsec, current->new_prefix, urate); } // add query to file fprintf(fp, "%s", es_buffer); fclose(fp); } batch_counter++; } if (DEBUG_RERATE == 0 && (batch_counter == SQL_BATCH_SIZE)) { batch_counter = 0; // reset batch counter update_record(); // elasticsearch bulk update char system_cmd[1024] = ""; sprintf(system_cmd, "curl --connect-timeout 5 -s -XPOST \"%s:9200/_bulk\" --data-binary @" ES_UPDATE_FILE_PATH " &> /dev/null", elasticsearch_host); system(system_cmd); // clear file system("echo '' > " ES_UPDATE_FILE_PATH); } rerated_calls++; current = current->next; counter++; if (counter > RERATE_BACTHES) { m2_log("Loop detected while fetching rates!\n"); exit(1); } } // reset counter counter = 0; // sum new prices and billsec current = call_data_start; while (current != NULL) { int user_balance_id = current->user_id; double user_balance_price = current->user_new_price; if (rerate_supplier == 1) { user_balance_id = current->provider_id; user_balance_price = current->provider_new_price; } // calculate new user price for single user int user_balance_index = get_user_balance_index(user_balance_id); if (user_balance_index > -1) { update_user_new_balance(user_balance_index, user_balance_price); } new_user_price += current->user_new_price; new_user_billsec += current->user_new_billsec; new_provider_price += current->provider_new_price; new_provider_billsec += current->provider_new_billsec; current = current->next; counter++; if (counter > RERATE_BACTHES) { m2_log("Loop detected while fetching rates!\n"); exit(1); } } user_delta_price = new_user_price - old_user_price; provider_delta_price = new_provider_price - old_provider_price; // update last calls if (DEBUG_RERATE == 0 && (strlen(buffer) > 0)) { update_record(); // elasticsearch bulk update char system_cmd[1024] = ""; sprintf(system_cmd, "curl --connect-timeout 5 -s -XPOST \"%s:9200/_bulk\" --data-binary @" ES_UPDATE_FILE_PATH " &> /dev/null", elasticsearch_host); system(system_cmd); // clear file system("echo '' > " ES_UPDATE_FILE_PATH); } } /* free memory allocated for call_list dynamic list */ void call_list_free() { long long int i = 0; call_data *current, *next_node; current = call_data_start; while (current != NULL) { next_node = current->next; free (current); current = next_node; i++; } m2_log("Call ID Nodes freed: %lli\n", i); } /* get calls's ids from db and put them into dynamic list */ int calls_get(int i) { MYSQL_RES *result; MYSQL_ROW row; char sqlcmd[2048] = ""; char user_sql[64] = ""; char sign[64] = ">"; int connection = 0; long long int retrieved_calls = 0; call_data *node = NULL, *end_node = NULL; if (user_id > -1) { /* SQL to select calls for particular user */ if (rerate_supplier == 1) { sprintf(user_sql, "calls.provider_id = %d AND", user_id); } else { sprintf(user_sql, "calls.user_id = %d AND", user_id); } } if (first_iteration) { strcpy(sign, ">="); first_iteration = 0; last_call_id = 0; } sprintf(sqlcmd, "SELECT calls.id, user_price, devices.op_tariff_id, calldate, billsec, user_billsec, user_rate, prefix, " "calls.user_id, localized_dst, devices.grace_time, currencies.exchange_rate, provider_id, provider_price, dst_device.tp_tariff_id, provider_billsec, provider_rate, " "provider_currencies.exchange_rate, dst_device.grace_time, users.digits_used_for_price, users.digits_rounding_method, " "tp_user.digits_used_for_price, tp_user.digits_rounding_method " "FROM calls " "JOIN users ON users.id = calls.user_id " "JOIN devices ON devices.id = calls.src_device_id " "JOIN devices AS dst_device ON dst_device.id = calls.dst_device_id " "JOIN tariffs AS tariffs ON tariffs.id = devices.op_tariff_id " "JOIN tariffs AS provider_tariffs ON provider_tariffs.id = dst_device.tp_tariff_id " "LEFT JOIN users AS tp_user ON tp_user.id = calls.provider_id " "LEFT JOIN currencies ON (currencies.name = tariffs.currency) " "LEFT JOIN currencies AS provider_currencies ON (provider_currencies.name = provider_tariffs.currency) " "WHERE %s calldate BETWEEN '%s' AND '%s' " "AND calls.id %s %lld AND provider_id > 0 " "AND real_billsec > 0 ORDER BY id LIMIT %d", user_sql, date_from, date_till, sign, last_call_id, RERATE_BACTHES); m2_log("%s\n", sqlcmd); if (m2_mysql_query_multi(sqlcmd, &connection)) { return 0; } else { result = mysql_store_result(&mysql_multi[connection]); mysql_connections[connection] = 0; if (result) { // there are rows while ((row = mysql_fetch_row(result))) { if (row[0]) { // form dynamic list node = (call_data *) malloc(sizeof(call_data)); memset(node, 0, sizeof(call_data)); if (call_data_start == NULL) { call_data_start = node; } if (row[0]) node->call_id = atoll(row[0]); if (row[1]) node->user_price = atof(row[1]); if (row[1]) node->user_new_price = atof(row[1]); if (row[2]) node->user_tariff = atoi(row[2]); if (row[3]) strcpy(node->calldate, row[3]); if (row[4]) node->billsec = atoi(row[4]); if (row[5]) node->user_billsec = atoi(row[5]); if (row[5]) node->user_new_billsec = atoi(row[5]); if (row[6]) node->user_rate = atof(row[6]); if (row[7]) strcpy(node->prefix, row[7]); if (row[7]) strcpy(node->new_prefix, row[7]); if (row[8]) node->user_id = atoi(row[8]); if (row[9]) strcpy(node->dst, row[9]); if (row[10]) node->user_grace_time = atoi(row[10]); else node->user_grace_time = 0; if (row[11]) node->user_exchange_rate = atof(row[11]); else node->user_exchange_rate = 1; if (row[12]) node->provider_id = atoi(row[12]); if (row[13]) node->provider_price = atof(row[13]); if (row[13]) node->provider_new_price = atof(row[13]); if (row[14]) node->provider_tariff = atoi(row[14]); if (row[15]) node->provider_billsec = atoi(row[15]); if (row[15]) node->provider_new_billsec = atoi(row[15]); if (row[16]) node->provider_rate = atof(row[16]); if (row[17]) node->provider_exchange_rate = atof(row[17]); if (row[18]) node->provider_grace_time = atoi(row[18]); if (row[19]) node->digits_used_for_price = atoi(row[19]); if (row[20]) strcpy(node->digits_rounding_method, row[20]); if (row[21]) node->provider_digits_used_for_price = atoi(row[21]); if (row[22]) strcpy(node->provider_digits_rounding_method, row[22]); // fix broken exchange rates if (node->user_exchange_rate == 0) node->user_exchange_rate = 1; if (node->provider_exchange_rate == 0) node->provider_exchange_rate = 1; int user_balance_id = node->user_id; double user_balance_price = node->user_price; if (rerate_supplier == 1) { user_balance_id = node->provider_id; user_balance_price = node->provider_price; } // calculate user price for single user int user_balance_index = get_user_balance_index(user_balance_id); if (user_balance_index > -1) { update_user_old_balance(user_balance_index, user_balance_price); } else { add_user_balance(user_balance_id, user_balance_price); } // calculate total user and provider prices and billsec old_user_price += node->user_price; old_user_billsec += node->user_billsec; old_provider_price += node->provider_price; old_provider_billsec += node->provider_billsec; node->next = NULL; if (end_node) { end_node->next = node; } end_node = node; } retrieved_calls++; } if (node) { last_call_id = node->call_id; } else { last_call_id += RERATE_BACTHES; } mysql_free_result(result); } } m2_log("Total Calls retrieved for batch (%d): %lli\n", i, retrieved_calls); m2_log("Memory taken: %lli bytes (%lu/node)\n", (long long int)sizeof(call_data)*retrieved_calls, (long unsigned int)sizeof(call_data)); return retrieved_calls; } void calculate_expected_time(char *datetime, int seconds) { time_t t; struct tm tmp; char tmp_str[100]; t = time(NULL) + seconds; localtime_r(&t, &tmp); strftime(tmp_str, sizeof(tmp_str), DATE_FORMAT, &tmp); strcpy(datetime, tmp_str); } int is_date(char *date) { int i = 0; if (strlen(date) != 19) return 0; for (i = 0; i < 19; i++) { if (i == 4 || i == 7 || i == 10 || i == 13 || i == 16) continue; if (date[i] > '9' || date[i] < '0') { return 0; } } return 1; } // timer periodically updates current progress void *set_timer() { char sqlcmd[1024] = ""; long int count = 1; long int time_left; double calls_per_sec; double progress_percent; char datetime[100]; int connection = 0; printf("--------------------------------------------------------------------------------\n"); printf(" Progress | Completed | Calls per sec | Time left | Expected to finish at \n"); printf("--------------------------------------------------------------------------------\n"); while (1) { long long int progress_rerated_calls = rerated_calls; if (progress_rerated_calls == 0) progress_rerated_calls = 1; sleep(PROGRESS_TIMER); progress_percent = (double)((double)progress_rerated_calls/total_calls)*100; calls_per_sec = (double)progress_rerated_calls/count/PROGRESS_TIMER; if (calls_per_sec > 0) { time_left = ceil((double)(total_calls - progress_rerated_calls) / calls_per_sec); } else { time_left = 1; } calculate_expected_time(datetime, time_left); if (!is_date(datetime)) { strcpy(datetime, "0000-00-00 00:00:00"); } sprintf(sqlcmd, "UPDATE background_tasks SET status = 2, percent_completed = %.3f, expected_to_finish_at = '%s', updated_at = NOW() WHERE id = %i", progress_percent, datetime, task_id); if (m2_mysql_query_multi(sqlcmd, &connection)) { m2_log("Query failed: %s\n", sqlcmd); exit(1); } mysql_connections[connection] = 0; printf(" %.5lld/%.5lld | %6.2f %% | %9.2f | %4ld sec | %s\n", progress_rerated_calls, total_calls, progress_percent, calls_per_sec, time_left, datetime); count += PROGRESS_TIMER; } pthread_exit(NULL); } void update_record() { update_query[strlen(update_query) - 1] = '\0'; // remove last comma separator if (rerate_supplier) { strcat(update_query, update_supplier_query_ending); } else { strcat(update_query, update_client_query_ending); } int connection = 0; if (m2_mysql_query_multi(update_query, &connection)) { exit(1); } mysql_connections[connection] = 0; memset(update_query, 0, sizeof(update_query)); memset(buffer, 0, sizeof(buffer)); if (rerate_supplier == 1) { strcat(update_query, update_supplier_query_beginning); } else { strcat(update_query, update_client_query_beginning); } } // function that handles segmentation fault and regular returns void error_handle() { static int marked = 0; if (marked == 0) { if (task_failed && DEBUG_RERATE == 0) { m2_task_unlock(4); // mark task as failed } marked = 1; } exit(1); } void get_rate_details(call_data *node) { MYSQL_RES *result; MYSQL_ROW row; char prefix_sql_line[9000] = ""; char calldate[20] = ""; int connection = 0; char sqlcmd[10000] = ""; struct tm tmm; char daytype[3] = "WD"; char call_time[9] = "00:00:00"; // check if call was made during a free day or a work day strptime(node->calldate, DATE_FORMAT, &tmm); if (tmm.tm_wday == 0 || tmm.tm_wday == 6) { strcpy(daytype, "FD"); } strcpy(call_time, node->calldate + 11); strcpy(calldate, node->calldate); int tariff_id; double *rate; int *increment; int *min_time; double *connection_fee; char *prefix = node->new_prefix; int *rate_found; double *exchange_rate; if (rerate_supplier != 1) { tariff_id = node->user_tariff; rate = &node->user_new_rate; increment = &node->user_increment; min_time = &node->user_min_time; connection_fee = &node->user_connection_fee; rate_found = &node->user_rate_found; exchange_rate = &node->user_exchange_rate; } else { tariff_id = node->provider_tariff; rate = &node->provider_new_rate; increment = &node->provider_increment; min_time = &node->provider_min_time; connection_fee = &node->provider_connection_fee; rate_found = &node->provider_rate_found; exchange_rate = &node->provider_exchange_rate; } format_prefix_sql(prefix_sql_line, sizeof(prefix_sql_line), node->dst); sprintf(sqlcmd, "SELECT rates.prefix, ratedetails.rate, ratedetails.increment_s, ratedetails.min_time, ratedetails.connection_fee as 'cf' " "FROM rates " "JOIN ratedetails ON (ratedetails.rate_id = rates.id AND (ratedetails.daytype = '%s' OR ratedetails.daytype = '') AND '%s' BETWEEN ratedetails.start_time AND ratedetails.end_time) " "WHERE rates.tariff_id = %i AND rates.prefix IN (%s) AND (rates.effective_from <= '%s' OR rates.effective_from IS NULL) " "ORDER BY LENGTH(rates.prefix) DESC, rates.effective_from DESC " "LIMIT 1", daytype, call_time, tariff_id, prefix_sql_line, calldate); if (m2_mysql_query_multi(sqlcmd, &connection)) { exit(1); } else { result = mysql_store_result(&mysql_multi[connection]); mysql_connections[connection] = 0; } if (result) { while (( row = mysql_fetch_row(result) )) { if (row[0]) { if (rerate_supplier != 1) { strcpy(prefix, row[0]); } else { if (strlen(row[0]) > strlen(prefix)) { strcpy(prefix, row[0]); } } } if (row[1]) *rate = atof(row[1]); else *rate = 0; if (row[2]) *increment = atoi(row[2]); else *increment = 1; if (row[3]) *min_time = atoi(row[3]); else *min_time = 0; if (row[4]) *connection_fee = atof(row[4]); else *connection_fee = 0; *rate = *rate / *exchange_rate; *rate_found = 1; } mysql_free_result(result); } } void calculate_call_price(call_data *node, int user) { double rate; int min_time; int increment; double connection_fee; int *call_billsec; double *call_price; int billsec = node->billsec; // check grace time if (user == 1) { if (node->user_grace_time > 0) { if (node->user_grace_time >= billsec) { billsec = 0; } } } else { if (node->provider_grace_time > 0) { if (node->provider_grace_time >= billsec) { billsec = 0; } } } if (user == 1) { rate = node->user_new_rate; min_time = node->user_min_time; increment = node->user_increment; connection_fee = node->user_connection_fee; call_billsec = &node->user_new_billsec; call_price = &node->user_new_price;; } else { rate = node->provider_new_rate; min_time = node->provider_min_time; increment = node->provider_increment; connection_fee = node->provider_connection_fee; call_billsec = &node->provider_new_billsec; call_price = &node->provider_new_price; } // possible error fixing if (increment < 1) increment = 1; // count seconds for user wholesale if (!(billsec % increment)) { *call_billsec = ceilf(billsec / increment) * increment; } else { *call_billsec = (ceilf(billsec / increment) + 1) * increment; } if ((min_time) && (*call_billsec < min_time)) { *call_billsec = min_time; } *call_price = (rate * *call_billsec) / 60; *call_price += connection_fee; } void format_prefix_sql(char *prefixes, int prefixes_len, const char *number) { char buffer[256] = ""; int i; memset(buffer, '\0', sizeof(buffer)); memset(prefixes, '\0', prefixes_len); for(i = 0; i < strlen(number); i++) { strcat(prefixes, "'"); strncpy(buffer, number, i + 1); strcat(prefixes, buffer); strcat(prefixes, "'"); if( i < (strlen(number) - 1)) strcat(prefixes, ","); memset(buffer, '\0', sizeof(buffer)); } } void get_calls_count() { MYSQL_RES *result; MYSQL_ROW row; int connection = 0; char sqlcmd[2048] = ""; char user_sql[512] = ""; if (user_id > -1) { // SQL to select calls for particular user if (rerate_supplier == 1) { sprintf(user_sql, "calls.provider_id = %d AND", user_id); } else { sprintf(user_sql, "calls.user_id = %d AND", user_id); } } m2_log("Calculating total calls\n"); sprintf(sqlcmd, "SELECT count(calls.id), min(calls.id) " "FROM calls " "WHERE %s calldate BETWEEN '%s' AND '%s' AND provider_id > 0 AND real_billsec > 0", user_sql, date_from, date_till); m2_log("%s\n", sqlcmd); if (m2_mysql_query_multi(sqlcmd, &connection)) { exit(1); } result = mysql_store_result(&mysql_multi[connection]); mysql_connections[connection] = 0; if (result) { while (( row = mysql_fetch_row(result) )) { if (row[0]) total_calls = atoll(row[0]); if (row[1]) last_call_id = atoll(row[1]); } mysql_free_result(result); } rerate_batches = ceil((total_calls / (float)RERATE_BACTHES)); } void reset_globals() { batch_counter = 0; memset(buffer, 0, 4096); memset(update_query, 0, BUFFER_SIZE); call_data_start = NULL; } int get_user_balance_index(int user_id) { int index = -1; int i; for (i = 0; i < user_balance_count; i++) { if (user_balance[i].user_id == user_id) return i; } return index; } void add_user_balance(int user_id, double user_price) { user_balance = realloc(user_balance, (user_balance_count + 1) * sizeof(user_balance_t)); user_balance[user_balance_count].user_id = user_id; user_balance[user_balance_count].new_price = 0; user_balance[user_balance_count].old_price = user_price; user_balance[user_balance_count].diff_price = 0.0; user_balance_count++; } void update_user_old_balance(int index, double user_price) { user_balance[index].old_price += user_price; } void update_user_new_balance(int index, double user_price) { user_balance[index].new_price += user_price; } void calculate_user_balance_diff() { int i; for (i = 0; i < user_balance_count; i++) { user_balance[i].diff_price = user_balance[i].old_price - user_balance[i].new_price; } } void update_user_balance() { int connection = 0; char sqlcmd[1024] = ""; int i; int updated = 0; m2_log("Updating user balance. Prices will be calculated for period: %s - %s\n", date_from, date_till); for (i = 0; i < user_balance_count; i++) { if (fabs(user_balance[i].diff_price) > 0.000001) { updated = 1; if (user_balance[i].diff_price > 0) { m2_log("user_id: %d, old_price, %f, new_price: %f, balance will be %s by %f\n", user_balance[i].user_id, user_balance[i].old_price, user_balance[i].new_price, rerate_supplier == 1 ? "decreased" : "increased", user_balance[i].diff_price); } else { m2_log("user_id: %d, old_price, %f, new_price: %f, balance will be %s by %f\n", user_balance[i].user_id, user_balance[i].old_price, user_balance[i].new_price, rerate_supplier != 1 ? "decreased" : "increased", -1 * user_balance[i].diff_price); } if (DEBUG_RERATE == 0) { if (rerate_supplier == 1) { sprintf(sqlcmd,"UPDATE users SET balance = balance - %f WHERE id = %d", user_balance[i].diff_price, user_balance[i].user_id); } else { sprintf(sqlcmd,"UPDATE users SET balance = balance + %f WHERE id = %d", user_balance[i].diff_price, user_balance[i].user_id); } if (m2_mysql_query_multi(sqlcmd, &connection)) { exit(1); } mysql_connections[connection] = 0; } } } if (updated == 0) { m2_log("Nothing to update\n"); } } void m2_round(double *price, int digits, char *method) { int divider = 10; if (digits < 2) digits = 2; if (digits > 8) digits = 8; divider = pow(divider, digits); if (strcmp(method, "mathematical") == 0) { *price = roundf(*price * divider) / divider; } else if (strcmp(method, "down") == 0) { *price = floorf(*price * divider) / divider; } else { *price = ceilf(*price * divider) / divider; } }