// Author: Ricardas Stoma // Company: Kolmisoft // Year: 2012 // About: Script moves old calls from 'calls' to 'calls_old' table #define SCRIPT_VERSION "1.38" #define SCRIPT_NAME "m2_archive_old_calls" #define DB_FROM "calls" #define DB_TO "calls_old" #define SKIPPED_CALLS "/tmp/m2_skipped_calls.sql" #define DATA_INFILE_PATH "/tmp/" DB_TO ".txt" #define SQL_LIMIT 1000 #define SQL_LIMIT_IDS 1000 * 2 #define BATCH_SIZE 250 #define IDS_BUFFER_SIZE BATCH_SIZE * 20 + 256 #define PROGRESS_TIMER 10 #define M2_SQL_CONNECTIONS 2 // use multiple sql connections #include "m2_functions.c" // VARIABLES // SQL variables char sql_file_buffer[2048] = ""; char sql_delete_statement[64] = "DELETE FROM " DB_FROM " WHERE id IN ("; char sql_count_statement[64] = "SELECT COUNT(id) FROM " DB_FROM " WHERE id IN ("; char buffer[6000] = ""; long long int total_calls = 0; long long int total_answered_calls = 0; long long int transferred_calls = 0; // call ids stored in array for future deletion char ids_buffer[IDS_BUFFER_SIZE] = ""; char check_ids_buffer[IDS_BUFFER_SIZE] = ""; long long int ids[SQL_LIMIT_IDS]; // list of column names char colnames[4096] = ""; char colnames_calls[4096] = ""; char colnames_table_name[4096] = ""; int colnames_count = 0; // task variables int time_limit_h = 0; int time_limit_m = 0; int task_failed = 0; int forced_stop = 0; int finished = 1; int time_started_h = 0; int time_started_m = 0; int timer_is_set = 0; char forced_stop_date[20] = ""; char script_start_time[20] = ""; char resume_point_sql[256] = ""; int older_than = 0; int archive_only_answered = 0; int delete_from_call_details = 0; int delete_instead_of_archive = 0; int delete_archived_calls_older_than = 0; int do_not_delete_from_calls = 0; char last_calldate[20] = ""; char prev_last_calldate[20] = ""; char last_calldate_resume_point[20] = ""; int sql_limit_var = SQL_LIMIT; int sql_limit_inc = 0; // backup to csv int save_to_csv = 0; char path_to_backups[256] = ""; char csv_filename[512] = ""; int csv_iterations = 0; int csv_part = 0; // Special fields int calldate_field_pos = -1; int disposition_field_pos = -1; int id_field_pos = -1; // FTP variables int save_on_ftp = 0; int DEBUG_PROGRESS = 0; // FUNCTION DECLARATIONS void *set_timer(); void my_strcat(char *str1, char *str2); int delete_calls_from_database(int calls); int get_calls_from_database(FILE *infile, int *calls, int *done, int older_than, int *failed_calls); void error_handle(); void generate_forced_stop_date(); int delete_archived_calls(); void get_settings_from_conflines(); void get_backups_path(char *path); void process_csv_files(); int file_count(char *path); void char_replace(char *string, char find, char replace); int cmd_output_line_count(char *cmd); int get_calls_count(char *db, int records, int cloud_enabled); void get_resume_point(); void get_min_max_calldates(char *min_calldate, char *max_calldate); long long int get_calls_count_from_table(char *table, char *start_date, char *end_date); void fix_inconsistencies(char *min_calldate, char *max_calldate); void fix_inconsistencies_by_day(char *start_date, char *end_date); // MAIN FUNCTION int main(int argc, char *argv[]) { MYSQL_RES *result; MYSQL_ROW row; char sqlcmd[5000] = ""; int calls = 0; int done = 0; char datetime[20] = ""; pthread_attr_t tattr; pthread_attr_init(&tattr); pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); pthread_t timer; // check if debug is ON if (argc > 1) { if(strcmp(argv[1], "--debug") == 0) { DEBUG_PROGRESS = 1; } } // starting the script m2_init("Starting M2 Archive Old Calls script\n"); // mark task as failed on segmentation fault struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); sigemptyset(&sa.sa_mask); sa.sa_sigaction = error_handle; sa.sa_flags = SA_SIGINFO; sigaction(SIGSEGV, &sa, NULL); sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL); atexit(error_handle); m2_get_current_date(script_start_time); // Round time down minutes and seconds strcpy(script_start_time + 14, "00:00"); get_elasticsearch_host(); // connect to cloud DB (if configured in conflines) m2_cloud_connect(); struct tm tmm; strptime(datetime, DATE_FORMAT, &tmm); time_started_h = tmm.tm_hour; time_started_m = tmm.tm_min; char time_limit_str[256] = ""; if (m2_task_get(2, NULL, NULL, time_limit_str, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) { return 1; } // parse hours and minutes if (strlen(time_limit_str)) { char buffer[64] = ""; strncpy(buffer, time_limit_str, 2); time_limit_h = atoi(buffer); strncpy(buffer, time_limit_str + 3, 2); time_limit_m = atoi(buffer); } task_failed = 1; // get various settings get_settings_from_conflines(); if (!(time_limit_h <= 24 || time_limit_h >= -1)) { m2_log("Time limit must be between -1 and 24\n"); return 1; } if (!(time_limit_m <= 59 || time_limit_m >= -1)) { m2_log("Time limit must be between -1 and 59\n"); return 1; } // generate stop date if (time_limit_h > -1) { generate_forced_stop_date(); m2_log("Call archiving should be stopped at %s\n", forced_stop_date); } if (save_to_csv == 1) { m2_log("Calls should be archived to CSV file\n"); get_backups_path(path_to_backups); if (strlen(path_to_backups)) { char system_cmd[512] = ""; m2_log("Backups path: %s\n", path_to_backups); sprintf(system_cmd, "rm -fr %s/m2_archived_calls", path_to_backups); system(system_cmd); sprintf(system_cmd, "mkdir -p %s/m2_archived_calls", path_to_backups); system(system_cmd); sprintf(system_cmd, "chmod 777 -R %s/m2_archived_calls", path_to_backups); system(system_cmd); } else { m2_log("Unknown path to backups!\n"); return 1; } } if (do_not_delete_from_calls) { m2_log("Calls will not be deleted from database\n"); get_resume_point(); m2_log("Last calldate in calls_old: %s\n", last_calldate_resume_point); } // check hdd usage int hdd_usage = m2_get_hdd_usage(); m2_log("HDD free space left: %d%%\n", 100 - hdd_usage); if (hdd_usage >= 95 && cloud_mysql_enabled == 0) { m2_log("Server is running low on free HDD space. Calls will not be archived.\n"); return 1; } char db_to_columns[4096] = ""; // get columns from 'db_to' if (cloud_mysql_enabled) { sprintf(sqlcmd, "SELECT column_name FROM information_schema.columns WHERE table_name = '%s' AND table_schema = '%s' ORDER BY ORDINAL_POSITION", DB_TO, cloud_mysql_database); if (m2_cloud_mysql_query(sqlcmd)) { return 1; } result = mysql_store_result(&cloud_mysql); } else { sprintf(sqlcmd, "SELECT column_name FROM information_schema.columns WHERE table_name = '%s' AND table_schema = '%s' AND column_name IN (SELECT DISTINCT column_name FROM information_schema.columns WHERE table_name = '%s' AND table_schema = '%s') ORDER BY ORDINAL_POSITION", DB_FROM, dbname, DB_TO, dbname); if (m2_mysql_query(sqlcmd)) { return 1; } result = mysql_store_result(&mysql); } // construct string from column names like 'id','calldate','dst',... while ((row = mysql_fetch_row(result)) != NULL) { strcat(db_to_columns, "'"); strcat(db_to_columns, row[0]); strcat(db_to_columns, "'"); strcat(db_to_columns, ","); } if (strlen(db_to_columns)) { db_to_columns[strlen(db_to_columns) - 1] = 0; } if (result) { mysql_free_result(result); } if (!strlen(db_to_columns)) { m2_log("Failed to retrieve column fields for table [%s]\n", DB_TO); return 1; } // get column names from calls table that match calls_old table columns sprintf(sqlcmd, "SELECT column_name FROM information_schema.columns WHERE table_name = '" DB_FROM "' AND column_name IN (%s) ORDER BY ORDINAL_POSITION", db_to_columns); if (m2_mysql_query(sqlcmd)) { return 1; } result = mysql_store_result(&mysql); // construct string from column names like 'id,calldate,dst....' while (( row = mysql_fetch_row(result)) != NULL ) { char tmp_buffer[256] = ""; sprintf(tmp_buffer, "%s.%s", DB_FROM, row[0]); strcat(colnames_table_name, tmp_buffer); strcat(colnames, row[0]); strcat(colnames, ","); strcat(colnames_table_name, ","); // Get disposition column position if (strcmp(row[0], "disposition") == 0) { disposition_field_pos = colnames_count; } // Get id column position if (strcmp(row[0], "id") == 0) { id_field_pos = colnames_count; } // Get calldate column position if (strcmp(row[0], "calldate") == 0) { calldate_field_pos = colnames_count; } colnames_count++; } // Check if we found required special fields if (disposition_field_pos < 0) { m2_log("Failed to get disposition field position\n"); return 1; } if (id_field_pos < 0) { m2_log("Failed to get id field position\n"); return 1; } if (calldate_field_pos < 0) { m2_log("Failed to get calldate field position\n"); return 1; } colnames[strlen(colnames) - 1] = 0; colnames_table_name[strlen(colnames_table_name) - 1] = 0; strcpy(colnames_calls, colnames); if (DEBUG_PROGRESS) { sprintf(buffer, "LOAD DATA LOCAL INFILE '" DATA_INFILE_PATH "' INTO TABLE " DB_TO " FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\\n' (%s);", colnames); printf("%s\n", buffer); exit(0); } mysql_free_result(result); m2_log("Transferring common fields from %s to %s: %s\n", DB_FROM, DB_TO, colnames); if (do_not_delete_from_calls && strlen(last_calldate_resume_point)) { sprintf(resume_point_sql, "AND calls.calldate >= '%s'", last_calldate_resume_point); } sprintf(buffer, "SELECT COUNT(id) FROM " DB_FROM " WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) %s", script_start_time, older_than, resume_point_sql); if (m2_mysql_query(buffer)) { return 1; } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); if (row[0]) { total_calls = atoi(row[0]); } } } mysql_free_result(result); if (archive_only_answered) { sprintf(buffer, "SELECT COUNT(id) FROM " DB_FROM " WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) AND disposition = 'ANSWERED' %s", script_start_time, older_than, resume_point_sql); if (m2_mysql_query(buffer)) { return 1; } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); if (row[0]) { total_answered_calls = atoi(row[0]); } } } mysql_free_result(result); } if (total_calls) { m2_log("Moving calls that are older than %d days\n", older_than); if (delete_instead_of_archive) { m2_log("%lld calls will be deleted\n", total_calls); } else { if (archive_only_answered) { m2_log("%lld answered calls will be archived\n", total_answered_calls); } else { m2_log("%lld calls will be archived\n", total_calls); } m2_log("Starting transfer...\n"); } // mark as 'IN PROGRESS' m2_task_lock(); pthread_create(&timer, &tattr, set_timer, NULL); timer_is_set = 1; while (done == 0 && forced_stop == 0) { // check HDD usage on each iteration int hdd_usage = m2_get_hdd_usage(); if (hdd_usage >= 95 && cloud_mysql_enabled == 0) { m2_log("Server is running low on free HDD space (%d%% free space left). Calls archiving will be stopped.\n", 100 - hdd_usage); process_csv_files(); exit(1); } // delete old file unlink(DATA_INFILE_PATH); // MySQL DATA INFILE FILE *infile = fopen(DATA_INFILE_PATH, "w"); if (infile == NULL) { fprintf(stderr, "Cannot open: " DATA_INFILE_PATH "\n"); m2_log("Cannot create: " DATA_INFILE_PATH "\n"); return 1; } // default state is 'done' done = 1; // reset calls calls = 0; int failed_calls = 0; // get calls from database and prepare data to be imported to another database if (get_calls_from_database(infile, &calls, &done, older_than, &failed_calls)) return 1; fclose(infile); if (done) { // remove data file unlink(DATA_INFILE_PATH); // job is done break; } if (delete_instead_of_archive == 0 && save_to_csv == 0) { // insert calls using data infile sprintf(buffer, "LOAD DATA LOCAL INFILE '" DATA_INFILE_PATH "' INTO TABLE " DB_TO " FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n' (%s);", colnames); if (m2_cloud_mysql_query(buffer)) { return 1; } int calls_records = get_calls_count(DB_FROM, calls, 0); int calls_old_records = get_calls_count(DB_TO, calls, cloud_mysql_enabled); if (calls_records != calls_old_records) { char buffer[1024] = ""; m2_log("Some records could not be inserted into table '%s'. Check " SKIPPED_CALLS " query. Tried to insert records: %d, inserted records: %d, difference: %d\n", DB_TO, calls_records, calls_old_records, calls_records - calls_old_records); sprintf(buffer, "echo '\n-- %s\n' >> " SKIPPED_CALLS " && cat " DATA_INFILE_PATH " >> " SKIPPED_CALLS, datetime); system(buffer); exit(1); } } // remove temporary data file unlink(DATA_INFILE_PATH); // delete calls from database if (!do_not_delete_from_calls) { if (delete_calls_from_database(calls)) return 1; } transferred_calls = transferred_calls + calls; } // Consistency check if (do_not_delete_from_calls && forced_stop == 0 && cloud_mysql_enabled == 0) { char min_calldate[20] = ""; char max_calldate[20] = ""; m2_log("Performing consistency check\n"); get_min_max_calldates(min_calldate, max_calldate); if (strlen(min_calldate) && strlen(max_calldate)) { m2_log("Min calldate: %s, max calldate: %s\n", min_calldate, max_calldate); long long int calls_count = get_calls_count_from_table("calls", min_calldate, max_calldate); long long int calls_old_count = get_calls_count_from_table("calls_old", min_calldate, max_calldate); m2_log("Calls count: %lld, calls old count: %lld\n", calls_count, calls_old_count); if (calls_count != calls_old_count) { m2_log("Detected inconsistencies!\n"); if (abs(calls_count - calls_old_count) < 100000) { m2_log("Trying to fix it!\n"); fix_inconsistencies(min_calldate, max_calldate); calls_count = get_calls_count_from_table("calls", min_calldate, max_calldate); calls_old_count = get_calls_count_from_table("calls_old", min_calldate, max_calldate); if (calls_count != calls_old_count) { m2_log("Failed to fix inconsistencies\n"); } else { m2_log("Fixed inconsistencies\n"); } } else { fix_inconsistencies_by_day(min_calldate, max_calldate); } } else { m2_log("Data consistent!\n"); } } } } else { m2_log("No calls to be transferred\n"); } // delete archived calls if (delete_archived_calls_older_than) { delete_archived_calls(); } // terminate timer if (timer_is_set) { pthread_cancel(timer); pthread_attr_destroy(&tattr); } process_csv_files(); if (finished && forced_stop == 0) m2_task_finish(); if (finished && forced_stop == 1) { char sqlcmd[1024] = ""; m2_task_unlock(3); sprintf(sqlcmd,"UPDATE background_tasks SET finished_at = NOW() WHERE id = %d", task_id); if (m2_mysql_query(sqlcmd)) exit(1); } m2_log("Script finished\n"); task_failed = 0; return 0; } int get_calls_from_database(FILE *infile, int *calls, int *done, int older_than, int *failed_calls) { MYSQL_RES *result; MYSQL_ROW row; FILE *csv = NULL; char order_sql[256] = ""; int local_colnames_count = colnames_count; int local_disposition_field_pos = disposition_field_pos; int local_id_field_pos = id_field_pos; int local_calldate_field_pos = calldate_field_pos; int i = 0; if (save_to_csv == 1) { sprintf(csv_filename, "%s/m2_archived_calls/m2_archived_calls_part_%d.csv", path_to_backups, csv_part + 1); csv = fopen(csv_filename, "a+"); if (csv == NULL) { m2_log("Can't open %s\n", csv_filename); exit(1); } local_colnames_count += 4; local_disposition_field_pos += 4; local_id_field_pos += 4; local_calldate_field_pos += 4; } if (do_not_delete_from_calls) { strcpy(order_sql, "ORDER BY calls.calldate ASC"); if (strlen(last_calldate)) { sprintf(resume_point_sql, "AND calls.calldate >= '%s'", last_calldate); } else if (strlen(last_calldate_resume_point)) { sprintf(resume_point_sql, "AND calls.calldate >= '%s'", last_calldate_resume_point); } } else { strcpy(resume_point_sql, ""); } strcpy(prev_last_calldate, last_calldate); // get calls that are older than X days if (save_to_csv == 1) { sprintf(buffer, "SELECT IF(LENGTH(TRIM(CONCAT(users.first_name, ' ', users.last_name))) > 0, TRIM(CONCAT(users.first_name, ' ', users.last_name)), users.username) AS 'src_user', devices.description AS 'src_device', " "IF(LENGTH(TRIM(CONCAT(dst_user.first_name, ' ', dst_user.last_name))) > 0, TRIM(CONCAT(dst_user.first_name, ' ', dst_user.last_name)), dst_user.username) AS 'dst_user', dst_device.description AS 'dst_device', " "%s FROM %s " "LEFT JOIN users ON users.id = %s.user_id " "LEFT JOIN users AS dst_user ON (dst_user.id = %s.dst_user_id AND dst_user.id > 0) " "LEFT JOIN devices ON devices.id = %s.src_device_id " "LEFT JOIN devices AS dst_device ON dst_device.id = %s.dst_device_id " " WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) %s %s LIMIT %d", colnames_table_name, DB_FROM, DB_FROM, DB_FROM, DB_FROM, DB_FROM, script_start_time, older_than, resume_point_sql, order_sql, sql_limit_var); } else { sprintf(buffer, "SELECT %s FROM " DB_FROM " WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) %s %s LIMIT %d", colnames, script_start_time, older_than, resume_point_sql, order_sql, sql_limit_var); } if (m2_mysql_query(buffer)) { return 1; } result = mysql_store_result(&mysql); // format sql data file while (( row = mysql_fetch_row(result)) != NULL ) { int skip_call = 0; *done = 0; // check if call is answered if (archive_only_answered && row[local_disposition_field_pos] && strcmp(row[local_disposition_field_pos], "ANSWERED")) { skip_call = 1; } if (!skip_call) { // get each field for (i = 0; i < local_colnames_count; i++) { if (row[i]) { my_strcat(sql_file_buffer, row[i]); } else { if (save_to_csv == 1) { my_strcat(sql_file_buffer, ""); } else { my_strcat(sql_file_buffer, "NULL"); } } } sql_file_buffer[strlen(sql_file_buffer) - 1] = 0; // write to data infile fprintf(infile, "%s\n", sql_file_buffer); if (save_to_csv == 1) { fprintf(csv, "%s\n", sql_file_buffer); csv_iterations++; if (csv_iterations >= 50000) { csv_part++; csv_iterations = 0; fclose(csv); sprintf(csv_filename, "%s/m2_archived_calls/m2_archived_calls_part_%d.csv", path_to_backups, csv_part + 1); csv = fopen(csv_filename, "a+"); if (csv == NULL) { m2_log("Can't open %s\n", csv_filename); exit(1); } } } } else { *failed_calls += 1; } *sql_file_buffer = 0; // save ids ids[*calls] = atoll(row[local_id_field_pos]); strcpy(last_calldate, row[local_calldate_field_pos]); *calls = *calls + 1; } if (strcmp(prev_last_calldate, last_calldate) == 0) { if (sql_limit_var != *calls) { m2_log("Transferred all calls\n"); *calls = 0; *done = 1; } else { m2_log("Calldate [%s] repeats, trying to increase limit to %d\n", prev_last_calldate, SQL_LIMIT * 2); if (sql_limit_inc == 1) { m2_log("Limit was increased previously, aborting...\n"); return 1; } sql_limit_var = SQL_LIMIT * 2; sql_limit_inc = 1; } } else { sql_limit_inc = 0; sql_limit_var = SQL_LIMIT; } if (save_to_csv == 1 && csv) { fclose(csv); } mysql_free_result(result); return 0; } int delete_calls_from_database(int calls) { MYSQL_RES *result; MYSQL_ROW row; int i = 0; int batch_counter = 0; // initialize ids_buffer strcpy(ids_buffer, sql_delete_statement); strcpy(check_ids_buffer, sql_count_statement); long long int check_calls = 0; int last_iteration_done = 0; char es_bulk_delete_query[30000] = ""; char es_bulk_delete_buffer[256] = ""; char es_bulk_delete_command[31024] = ""; // delete from calls table for (i = 0; i < calls; i++) { sprintf(buffer, "%lld,", ids[i]); strcat(ids_buffer, buffer); strcat(check_ids_buffer, buffer); sprintf(es_bulk_delete_buffer, "{ \"delete\" : { \"_id\" : \"%llu\" } }\n", ids[i]); strcat(es_bulk_delete_query, es_bulk_delete_buffer); batch_counter++; // batch is full, send query if (batch_counter == BATCH_SIZE) { last_iteration: ids_buffer[strlen(ids_buffer) - 1] = 0; check_ids_buffer[strlen(check_ids_buffer) - 1] = 0; strcat(ids_buffer, ");"); strcat(check_ids_buffer, ");"); // delete from MySQL if (m2_mysql_query(ids_buffer)) { return 1; } // check if calls with those ids are delete if (m2_mysql_query(check_ids_buffer)) { return 1; } check_calls = -1; result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); if (row[0]) check_calls = atoll(row[0]); } } mysql_free_result(result); if (check_calls == -1) { m2_log("Can't determine if calls where deleted correctly\n"); m2_log("%s\n", check_ids_buffer); return 1; } else if (check_calls) { m2_log("Tried to delete %d calls, but there are still %lld calls left. Check why some calls where not deleted!\n", BATCH_SIZE, check_calls); m2_log("%s\n", check_ids_buffer); return 1; } *ids_buffer = 0; *check_ids_buffer = 0; strcpy(ids_buffer, sql_delete_statement); strcpy(check_ids_buffer, sql_count_statement); batch_counter = 0; // delete from Elasticsearch sprintf(es_bulk_delete_command, "/usr/bin/curl -s -XPOST \"%s:9200/m2/calls/_bulk\" -d \"%s\" &> /dev/null", elasticsearch_host, es_bulk_delete_query); system(es_bulk_delete_command); strcpy(es_bulk_delete_query, ""); if (last_iteration_done) break; } } // if batch is not full, send query anyway if (strlen(ids_buffer) > strlen(sql_delete_statement) && last_iteration_done == 0) { last_iteration_done = 1; goto last_iteration; } return 0; } void calculate_expected_time(char *datetime, int seconds) { time_t t; struct tm tmp; char tmp_str[100]; t = time(NULL) + seconds; localtime_r(&t, &tmp); strftime(tmp_str, sizeof(tmp_str), DATE_FORMAT, &tmp); strcpy(datetime, tmp_str); } void check_time_limit() { char current_date[20] = ""; m2_get_current_date(current_date); if (m2_compare_dates(current_date, forced_stop_date)) { m2_log("Archiving should be stopped (current_date: %s, archive_till: %s)\n", current_date, forced_stop_date); forced_stop = 1; } } int is_date(char *date) { int i = 0; if (strlen(date) != 19) return 0; for (i = 0; i < 19; i++) { if (i == 4 || i == 7 || i == 10 || i == 13 || i == 16) continue; if (date[i] > '9' || date[i] < '0') { return 0; } } return 1; } void *set_timer() { long long int counter = 0; long long int time_left = 0; double calls_per_sec = 0; double progress_percent = 0; char datetime[20] = ""; char progress_buffer[1024] = ""; int connection = 0; if (DEBUG_PROGRESS && total_calls > 0) { printf("--------------------------------------------------------------------------------\n"); printf(" Progress | Completed | Calls per sec | Time left | Expected to finish at\n"); printf("--------------------------------------------------------------------------------\n"); } while (1) { sleep(PROGRESS_TIMER); counter++; if (transferred_calls > 0) { progress_percent = (double)((double)transferred_calls/total_calls)*100; calls_per_sec = (double)transferred_calls/counter/PROGRESS_TIMER; time_left = ceil((double)(total_calls - transferred_calls) / calls_per_sec); } else { progress_percent = 0; calls_per_sec = 0; time_left = 9999; } if (time_left > 0) { calculate_expected_time(datetime, time_left); } else { strcpy(datetime, ""); } if (!is_date(datetime)) { strcpy(datetime, ""); } if (strlen(datetime) && time_limit_h > -1) { if (m2_compare_dates(datetime, forced_stop_date)) { strcpy(datetime, forced_stop_date); } } // m2_mysql_query_multi is used here to have another sql connection // we are doing queries in a thread, so another sql connection is required to prevent crashes sprintf(progress_buffer, "UPDATE background_tasks SET status = 2, percent_completed = %.3f, expected_to_finish_at = '%s', updated_at = NOW() WHERE id = %i", progress_percent, datetime, task_id); if (m2_mysql_query_multi(progress_buffer, &connection)) { m2_log("Query failed: %s\n", progress_buffer); } // free connection mysql_connections[connection] = 0; if (DEBUG_PROGRESS) printf(" %07lld/%07lld | %6.2f %% | %9.2f | %4lld sec | %s\n", transferred_calls, total_calls, progress_percent, calls_per_sec, time_left, datetime); // check if we should stop this script due to time limit if (time_limit_h > -1) { check_time_limit(); } } pthread_exit(NULL); } void my_strcat(char *str1, char *str2) { strcat(str1, "\""); char_replace(str2, '"', '\''); strcat(str1, str2); strcat(str1, "\""); strcat(str1, ","); } void error_handle() { static int marked = 0; if (marked == 0) { if(task_failed) { if (!DEBUG_PROGRESS) m2_task_unlock(4); m2_log("Task failed\n"); } marked = 1; } forced_stop = 1; finished = 0; } void generate_forced_stop_date() { MYSQL_RES *result; MYSQL_ROW row; struct tm tm1, tm2; time_t time1, time2; char archive_at[20] = ""; char archive_till[20] = ""; char archive_at_date[20] = ""; char archive_till_date[20] = ""; char *tz; m2_get_current_date(archive_at_date); m2_get_current_date(archive_till_date); strcpy(archive_at_date + 17, "00"); strcpy(archive_till_date + 17, "00"); // get archive_at and archive till values if (m2_mysql_query("SELECT name, value FROM conflines WHERE name = 'Archive_at' OR name = 'Archive_till'")) { exit(1); } result = mysql_store_result(&mysql); // fetch data while ((row = mysql_fetch_row(result)) != NULL) { if (row[0] && row[1]) { if (strcmp(row[0], "Archive_at") == 0) { strcpy(archive_at, row[1]); } if (strcmp(row[0], "Archive_till") == 0) { strcpy(archive_till, row[1]); } } } mysql_free_result(result); strncpy(archive_at_date + 11, archive_at, 5); strncpy(archive_till_date + 11, archive_till, 5); // set UTC time zone to avoid daylight saving and other bad stuff tz = getenv("TZ"); if (tz) tz = strdup(tz); setenv("TZ", "UTC", 1); tzset(); // convert date strings to tm stuctures memset(&tm1, 0, sizeof(struct tm)); memset(&tm2, 0, sizeof(struct tm)); strptime(archive_at_date, DATE_FORMAT, &tm1); strptime(archive_till_date, DATE_FORMAT, &tm2); time1 = mktime(&tm1); time2 = mktime(&tm2); if (time2 < time1) { time2 += 24*60*60; memset(&tm2, 0, sizeof(struct tm)); localtime_r(&time2, &tm2); strftime(archive_till_date, 20, DATE_FORMAT, &tm2); strncpy(archive_till_date + 11, archive_till, 5); } // restore timezone sessions variable if (tz) { setenv("TZ", tz, 1); free(tz); } else { unsetenv("TZ"); } tzset(); strcpy(forced_stop_date, archive_till_date); } /* Delete calls from calls_old table (older than x days) */ int delete_archived_calls() { MYSQL_RES *result; MYSQL_ROW row; char query[2048] = ""; char delete_query[2048] = ""; int done = 0; int counter = 0; m2_log("Deleting archived calls older than %d days\n", delete_archived_calls_older_than); sprintf(query, "SELECT id FROM calls_old WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) LIMIT 1", script_start_time, delete_archived_calls_older_than); sprintf(delete_query, "DELETE FROM calls_old WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) LIMIT 1000", script_start_time, delete_archived_calls_older_than); while (done == 0 && forced_stop == 0) { done = 1; // delete old calls if (m2_cloud_mysql_query(delete_query)) { exit(1); } // get old calls count if (m2_cloud_mysql_query(query)) { exit(1); } result = mysql_store_result(&mysql); // fetch data while ((row = mysql_fetch_row(result)) != NULL) { if (row[0] && atoll(row[0]) > 0) { done = 0; } } mysql_free_result(result); counter++; if (counter >= 10000) { m2_log("Possible loop detected. Breaking out of it!\n"); return 0; } } return 0; } /* Get various settings from conflines */ void get_settings_from_conflines() { MYSQL_RES *result; MYSQL_ROW row; m2_log("Reading conflines\n"); // get Move_to_old_calls_older_than value if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Move_to_old_calls_older_than'")) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { older_than = atoi(row[0]); } else { m2_log("Move_to_old_calls_older_than is not set\n"); exit(1); } } } // nothing to do if (older_than == 0) { m2_log("Move_to_old_calls_older_than is 0\n"); exit(1); } mysql_free_result(result); // get Archive_only_answered_calls value if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Archive_only_answered_calls'")) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { archive_only_answered = atoi(row[0]); } } } if (archive_only_answered) { m2_log("Only ANSWERED calls will be archived\n"); } mysql_free_result(result); // get Delete_Calls_instead_of_Archiving value if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Delete_Calls_instead_of_Archiving'")) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { delete_instead_of_archive = atoi(row[0]); } } } if (delete_instead_of_archive) { m2_log("Calls will be deleted instead of archived!\n"); } mysql_free_result(result); // get Delete_Archived_Calls_older_than value if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Delete_Archived_Calls_older_than'")) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { delete_archived_calls_older_than = atoi(row[0]); } } } mysql_free_result(result); // get Archive_Calls_to_CSV value if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Archive_Calls_to_CSV'")) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { save_to_csv = atoi(row[0]); } } } mysql_free_result(result); // get Do_not_delete_Archived_Calls_from_calls_table value if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Do_not_delete_Archived_Calls_from_calls_table'")) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { do_not_delete_from_calls = atoi(row[0]); } } } mysql_free_result(result); char sql_cmd[2048] = ""; sprintf(sql_cmd, "SELECT (SELECT value FROM conflines WHERE name = 'save_archived_calls_on_ftp')"); if (m2_mysql_query(sql_cmd)) { exit(1); } result = mysql_store_result(&mysql); if (result) { if (mysql_num_rows(result)) { row = mysql_fetch_row(result); // check if we get result if (row[0]) { save_on_ftp = atoi(row[0]); if (save_on_ftp) save_to_csv = 1; } } } if (save_on_ftp) { m2_get_ftp_settings(); } mysql_free_result(result); } /* Get path to backups folders */ void get_backups_path(char *path) { MYSQL_RES *result; MYSQL_ROW row; // get path if (m2_mysql_query("SELECT value FROM conflines WHERE name = 'Backup_Folder' LIMIT 1")) { exit(1); } result = mysql_store_result(&mysql); // fetch data while ((row = mysql_fetch_row(result)) != NULL) { if (row[0]) { strcpy(path, row[0]); } } mysql_free_result(result); } /* Process CSV files (set proper filenames, create tgz) */ void process_csv_files() { char filename[512] = ""; char ftp_filename[512] = ""; char cmd[5000] = ""; int pid = getpid(); int pid_used = 0; int i = 0; int local_calldate_field_pos = calldate_field_pos + 4; if (save_to_csv == 0) { return; } m2_log("Processing CSV files\n"); // delete empty files sprintf(cmd, "find %s/m2_archived_calls -size 0 -print0 | xargs -0 rm", path_to_backups); system(cmd); sprintf(cmd, "%s/m2_archived_calls", path_to_backups); if (file_count(cmd)) { // rename files for (i = 0; i < (csv_part + 1); i++) { // check that csv file exists sprintf(filename, "%s/m2_archived_calls/m2_archived_calls_part_%d.csv", path_to_backups, i + 1); m2_log("Checking file: %s\n", filename); // check if file exists if (file_count(filename) == 0) { break; } // get min date sprintf(cmd, "cat %s/m2_archived_calls/m2_archived_calls_part_%d.csv | awk -F\\\",\\\" '{print $%d}' | sort | head -n 1 | tr '\"' '\\0' > /tmp/m2_min_date && LC_ALL=en_EN.utf8 date -d \"`cat /tmp/m2_min_date`\" +%%Y%%b%%d-%%H%%M%%S > /tmp/m2_min_date ", path_to_backups, i + 1, local_calldate_field_pos + 1); system(cmd); // make a copy of min date for gz file if (i == 0) { system("cp -fr /tmp/m2_min_date /tmp/m2_min_date_tgz"); } // get max date sprintf(cmd, "cat %s/m2_archived_calls/m2_archived_calls_part_%d.csv | awk -F\\\",\\\" '{print $%d}' | sort | tail -n 1 | tr '\"' '\\0' > /tmp/m2_max_date && LC_ALL=en_EN.utf8 date -d \"`cat /tmp/m2_max_date`\" +%%Y%%b%%d-%%H%%M%%S > /tmp/m2_max_date", path_to_backups, i + 1, local_calldate_field_pos + 1); system(cmd); // add header sprintf(cmd, "sed -i '1i src_user,src_device,dst_user,dst_device,%s' %s/m2_archived_calls/m2_archived_calls_part_%d.csv", colnames_calls, path_to_backups, i + 1); system(cmd); // check if existing file doest not have the same name sprintf(cmd, "ls %s/m2_archived_calls/m2_archived_calls_from_`cat /tmp/m2_min_date`_to_`cat /tmp/m2_max_date`.csv | wc -l", path_to_backups); if (cmd_output_line_count(cmd) == 0) { // rename csv filename sprintf(cmd, "mv %s/m2_archived_calls/m2_archived_calls_part_%d.csv %s/m2_archived_calls/m2_archived_calls_from_`cat /tmp/m2_min_date`_to_`cat /tmp/m2_max_date`.csv", path_to_backups, i + 1, path_to_backups); system(cmd); } else { sprintf(cmd, "ls %s/m2_archived_calls/m2_archived_calls_from_`cat /tmp/m2_min_date`_to_`cat /tmp/m2_max_date`.csv | wc -l", path_to_backups); m2_log("File already exists with the name: %s\n", cmd); // rename csv filename sprintf(cmd, "mv %s/m2_archived_calls/m2_archived_calls_part_%d.csv %s/m2_archived_calls/m2_archived_calls_from_`cat /tmp/m2_min_date`_to_`cat /tmp/m2_max_date`_`uuidgen`.csv", path_to_backups, i + 1, path_to_backups); system(cmd); } } // make tgz sprintf(filename, "%s/m2_archived_calls", path_to_backups); if (file_count(filename)) { m2_log("Compressing CSV files to tgz\n"); // check if existing file doest not have the same name sprintf(cmd, "ls %s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date` | wc -l", path_to_backups); if (cmd_output_line_count(cmd) == 0) { // rename dir sprintf(cmd, "mv %s/m2_archived_calls %s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`", path_to_backups, path_to_backups); system(cmd); } else { sprintf(cmd, "%s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`", path_to_backups); m2_log("Directory already exists with the name: %s\n", cmd); // rename dir sprintf(cmd, "mv %s/m2_archived_calls %s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_%d", path_to_backups, path_to_backups, pid); system(cmd); pid_used = 1; } // make tgz // check if existing file doest not have the same name sprintf(cmd, "ls %s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`.tgz | wc -l", path_to_backups); if (cmd_output_line_count(cmd) == 0) { if (pid_used) { sprintf(cmd, "cd %s && tar -cvzf m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`.tgz m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_%d", path_to_backups, pid); sprintf(ftp_filename, "%s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`.tgz", path_to_backups); } else { sprintf(cmd, "cd %s && tar -cvzf m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`.tgz m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`", path_to_backups); sprintf(ftp_filename, "%s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`.tgz", path_to_backups); } m2_log("%s\n", cmd); system(cmd); } else { sprintf(cmd, "%s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`.tgz", path_to_backups); m2_log("File already exists with the name: %s\n", cmd); if (pid_used) { sprintf(cmd, "cd %s && tar -cvzf m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_`uuidgen`.tgz m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_%d", path_to_backups, pid); sprintf(ftp_filename, "%s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_`uuidgen`.tgz", path_to_backups); } else { sprintf(cmd, "cd %s && tar -cvzf m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_`uuidgen`.tgz m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`", path_to_backups); sprintf(ftp_filename, "%s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_`uuidgen`.tgz", path_to_backups); } m2_log("%s\n", cmd); system(cmd); } // Send file to ftp if (save_on_ftp) { m2_log("CSV file will be transferred to FTP server \n"); if (m2_upload_file_to_ftp(ftp_filename, ftp_archived_calls_path)) { m2_log("CSV File will be deleted from local server \n"); sprintf(cmd, "rm -f %s", ftp_filename); m2_log("%s\n", cmd); system(cmd); } else { m2_log("CSV File will be stored on local server \n"); } } } } // remove tmp data m2_log("Cleaning tmp CSV data\n"); if (pid_used) { sprintf(cmd, "rm -fr %s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`_%d", path_to_backups, pid); system(cmd); } else { sprintf(cmd, "rm -fr %s/m2_archived_calls_from_`cat /tmp/m2_min_date_tgz`_to_`cat /tmp/m2_max_date`", path_to_backups); system(cmd); } sprintf(cmd, "rm -fr %s/m2_archived_calls", path_to_backups); system(cmd); system("rm -fr /tmp/m2_min_date"); system("rm -fr /tmp/m2_min_date_tgz"); system("rm -fr /tmp/m2_max_date"); } /* how many files in directory */ int file_count(char *path) { char command[512] = ""; char pipe_buffer[64] = ""; sprintf(command, "ls %s | wc -l", path); FILE *pipe = popen(command, "r"); if (pipe == NULL) { m2_log("Command failed: %s\n", command); exit(1); } fgets(pipe_buffer, 64, pipe); pclose(pipe); return atoi(pipe_buffer); } /* Replace single character */ void char_replace(char *string, char find, char replace) { int i = 0; for (i = 0; i < strlen(string); i++) { if (string[i] == find) { string[i] = replace; } } } /* how many files in directory */ int cmd_output_line_count(char *cmd) { char pipe_buffer[64] = ""; FILE *pipe = popen(cmd, "r"); if (pipe == NULL) { m2_log("Command failed: %s\n", cmd); exit(1); } fgets(pipe_buffer, 64, pipe); pclose(pipe); return atoi(pipe_buffer); } /* Check how many calls are in a table (by id) */ int get_calls_count(char *db, int records, int cloud_enabled) { MYSQL_RES *result; MYSQL_ROW row; int calls = 0; int i = 0; char query[(IDS_BUFFER_SIZE) * 5] = ""; if (archive_only_answered) { sprintf(query, "SELECT COUNT(*) FROM %s WHERE disposition = 'ANSWERED' AND id IN (", db); } else { sprintf(query, "SELECT COUNT(*) FROM %s WHERE id IN (", db); } for (i = 0; i < records; i++) { char buffer[256] = ""; sprintf(buffer, "%llu,", ids[i]); strcat(query, buffer); } query[strlen(query) - 1] = ')'; if (cloud_enabled) { if (m2_cloud_mysql_query(query)) { exit(1); } result = mysql_store_result(&cloud_mysql); } else { if (m2_mysql_query(query)) { exit(1); } result = mysql_store_result(&mysql); } // fetch data while ((row = mysql_fetch_row(result)) != NULL) { if (row[0]) { calls = atoi(row[0]); } } mysql_free_result(result); return calls; } /* Resume point when not deleting from calls table */ void get_resume_point() { MYSQL_RES *result; MYSQL_ROW row; if (m2_mysql_query("SELECT MAX(calldate) FROM calls_old")) { exit(1); } result = mysql_store_result(&mysql); while ((row = mysql_fetch_row(result)) != NULL) { if (row[0]) strcpy(last_calldate_resume_point, row[0]); } mysql_free_result(result); } /* Get calls count */ long long int get_calls_count_from_table(char *table, char *start_date, char *end_date) { MYSQL_RES *result; MYSQL_ROW row; long long int calls_count = -1; char query[2000] = ""; char answered_sql[200] = ""; if (archive_only_answered) { strcpy(answered_sql, "AND disposition = 'ANSWERED'"); } sprintf(query, "SELECT COUNT(*) FROM %s WHERE calldate BETWEEN '%s' AND '%s' %s", table, start_date, end_date, answered_sql); if (m2_mysql_query(query)) { exit(1); } result = mysql_store_result(&mysql); while ((row = mysql_fetch_row(result)) != NULL) { if (row[0]) calls_count = atoll(row[0]); } mysql_free_result(result); return calls_count; } /* Get min/max calldates from calls table */ void get_min_max_calldates(char *min_calldate, char *max_calldate) { MYSQL_RES *result; MYSQL_ROW row; char resume_point_sql[512] = ""; char query[2048] = ""; if (do_not_delete_from_calls && strlen(last_calldate_resume_point)) { sprintf(resume_point_sql, "AND calls.calldate >= '%s'", last_calldate_resume_point); } sprintf(query, "SELECT MIN(calldate), MAX(calldate) FROM calls WHERE calldate < DATE_SUB('%s', INTERVAL %d DAY) %s", script_start_time, older_than, resume_point_sql); if (m2_mysql_query(query)) { exit(1); } result = mysql_store_result(&mysql); while ((row = mysql_fetch_row(result)) != NULL) { if (row[0]) strcpy(min_calldate, row[0]); if (row[1]) strcpy(max_calldate, row[1]); } mysql_free_result(result); } /* Hard way to fix missing data */ void fix_inconsistencies(char *min_calldate, char *max_calldate) { char query[9000] = ""; char answered_sql[200] = ""; if (archive_only_answered) { strcpy(answered_sql, "AND disposition = 'ANSWERED'"); } sprintf(query, "INSERT INTO calls_old (%s) SELECT %s FROM calls WHERE id IN (" "SELECT id FROM calls WHERE calldate BETWEEN '%s' AND '%s' %s AND id NOT IN (" "SELECT id FROM calls_old WHERE calldate BETWEEN '%s' and '%s' %s" ")" ")", colnames, colnames_table_name, min_calldate, max_calldate, answered_sql, min_calldate, max_calldate, answered_sql); if (m2_mysql_query(query)) { exit(1); } } /* Fix inconsistencies by day */ void fix_inconsistencies_by_day(char *start_date, char *end_date) { int done = 0; int counter = 0; char day_start[20] = ""; char day_end[20] = ""; m2_log("Trying to fix inconsistencies in period %s - %s\n", start_date, end_date); strcpy(day_end, end_date); while (!done) { m2_subtract_days(day_start, day_end, 1); m2_log("Checking day %s - %s\n", day_start, day_end); long long int calls_count = get_calls_count_from_table("calls", day_start, day_end); long long int calls_old_count = get_calls_count_from_table("calls_old", day_start, day_end); if (calls_count != calls_old_count) { if (abs(calls_count - calls_old_count) >= 500000) { m2_log("Found inconsistencies - calls count: %lld, calls old count: %lld, difference is too big to fix automatically, skipping this day...\n", calls_count, calls_old_count); continue; } m2_log("Found inconsistencies - calls count: %lld, calls old count: %lld, trying to fix...\n", calls_count, calls_old_count); fix_inconsistencies(day_start, day_end); calls_count = get_calls_count_from_table("calls", day_start, day_end); calls_old_count = get_calls_count_from_table("calls_old", day_start, day_end); if (calls_count != calls_old_count) { m2_log("Failed to fix inconsistencies in day %s - %s\n", day_start, day_end); } else { m2_log("Fixed inconsistencies in day %s - %s\n", day_start, day_end); } } else { m2_log("Inconsistencies not found in this day\n"); } if (!m2_compare_dates(day_end, start_date)) { done = 1; } counter++; if (counter >= 1000) { m2_log("1000 iterations reached, exiting\n"); break; } strcpy(day_end, day_start); } }