// Author: Zilvinas Verseckas // Company: Kolmisoft // Year: 2017 // About: Script compares local and external CDRs #include "m2_cdr_compare.h" int main(int argc, char **argv) { m2_init("Starting M2 CDR Compare script\n"); // Releases resources at exit atexit(cleanup_memory); // Status flag for some function calls int status = 0; // Retrieves a background task if there are any in a queue char data1[12] = ""; if (m2_task_get(8, NULL, NULL, data1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL) == 0 && task_id) { m2_log("CDR Compare Tasks found id: %d\n", task_id); m2_task_lock(); } else { m2_log("No CDR Compare Tasks found\n"); return 1; } dispute_id = atoi(data1); // Stores information about a dispute struct cdr_dispute dispute; if (get_cdr_dispute(&dispute, dispute_id)) { m2_task_unlock(4); m2_log("ERROR: Could not load CDR Dispute id: %d\n", dispute_id); return 1; } m2_log("CDR Dispute id: %d loaded\n", dispute_id); update_dispute_status(2, dispute_id); print_dispute(&dispute); dispute_in_progress = 1; // Checks if a dispute is valid and is possible to proceed with if (validate_dispute(&dispute)) { return 1; } m2_log("CDR Dispute in progress\n"); m2_log("Retrieving CDRs from %s till %s\n", dispute.period_start, dispute.period_end); // Prepares a virtual file for local CDRs struct vfile *local_cdrs = get_local_cdrs(&dispute, &status); if (local_cdrs == NULL || status) { dispute_message_id = 1; m2_log("ERROR: Could not retrieve Local CDRs\n"); return 1; } m2_log("Virtual file for Local CDRs created with %d lines. Size on heap: %luB\n", local_cdrs->size, sizeof(struct vfile) + local_cdrs->size * sizeof(struct vline)); if (local_cdrs->size >= VFILE_MAX_SIZE) { m2_log("WARNING: Maximum number of CDRs on file reached: %d\n", VFILE_MAX_SIZE); } // Prepares a virtual file for external CDRs struct vfile *external_cdrs = get_external_cdrs(&dispute, &status); if (external_cdrs == NULL || status) { dispute_message_id = 2; m2_log("ERROR: Could not retrieve External CDRs\n"); return 1; } m2_log("Virtual file for External CDRs created with %d lines. Size on heap: %luB\n", external_cdrs->size, sizeof(struct vfile) + external_cdrs->size * sizeof(struct vline)); if (external_cdrs->size >= VFILE_MAX_SIZE) { m2_log("WARNING: Maximum number of CDRs on file reached: %d\n", VFILE_MAX_SIZE); } // Marks the duplicates m2_log("Checking for duplicates in Local and External CDRs...\n"); int local_dups = mark_duplicates(local_cdrs, 70); if (local_dups >= 0) { m2_log("Local duplicates found: %d\n", local_dups); } else { m2_log("ERROR: Local duplicates check failed\n"); } int external_dups = mark_duplicates(external_cdrs, 72); if (external_dups >= 0) { m2_log("External duplicates found: %d\n", external_dups); } else { m2_log("ERROR: External duplicates check failed\n"); } // Detects time shift dispute.time_shift = find_time_shift(local_cdrs, external_cdrs, &dispute, &status); if (status) { dispute_message_id = 3; m2_log("ERROR: Could not find time shift\n"); return 1; } m2_log("Time shift between Local and External CDRs: %lds\n", dispute.time_shift); // External time shift from UTC time_t t = time(NULL); struct tm lt = { 0 }; localtime_r(&t, <); // Time zone logic (L_utc/E_utc - Local/External offset from UTC, dt = E_utc - L_utc): // E_utc = dt - |L_utc| if L_utc < 0 // E_utc = dt + L_utc if L_utc >= 0 dispute.time_zone = dispute.time_shift + abs(lt.tm_gmtoff) * (lt.tm_gmtoff < 0 ? -1 : 1); m2_log("External CDRs offset from UTC: %lds\n", dispute.time_zone); // Comparing CDRs m2_log("Comparing CDRs...\n"); compare_cdrs(local_cdrs, external_cdrs, &dispute); // Deallocate resources free_vfile(local_cdrs); free_vfile(external_cdrs); m2_log("Dispute completed\n"); finish_dispute(&dispute); m2_task_finish(); dispute_in_progress = 0; return 0; } /* Detects a time shift between the local and external CDRs. */ time_t find_time_shift(struct vfile *local_cdrs, struct vfile *external_cdrs, const struct cdr_dispute *dispute, int *status) { // Loop indexes int i = 0, j = 0; // Array of time shifts and their frequencies pair_t **shifts = NULL; // Most frequent shift index int max_freq_shift = 0; int shift_found = 0; int shift_range = SHIFT_CHECK_RANGE; // Gets the first local CDRs line struct vline *local_cdr = local_cdrs->head; if (SHIFT_CHECK_RANGE > local_cdrs->size) { m2_log("WARNING: Number of External CDRs is lower than %d. Time shift might not be determined or incorrect!\n", SHIFT_CHECK_RANGE); shift_range = local_cdrs->size / 2; } while (local_cdr != NULL && !shift_found) { // Finds a probably matching external CDR struct vline *matched_external = find_vline(external_cdrs, local_cdr); i++; if (matched_external != NULL) { // Possible time shift int shift = matched_external->start_time - local_cdr->start_time, k; // Increments a difference if it already exists. // j represents the size of a shifts array for (k = 0; k < j; k++) { if (shifts[k]->fst == shift) { shifts[k]->snd++; break; } } // Appends a new possible shift to a list. // If k == j it means that the shift is not yet present if (k == j) { shifts = realloc(shifts, (j + 1) * sizeof(pair_t*)); if (shifts == NULL) { *status = 1; break; } shifts[j] = malloc(sizeof(pair_t)); if (shifts[j] == NULL) { *status = 1; break; } shifts[j]->fst = shift; shifts[j]->snd = 1; j++; } // Checks if we have a time shift that is more frequent than 50% of checked CDRs. for (k = 0; k < j; k++) { if (shifts[k]->snd > shifts[max_freq_shift]->snd) { max_freq_shift = k; } } if (i >= shift_range && shifts[max_freq_shift]->snd > i / 2 + 1) { shift_found = 1; } } local_cdr = local_cdr->next; } time_t result = 0; if (shift_found) { result = shifts[max_freq_shift]->fst; *status = 0; } else if (!*status) { *status = 1; } // Release memory for (i = 0; i < j; i++) { free(shifts[i]); } free(shifts); return result; } /* Performs the comparison, data mapping and database update operations. */ int compare_cdrs(struct vfile *local_cdrs, struct vfile *external_cdrs, const struct cdr_dispute *dispute) { struct vline *local_cdr = local_cdrs->head; int count = 0; // Creates an empty map of CDR triples [id, call_id, code] cdr_tripple_t *map = calloc(SQL_BATCH_SIZE, sizeof(cdr_tripple_t)); // Memory error if (map == NULL) { return 1; } // Finds, maps and deletes exactly matching CDRs while (local_cdr != NULL) { struct vline *matched_external = exactly_match_vline(external_cdrs, local_cdr, dispute); if (matched_external != NULL) { // Circular index from range [0;SQL_BATCH_SIZE] int index = count % SQL_BATCH_SIZE; // Maps a matching external CDR to a local CDR map[index].id = matched_external->id; map[index].call_id = local_cdr->id; map[index].code = 10; count++; // Updates the MySQL database if a batch is full if (index == SQL_BATCH_SIZE - 1) { update_disputed_cdrs(map, index + 1, dispute->id); } // Reduces the file size (removes a match_vline from a list) free(matched_external); // Marks the local cdr to skip local_cdr->skip = 1; } local_cdr = local_cdr->next; } // Rewinds the local list after marking exact matches local_cdr = local_cdrs->head; // Finds, maps and deletes all other CDRs (tolerated matches, mismatches) while (local_cdr != NULL) { if (!local_cdr->skip) { int code = 0; struct vline *matched_external = match_vline(external_cdrs, local_cdr, dispute, &code); if (matched_external != NULL) { // Circular index from range [0;SQL_BATCH_SIZE] int index = count % SQL_BATCH_SIZE; // Maps a matching external CDR to a local CDR map[index].id = matched_external->id; map[index].call_id = local_cdr->id; map[index].code = code; count++; // Updates the MySQL database if a batch is full if (index == SQL_BATCH_SIZE - 1) { update_disputed_cdrs(map, index + 1, dispute->id); } // Reduces the file size (removes the match_vline from a list) free(matched_external); local_cdr->skip = 1; } } local_cdr = local_cdr->next; } // Updates the last batch (might be not full) update_disputed_cdrs(map, count % SQL_BATCH_SIZE, dispute->id); m2_log("Matched CDRs: %d\n", count); // Updates the CDRs with no match store_faulty_cdrs(external_cdrs, dispute->id, 1); // 1 - indicates External CDRs store_faulty_cdrs(local_cdrs, dispute->id, 0); // 0 - indicates Local CDRs free(map); return 0; } /* Marks the duplicates on a file and returns the number of them */ int mark_duplicates(struct vfile *file, const int code) { if (file == NULL) { return -1; } struct vline *curr_i = file->head; int dups = 0; while(curr_i != NULL) { // Skip lines which already have a code (errors) if (curr_i->code == 0) { // Start time for knowing when to break the search int search_limit = curr_i->start_time; struct vline *curr_j = curr_i->next; while(curr_j != NULL) { // Since data is sorted we can skip all the lines if start time not matched if (curr_j->start_time != search_limit) { break; } // If a line is identical to another line, give it a duplicate code if (!curr_j->code && curr_i->connected == curr_j->connected && curr_i->billsec == curr_j->billsec && curr_i->start_time == curr_j->start_time && curr_i->answer_time == curr_j->answer_time && dequal(curr_i->cost, curr_j->cost) && !strcmp(curr_i->src, curr_j->src) && !strcmp(curr_i->dst, curr_j->dst)) { curr_j->code = code; dups++; } curr_j = curr_j->next; } } curr_i = curr_i->next; } return dups; } /* Exactly matches a virtual line in a virtual file with respect to dispute settings. Set appropriate codes. */ struct vline *exactly_match_vline(struct vfile *file, const struct vline *to_find, const struct cdr_dispute *dispute) { if (file == NULL || to_find == NULL) { return NULL; } if (file->tail == NULL) { return NULL; } struct vline *curr = file->head; struct vline *prev = NULL; while (curr != NULL) { // If all fields are identical if (curr->code == 0 && curr->start_time - to_find->start_time == dispute->time_shift && !strcmp(curr->src, to_find->src) && !strcmp(curr->dst, to_find->dst) && dequal(to_find->cost, curr->cost) && to_find->billsec == curr->billsec && to_find->connected == curr->connected) { // Removes a matched CDR if (curr == file->head) { file->head = curr->next; } else if (curr == file->tail) { file->tail = prev; prev->next = NULL; } else { prev->next = curr->next; } file->size--; return curr; } prev = curr; curr = curr->next; } return NULL; } /* Matches a virtual line in a virtual file with respect to dispute settings. Set appropriate codes. */ struct vline *match_vline(struct vfile *file, const struct vline *to_find, const struct cdr_dispute *dispute, int *code) { if (file == NULL || to_find == NULL) { return NULL; } if (file->tail == NULL) { return NULL; } struct vline *curr = file->head; struct vline *prev = NULL; *code = 0; while (curr != NULL) { // If start_time, source, and destination are identical if (curr->code == 0 && curr->start_time - to_find->start_time == dispute->time_shift && !strcmp(curr->src, to_find->src) && !strcmp(curr->dst, to_find->dst)) { double cost_delta = fabs(to_find->cost - curr->cost); // Disposition mismatches if (to_find->connected > 0 && curr->connected == 0) { *code = 42; // Connected only externally } else if (to_find->connected == 0 && curr->connected > 0) { *code = 40; // Connected only locally // Tolerated mismatches } else if (to_find->billsec == curr->billsec && (dequal(cost_delta, dispute->cost_tolerance) || cost_delta < dispute->cost_tolerance)) { *code = 21; // Tolerated cost } else if (dequal(to_find->cost, curr->cost) && abs(to_find->billsec - curr->billsec) <= dispute->billsec_tolerance) { *code = 22; // Tolerated billsec } else if (abs(to_find->billsec - curr->billsec) <= dispute->billsec_tolerance && (dequal(cost_delta, dispute->cost_tolerance) || cost_delta < dispute->cost_tolerance)) { *code = 23; // Tolerated billsec and cost // Mismatches } else if (cost_delta > dispute->cost_tolerance) { *code = 31; // Cost mismatch } else if (abs(to_find->billsec - curr->billsec) > dispute->billsec_tolerance) { *code = 32; // Billsec mismatch } else if (abs(to_find->billsec - curr->billsec) > dispute->billsec_tolerance && cost_delta > dispute->cost_tolerance) { *code = 33; // Billsec and cost mismatch } } // Removes a matched CDR if (*code != 0) { if (curr == file->head) { file->head = curr->next; } else if (curr == file->tail) { file->tail = prev; prev->next = NULL; } else { prev->next = curr->next; } file->size--; return curr; } prev = curr; curr = curr->next; } return NULL; } /* Initializes an empty virtual file. */ struct vfile *init_vfile(void) { struct vfile *file = calloc(1, sizeof(struct vfile)); if (file == NULL) { return NULL; } file->tail = NULL; file->head = NULL; file->size = 0; return file; } /* Rewinds a virtual file by resetting skipped lines. */ void rewind_vfile(struct vfile *file, int depth) { if (file == NULL) { return; } struct vline *curr = file->head; while(curr != NULL && depth > 0) { if (curr->skip) { curr->skip = 0; --depth; } curr = curr->next; } } /* Makes a virtual line from a MySQL result row */ struct vline *make_vline(const MYSQL_ROW line) { struct vline *new_vline = calloc(1, sizeof(struct vline)); if (new_vline == NULL || !line) { return NULL; } new_vline->errors = 0; // All lines are visible by default new_vline->skip = 0; // Sets a source number if (line[0] && strlen(line[0])) { strncpy(new_vline->src, line[0], sizeof(new_vline->src)); } else { new_vline->errors++; } // Sets a destination number if (line[1] && strlen(line[1])) { strncpy(new_vline->dst, line[1], sizeof(new_vline->dst)); } else { new_vline->errors++; } // Sets a start_time time-stamp if (line[2]) { new_vline->start_time = atoi(line[2]); } else { new_vline->errors++; } // Sets an answer_time time-stamp if (line[3]) { new_vline->answer_time = atoi(line[3]); } // Sets an end_time time-stamp if (line[4]) { new_vline->end_time = atoi(line[4]); } // Sets a billsec in seconds if (line[5]) { new_vline->billsec = atoi(line[5]); } else { new_vline->errors++; } // Sets a cost of a call if (line[6]) { new_vline->cost = strtod(line[6], NULL); } else { new_vline->errors++; } // Determines if a call is connected if (line[7]) { if (strcmp(line[7], "ANSWERED") == 0) { new_vline->connected = 0; } else if (strcmp(line[7], "NO ANSWER") == 0) { new_vline->connected = 1; } else if (strcmp(line[7], "BUSY") == 0) { new_vline->connected = 2; } else if (strcmp(line[7], "FAILED") == 0) { new_vline->connected = 3; } else { new_vline->errors++; } } else { new_vline->errors++; } if (line[8]) new_vline->id = atoll(line[8]); if (new_vline->errors) { // Marks errors new_vline->code = 99; } else { // Default code for not compared CDRs new_vline->code = 00; } return new_vline; } /* Appends a virtual line a to a virtual file. */ void append_vline(struct vfile *file, struct vline *new_vline) { if (file == NULL || new_vline == NULL) { return; } file->size++; if (file->tail == NULL) { file->head = file->tail = new_vline; return; } file->tail->next = new_vline; file->tail = new_vline; } /* Finds a virtual line in a virtual file by source and destination. */ struct vline *find_vline(struct vfile *file, const struct vline *to_find) { if (file == NULL || to_find == NULL) { return NULL; } if (file->tail == NULL) { return NULL; } struct vline *curr = file->head; while (curr != NULL) { if (!curr->skip) { if (!strcmp(curr->src, to_find->src) && !strcmp(curr->dst, to_find->dst)) { curr->skip = 1; return curr; } } curr = curr->next; } return NULL; } /* Deletes a virtual file contents, frees memory. */ void free_vfile(struct vfile *file) { if (file == NULL) { return; } struct vline *curr = file->head; struct vline *prev = NULL; while(curr != NULL) { prev = curr; curr = curr->next; file->head = curr; free(prev); } free(file); } /* Validates a dispute data */ int validate_dispute(const struct cdr_dispute *dispute) { if (dispute == NULL) { return 1; } // Invalid date range if (dispute->period_start_t > dispute->period_end_t) { m2_log("ERROR: Period start is greater than period end!\n"); return 1; } // Maximum limit of digits to compare reached if (dispute->cmp_last_src_digits > MAX_CMP_SRC) { m2_log("ERROR: Cannot compare more than %d last src digits!\n", MAX_CMP_SRC); return 1; } if (dispute->cmp_last_dst_digits > MAX_CMP_DST) { m2_log("ERROR: Cannot compare more than %d last dst digits!\n", MAX_CMP_DST); return 1; } // Invalid exchange rate if (dispute->exchange_rate <= 0) { m2_log("ERROR: Exchange rate (%.6f) is invalid!\n", dispute->exchange_rate); return 1; } // Billsec tolerance is possibly too high for matching if (dispute->billsec_tolerance > 3) { m2_log("WARNING: Bigger billsec tolerance might cause inaccurate results!\n"); } // SCR/DST compares too few digits if (dispute->cmp_last_dst_digits < 4 || dispute->cmp_last_src_digits < 4) { m2_log("WARNING: Fewer last digits might cause inaccurate results!\n"); } return 0; } /* Adds a virtual file to the Database */ int store_faulty_cdrs(struct vfile *file, const int dispute_id, const int type) { struct vline *curr = file->head; int count = 0, index = 0; // Creates an empty map of CDR triples pair_t *map = calloc(SQL_BATCH_SIZE, sizeof(pair_t)); if (map == NULL) { return 1; } // For each CDR with no match (file now contains only them) while (curr != NULL) { // All CDRs that are present on file and have no code // indicate that they have no match if (curr->code == 0 && (type || !curr->skip)) { curr->code = 90; } // Faulty CDRs have a code greater than or equal to 70 if (curr->code >= 70) { // Circular index from range [0;SQL_BATCH_SIZE] index = count % SQL_BATCH_SIZE; count++; // Map a matching External CDR to a Local CDR map[index].fst = curr->id; map[index].snd = curr->code; // Update the MySQL database if a batch is full if (index == SQL_BATCH_SIZE - 1) { insert_disputed_cdrs(map, index + 1, dispute_id, type); } } curr = curr->next; } // Insert the last batch insert_disputed_cdrs(map, count % SQL_BATCH_SIZE, dispute_id, type); m2_log("%s CDRs with no match: %d\n", type ? "External" : "Local", count); free(map); return 0; } /* Updates the external CDRs' in the DB */ int update_disputed_cdrs(const cdr_tripple_t *map, const int size, const int dispute_id) { if (!size) { return 0; } // Ending query char query_end[100] = "ON DUPLICATE KEY UPDATE call_id=VALUES(call_id),mismatch_type=VALUES(mismatch_type)"; // Dynamically allocates memory since stack might be not enough when batch size if big char *query = calloc(1, SQL_BATCH_SIZE * 64 + 200); strcpy(query, "INSERT INTO disputed_cdrs (id,call_id,mismatch_type,dispute_id) VALUES "); // Buffer to an insert triple char block[64] = ""; int i = 0; // Append each triple to a query for (i = 0; i < size; i++) { sprintf(block, "(%lld,%lld,'%02d',%d),", map[i].id, map[i].call_id, map[i].code, dispute_id); strcat(query, block); } // Remove the last comma query[strlen(query) - 1] = ' '; strcat(query, query_end); if (m2_mysql_query(query)) { free(query); return 1; } free(query); return 0; } /* Inserts the CDRs into the DB */ int insert_disputed_cdrs(const pair_t *map, const int size, const int dispute_id, const int type) { if (!size) { return 0; } char *query = calloc(1, SQL_BATCH_SIZE * 40 + 170); char query_end[90] = ""; char block[40] = ""; int i = 0; if (type) { // Inserts the external CDRs that are duplicate, have errors or have no match strcpy(query, "INSERT INTO disputed_cdrs (id,mismatch_type,dispute_id,is_placeholder) VALUES "); strcpy(query_end, "ON DUPLICATE KEY UPDATE mismatch_type=VALUES(mismatch_type)"); } else { // Inserts the local CDRs that are duplicate, have errors or have no match strcpy(query, "INSERT INTO disputed_cdrs (call_id,mismatch_type,dispute_id,is_placeholder) VALUES "); strcpy(query_end, "ON DUPLICATE KEY UPDATE call_id=VALUES(call_id),mismatch_type=VALUES(mismatch_type)"); } for (i = 0; i < size; i++) { sprintf(block, "(%lld,'%02d',%d,%d),", map[i].fst, map[i].snd, dispute_id, !type); strcat(query, block); } query[strlen(query) - 1] = ' '; if (type) { strcat(query, query_end); } if (m2_mysql_query(query)) { free(query); return 1; } free(query); return 0; } /* Logs Dispute data */ void print_dispute(const struct cdr_dispute *dispute) { if (dispute == NULL) { m2_log("Dispute was not found"); return; } m2_log("Dispute data:\n" "\t\t\t id = %d\n" "\t\t\t user_id = %d\n" "\t\t\t direction = %d\n" "\t\t\t period_start = '%s' (%lds)\n" "\t\t\t period_end = '%s' (%lds)\n" "\t\t\t time_shift = %ld\n" "\t\t\t time_zone = %lds\n" "\t\t\t billsec_tolerance = %ds\n" "\t\t\t cost_tolerance = %.5f\n" "\t\t\t cmp_last_src_digits = %d\n" "\t\t\t cmp_last_dst_digits = %d\n" "\t\t\t exchange_rate = %.6f\n" "\t\t\t check_only_answered_calls = %d\n", dispute->id, dispute->user_id, dispute->direction, dispute->period_start, dispute->period_start_t, dispute->period_end, dispute->period_end_t, dispute->time_shift, dispute->time_zone, dispute->billsec_tolerance, dispute->cost_tolerance, dispute->cmp_last_src_digits, dispute->cmp_last_dst_digits, dispute->exchange_rate, dispute->check_only_answered_calls); } /* Updates a status of a Dispute. */ int update_dispute_status(const int status, const int id) { char query[80] = ""; sprintf(query, "UPDATE disputes SET status=%d, message=%d WHERE id=%d", status, dispute_message_id, id); if (m2_mysql_query(query)) { return 1; } return 0; } /* Updates information of a Dispute. */ int finish_dispute(const struct cdr_dispute *dispute) { char query[128] = ""; sprintf(query, "UPDATE disputes SET status=%d,time_shift=%ld,time_zone=%ld WHERE id=%d", 3, dispute->time_shift, dispute->time_zone, dispute->id); if (m2_mysql_query(query)) { return 1; } return 0; } /* Retrieves a dispute by a dispute_id and stores its data on a cdr_dispute structure. */ int get_cdr_dispute(struct cdr_dispute *dispute, const int id) { if (dispute == NULL) { return 1; } memset(dispute, 0, sizeof(struct cdr_dispute)); char query[210] = ""; sprintf(query, "SELECT " "user_id,direction,period_start,period_end,billsec_tolerance," "cost_tolerance,cmp_last_src_digits,cmp_last_dst_digits,exchange_rate,check_only_answered_calls " "FROM disputes WHERE id=%d LIMIT 1", id); if (m2_mysql_query(query)) { return 1; } MYSQL_RES *result = mysql_store_result(&mysql); if (result) { my_ulonglong result_size = mysql_num_rows(result); if (result_size) { MYSQL_ROW row = mysql_fetch_row(result); if (row) { dispute->id = id; if (row[0]) dispute->user_id = atoi(row[0]); if (row[1]) dispute->direction = atoi(row[1]); if (row[2]) { strncpy(dispute->period_start, row[2], sizeof(dispute->period_start)); // Creates a time-stamp from a date struct tm pstart; memset(&pstart, 0, sizeof(struct tm)); strptime(dispute->period_start, DATE_FORMAT, &pstart); dispute->period_start_t = mktime(&pstart); } if (row[3]) { strncpy(dispute->period_end, row[3], sizeof dispute->period_end); // Creates a time-stamp from a date struct tm pend; memset(&pend, 0, sizeof(struct tm)); strptime(dispute->period_end, DATE_FORMAT, &pend); dispute->period_end_t = mktime(&pend); } if (row[4]) dispute->billsec_tolerance = atoi(row[4]); if (row[5]) dispute->cost_tolerance = strtod(row[5], NULL); if (row[6]) dispute->cmp_last_src_digits = atoi(row[6]); if (row[7]) dispute->cmp_last_dst_digits = atoi(row[7]); if (row[8]) dispute->exchange_rate = strtod(row[8], NULL); if (row[9]) dispute->check_only_answered_calls = atoi(row[9]); } } mysql_free_result(result); return result_size ? 0 : 1; } return 1; } /* Retrieves local CDRs and stores them on a virtual file. */ struct vfile *get_local_cdrs(const struct cdr_dispute *dispute, int *status) { // Initializes an empty virtual file struct vfile *local_cdrs = init_vfile(); if (local_cdrs == NULL ) { *status = 1; return NULL; } char query[1024] = ""; sprintf(query, "SELECT " "IF(LENGTH(src)>=%d,SUBSTRING(src,-%d),src),IF(LENGTH(dst)>=%d,SUBSTRING(dst,-%d),dst)," "TIMESTAMPDIFF(SECOND,'1970-01-01 00:00:00',calldate)," "TIMESTAMPDIFF(SECOND,'1970-01-01 00:00:00',answer_time)," "TIMESTAMPDIFF(SECOND,'1970-01-01 00:00:00',end_time)," "billsec,%s,disposition,id " "FROM calls " "WHERE calldate BETWEEN '%s' AND '%s' AND %s=%d %s " "ORDER BY calldate DESC LIMIT %d", dispute->cmp_last_src_digits, dispute->cmp_last_src_digits, dispute->cmp_last_dst_digits, dispute->cmp_last_dst_digits, dispute->direction == 0 ? "user_price" : "provider_price", dispute->period_start, dispute->period_end, dispute->direction == 0 ? "user_id" : "provider_id", dispute->user_id, dispute->check_only_answered_calls == 0 ? "" : " AND disposition='ANSWERED' ", VFILE_MAX_SIZE); if (m2_mysql_query(query)) { *status = 1; return NULL; } MYSQL_RES *result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { MYSQL_ROW row; while ((row = mysql_fetch_row(result))) { struct vline *new_vline = make_vline(row); if (new_vline != NULL) { // Adjust the local cost with exchange rate new_vline->cost *= dispute->exchange_rate; append_vline(local_cdrs, new_vline); } } } mysql_free_result(result); } *status = 0; return local_cdrs; } /* Retrieves external CDRs and stores them on a virtual file. */ struct vfile *get_external_cdrs(const struct cdr_dispute *dispute, int *status) { // Initialize an empty virtual file struct vfile *external_cdrs = init_vfile(); if (external_cdrs == NULL ) { *status = 1; return NULL; } char query[1024] = ""; sprintf(query, "SELECT " "IF(LENGTH(src)>=%d,SUBSTRING(src,-%d),src),IF(LENGTH(dst)>=%d,SUBSTRING(dst,-%d),dst)," "TIMESTAMPDIFF(SECOND,'1970-01-01 00:00:00',start_time)," "TIMESTAMPDIFF(SECOND,'1970-01-01 00:00:00',answer_time)," "TIMESTAMPDIFF(SECOND,'1970-01-01 00:00:00',end_time)," "billsec,cost,disposition,id " "FROM disputed_cdrs " "WHERE dispute_id=%d %s " "ORDER BY start_time DESC LIMIT %d", dispute->cmp_last_src_digits, dispute->cmp_last_src_digits, dispute->cmp_last_dst_digits, dispute->cmp_last_dst_digits, dispute->id, dispute->check_only_answered_calls == 0 ? "" : " AND disposition='ANSWERED' ", VFILE_MAX_SIZE); if (m2_mysql_query(query)) { *status = 1; return NULL; } MYSQL_RES *result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { MYSQL_ROW row; while ((row = mysql_fetch_row(result))) { struct vline *new_vline = make_vline(row); if (new_vline != NULL) { append_vline(external_cdrs, new_vline); } } } mysql_free_result(result); } *status = 0; return external_cdrs; } /* At exit callback to free used resources */ void cleanup_memory(void) { if (dispute_in_progress) { update_dispute_status(4, dispute_id); m2_task_unlock(4); } m2_log("Script completed\n"); } /* Compares two doubles with a certain precision. Used to avoid rounded decimal mantissa tails */ int dequal(double a, double b) { return fabs(a - b) < EPSILON; }