#!/bin/bash
. /usr/src/m2/framework/bash_functions.sh

# make sure that we can access database
m2_mysql_connect
if [ "$?" != "0" ]; then exit 1; fi

VERSION=1.4.3
CURL_TIMEOUT=5
ADDITIONAL_FIELDS=
LOG_PATH=/var/log/m2/m2_elasticsearch.log
ES_HOST=$(get_elasticsearch_host)
ES_ARG="$*"
ES_DOUBLE_ARG="$1 $2"
MAIN_DB_IP=$(sed 's/ //g' /etc/m2/system.conf | grep MAIN_DB_IP | awk -F"=" '{print $2}')
ADDITIONAL_RESYNC_ARGS=""
ES_DB_HOST=""

function es_log {
    echo "$@"
    echo "[$(date +"%Y-%m-%d %H:%M:%S")] $@" >> "$LOG_PATH"
}

if [ "$VERSION_SHOWN" != "1" ]; then
    es_log "Version: $VERSION"
    export VERSION_SHOWN=1
fi

if [[ $ES_ARG == *"local"* ]] || [ "$USE_ES_LOCALHOST" == "1" ]; then
    ES_HOST="localhost"
    ES_ARG=${ES_ARG// local/}
    export USE_ES_LOCALHOST=1
fi

if [ "$1" == "resync" ] && [ "$2" != "interval" ]; then
    ES_ARG="resync"
    shift
    ADDITIONAL_RESYNC_ARGS="$*"
fi

if [ "$HOST_SHOWN" != "1" ]; then
    es_log "Elasticsearch host: $ES_HOST"
    export HOST_SHOWN=1
fi

if [ "$MAIN_DB_IP" != "" ]; then
    ES_DB_HOST=$MAIN_DB_IP
else
    ES_DB_HOST=$DB_HOST
fi

if [ "$ES_DB_HOST_SHOW" != "1" ]; then
    es_log "Database host: $ES_DB_HOST"
    export ES_DB_HOST_SHOW=1
fi

check_additional_fields() {
    COLUMN_EXISTS=$(MYSQL_PWD="$DB_PASSWORD" /usr/bin/mysql -h "$ES_DB_HOST" -u "$DB_USERNAME" $P_OPT "$DB_NAME" -e "DESC calls" | grep -cw "$1")
    if [ "$COLUMN_EXISTS" == "1" ]; then
        ADDITIONAL_FIELDS="$ADDITIONAL_FIELDS, $1"
    fi
}
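
# ----------------------------------------------------------------------------
# The handlers below dispatch on $ES_ARG / $ES_DOUBLE_ARG. Illustrative
# invocations (not executed here; assuming the script is reachable as
# /usr/bin/elasticsearch, the symlink the resync handler maintains):
#
#   elasticsearch status
#   elasticsearch count
#   elasticsearch count period 2018-10-30 00:00:00 2018-10-30 10:15:30
#   elasticsearch index create
#   elasticsearch index delete
#   elasticsearch sync start | sync stop | sync restart | sync hourly stop
#   elasticsearch resync [arguments passed on to es_resync_by_day.sh]
#   elasticsearch resync interval 2018-10-01 2018-10-31
#
# Appending "local" to a command forces ES_HOST=localhost (handled above).
# ----------------------------------------------------------------------------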
if [ "$ES_ARG" == "index delete" ]; then
    elasticsearch sync stop
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XDELETE "$ES_HOST:9200/${DB_NAME}?pretty" &> /dev/null
    exit 0
fi

if [ "$ES_ARG" == "status" ]; then
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" "http://$ES_HOST:9200/_cluster/health?pretty"
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" "http://$ES_HOST:9200/_nodes/process?pretty"
    exit 0
fi

if [ "$ES_ARG" == "sync stop" ]; then
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XDELETE "$ES_HOST:9200/_river/m2_jdbc_river/?pretty" &> /dev/null
    if [ -e /etc/cron.d/es_sync ]; then
        rm -f /etc/cron.d/es_sync &> /dev/null
        service crond restart &> /dev/null
    fi
    exit 0
fi

if [ "$ES_ARG" == "sync restart" ]; then
    elasticsearch sync stop
    elasticsearch sync start
    exit 0
fi

if [ "$ES_ARG" == "sync start" ]; then
    # check if cron exists
    if [ ! -e /etc/cron.d/es_sync ]; then
        cp -f /usr/src/m2/elasticsearch/es_sync_cron /etc/cron.d/es_sync &> /dev/null
        chmod 0644 /etc/cron.d/es_sync
        service crond restart &> /dev/null
    fi

    check_additional_fields "pdd"
    check_additional_fields "did_id"
    check_additional_fields "did_customer_id"
    check_additional_fields "did_vendor_id"
    check_additional_fields "did_selling_billsec"
    check_additional_fields "did_buying_billsec"
    check_additional_fields "did_selling_price"
    check_additional_fields "did_buying_price"
    check_additional_fields "did_buying_rate"
    check_additional_fields "did_selling_rate"
    check_additional_fields "tp_src"
    check_additional_fields "src_number_pool_id"
    check_additional_fields "src_prefix"

    # check if previous sync is still running
    ES_SYNC_IS_RUNNING=$(MYSQL_PWD="$DB_PASSWORD" /usr/bin/mysql -h "$ES_DB_HOST" -u "$DB_USERNAME" $P_OPT "$DB_NAME" -e "show processlist" | grep -cF "/*ES sync*/")
    if [ "$ES_SYNC_IS_RUNNING" != "0" ]; then
        es_log "Elasticsearch sync is already running"
        exit 0
    else
        # double check in 3 seconds
        sleep 3
        ES_SYNC_IS_RUNNING=$(MYSQL_PWD="$DB_PASSWORD" /usr/bin/mysql -h "$ES_DB_HOST" -u "$DB_USERNAME" $P_OPT "$DB_NAME" -e "show processlist" | grep -cF "/*ES sync*/")
        if [ "$ES_SYNC_IS_RUNNING" != "0" ]; then
            es_log "Elasticsearch sync is already running"
            exit 0
        fi
    fi
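
    # --------------------------------------------------------------------
    # Sketch of what the retry loop below does (illustrative, not executed),
    # assuming an Elasticsearch 1.x-style aggregation response:
    #
    #   curl -XGET "$ES_HOST:9200/${DB_NAME}/calls/_search?pretty" -d \
    #     '{"size":0,"aggregations":{"max_id":{"max":{"field":"id"}}}}'
    #   ...
    #   "aggregations" : { "max_id" : { "value" : 1.234567E6 } }
    #
    # The pipelines below grep for "value" and use awk to expand possible
    # scientific notation into a plain integer call id. The /*ES sync*/
    # marker embedded in the river SQL is what made a running sync visible
    # in the SHOW PROCESSLIST check above.
    # --------------------------------------------------------------------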
    max_attempts=5
    attempt=0
    system_uptime=$(grep -Po '\d+' /proc/uptime | head -n 1)
    if [ "$system_uptime" -lt "120" ]; then
        es_log "System was rebooted recently (uptime $system_uptime seconds), max retries for initial sync is increased to 20"
        max_attempts=20
    fi

    while (( attempt < max_attempts )); do
        # get ES count
        ES_COUNT=$(curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XGET "$ES_HOST:9200/${DB_NAME}/calls/_count?pretty" | awk '{print $3}' | grep -Po '\d+' | head -n 1)
        # if ES is not empty, get max call id from ES
        if (( ES_COUNT > 0 )); then
            # format check from date (current time - 1 day)
            CHECK_FROM=$(date +%Y-%m-%dT%H:%M:%S -d "1 day ago")
            # search for max ES call id in last day
            ES_MAX_CALL_ID=$(curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XGET "$ES_HOST:9200/${DB_NAME}/calls/_search?pretty" -d "{\"size\":0,\"query\":{\"filtered\":{\"filter\":{\"bool\":{\"must\":[{\"range\":{\"calldate\":{\"gte\":\"$CHECK_FROM\"}}}]}}}},\"aggregations\":{\"max_id\":{\"max\":{\"field\":\"id\"}}}}" | grep value | awk '{print $3}' | awk -F"E" 'BEGIN{OFMT="%10.0f"} {print $1 * (10 ^ $2)}' | head -n 1)
            if (( ES_MAX_CALL_ID < 1 )); then
                es_log "Last ES call id not found in recent day, searching for last call id in all ES records"
                # search for max ES call id in all ES records
                ES_MAX_CALL_ID=$(curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XGET "$ES_HOST:9200/${DB_NAME}/calls/_search?pretty" -d "{\"size\":0,\"query\":{ \"match_all\": {}},\"aggregations\":{\"max_id\":{\"max\":{\"field\":\"id\"}}}}" | grep value | awk '{print $3}' | awk -F"E" 'BEGIN{OFMT="%10.0f"} {print $1 * (10 ^ $2)}' | head -n 1)
                if (( ES_MAX_CALL_ID < 1 )); then
                    es_log "Can't retrieve last ES call id"
                    es_log "Exiting..."
                    exit 0
                fi
            fi
        else
            ES_MAX_CALL_ID=0
        fi

        attempt=$((attempt + 1))
        if (( ES_MAX_CALL_ID < 1 )); then
            es_log "ES last call id is 0, retrying again (#$attempt)"
            sleep 2
        else
            break
        fi
    done

    # For development environment, do not limit results by time interval
    if [[ $(hostname) == "dev.kolmisoft.com" ]]; then
        HOURS_INTERVAL_SQL=""
    else
        HOURS_INTERVAL_SQL="AND calldate > NOW() - INTERVAL 6 HOUR"
    fi

    es_log "Resyncing from call id: $ES_MAX_CALL_ID"

    # delete previous sync river
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XDELETE "$ES_HOST:9200/_river/m2_jdbc_river/?pretty" &> /dev/null

    # create new sync river
    # settings described here: https://github.com/mradamlacey/elasticsearch-river-jdbc
    # separate SQL for non-ANSWERED calls to save ES space, because it has less data
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XPUT "$ES_HOST:9200/_river/m2_jdbc_river/_meta?pretty" -d "{
        \"type\" : \"jdbc\",
        \"threadpoolsize\" : 1,
        \"max_concurrent_bulk_requests\" : 1,
        \"max_bulk_volume\" : \"2m\",
        \"jdbc\" : [
            {
                \"url\" : \"jdbc:mysql://${ES_DB_HOST}:3306/${DB_NAME}\",
                \"user\" : \"$DB_USERNAME\",
                \"password\" : \"$DB_PASSWORD\",
                \"ignore_null_values\" : true,
                \"timezone\" : \"UTC\",
                \"rounding\" : \"halfeven\",
                \"scale\" : 9,
                \"sql\" : [ { \"statement\" : \"/*ES sync*/ SELECT calls.id AS _id,calls.id AS id,calldate,src,dst,duration,billsec,disposition,dst_device_id,provider_id,provider_billsec,provider_price,user_id,user_billsec,user_price,calls.prefix,hangupcause,localized_dst,real_duration,real_billsec,server_id,IF(destinationgroup_id IS NULL,0,destinationgroup_id) AS destinationgroup_id,src_device_id,user_rate,provider_rate,user_id AS src_user_id, IF(managers.responsible_accountant_id IS NULL OR managers.responsible_accountant_id = -1, 0, managers.responsible_accountant_id) AS manager_id $ADDITIONAL_FIELDS FROM calls LEFT JOIN destinations ON destinations.prefix = calls.prefix LEFT JOIN users AS managers ON calls.user_id = managers.id WHERE calls.id > $ES_MAX_CALL_ID $HOURS_INTERVAL_SQL AND disposition = 'ANSWERED'\" } ],
                \"index\" : \"${DB_NAME}\",
                \"type\" : \"calls\"
            },
            {
                \"url\" : \"jdbc:mysql://${ES_DB_HOST}:3306/${DB_NAME}\",
                \"user\" : \"$DB_USERNAME\",
                \"password\" : \"$DB_PASSWORD\",
                \"ignore_null_values\" : true,
                \"timezone\" : \"UTC\",
                \"rounding\" : \"halfeven\",
                \"scale\" : 9,
                \"sql\" : [ { \"statement\" : \"/*ES sync*/ SELECT calls.id AS _id,calls.id AS id,calldate,src,dst,duration,disposition,dst_device_id,provider_id,calls.user_id,calls.prefix,hangupcause,localized_dst,real_duration,calls.server_id,IF(destinationgroup_id IS NULL,0,destinationgroup_id) AS destinationgroup_id,src_device_id,user_rate,provider_rate,devices.user_id AS src_user_id, IF(managers.responsible_accountant_id IS NULL OR managers.responsible_accountant_id = -1, 0, managers.responsible_accountant_id) AS manager_id $ADDITIONAL_FIELDS FROM calls LEFT JOIN devices ON devices.id = calls.src_device_id LEFT JOIN destinations ON destinations.prefix = calls.prefix LEFT JOIN users AS managers ON calls.user_id = managers.id WHERE calls.id > $ES_MAX_CALL_ID $HOURS_INTERVAL_SQL AND disposition != 'ANSWERED'\" } ],
                \"index\" : \"${DB_NAME}\",
                \"type\" : \"calls\"
            }
        ]
    }"

    exit 0
fi

if [ "$ES_ARG" == "sync hourly stop" ]; then
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XDELETE "$ES_HOST:9200/_river/m2_hourly_jdbc_river/?pretty" &> /dev/null
    exit 0
fi

if [ "$ES_ARG" == "count" ]; then
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XGET "$ES_HOST:9200/${DB_NAME}/calls/_count?pretty"
    exit 0
fi

if [ "$ES_DOUBLE_ARG" == "count period" ]; then
    es_log $#
    if [ "$#" != "6" ]; then
        es_log "Incorrect number of arguments"
        es_log "example request: count period 2018-10-30 00:00:00 2018-10-30 10:15:30"
        exit 1
    fi
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XGET "$ES_HOST:9200/${DB_NAME}/calls/_count?q=calldate:\[$3\T$4+TO+$5\T$6\]&pretty"
    exit 0
fi

if [ "$ES_ARG" == "index create" ]; then
    echo "" > /tmp/es_curl_resp
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XPUT "$ES_HOST:9200/${DB_NAME}?pretty" -d '{
        "settings": { "index.cache.query.enable": true },
        "mappings": {
            "calls": {
                "dynamic": "strict",
                "_all": { "enabled": false },
                "properties": {
                    "id" : { "type" : "long", "doc_values": true },
                    "calldate" : { "type" : "date", "doc_values": true },
                    "src" : { "type" : "string", "index": "not_analyzed", "doc_values": true },
                    "dst" : { "type" : "string", "index": "not_analyzed", "doc_values": true },
                    "duration" : { "type" : "integer", "doc_values": true },
                    "billsec" : { "type" : "integer", "doc_values": true },
                    "disposition" : { "type" : "string", "index": "not_analyzed", "doc_values": true },
                    "dst_device_id" : { "type" : "integer", "doc_values": true },
                    "src_device_id" : { "type" : "integer", "doc_values": true },
                    "provider_id" : { "type" : "integer", "doc_values": true },
                    "provider_billsec" : { "type" : "integer", "doc_values": true },
                    "provider_price" : { "type" : "double", "doc_values": true },
                    "provider_rate" : { "type" : "double", "doc_values": true },
                    "user_id" : { "type" : "integer", "doc_values": true },
                    "src_user_id" : { "type" : "integer", "doc_values": true },
                    "user_billsec" : { "type" : "integer", "doc_values": true },
                    "user_price" : { "type" : "double", "doc_values": true },
                    "user_rate" : { "type" : "double", "doc_values": true },
                    "prefix" : { "type" : "string", "index": "not_analyzed", "doc_values": true },
                    "hangupcause" : { "type" : "integer", "doc_values": true },
                    "localized_dst" : { "type" : "string", "index": "not_analyzed", "doc_values": true },
                    "real_duration" : { "type" : "double", "doc_values": true },
                    "real_billsec" : { "type" : "double", "doc_values": true },
                    "server_id" : { "type" : "integer", "doc_values": true },
                    "destinationgroup_id" : { "type" : "integer", "doc_values": true },
                    "pdd" : { "type" : "double", "doc_values": true },
                    "manager_id" : { "type" : "integer", "doc_values": true },
                    "did_id" : { "type" : "integer", "doc_values": true },
                    "did_customer_id" : { "type" : "integer", "doc_values": true },
                    "did_vendor_id" : { "type" : "integer", "doc_values": true },
                    "did_selling_billsec" : { "type" : "integer", "doc_values": true },
                    "did_buying_billsec" : { "type" : "integer", "doc_values": true },
                    "did_selling_price" : { "type" : "double", "doc_values": true },
                    "did_buying_price" : { "type" : "double", "doc_values": true },
                    "did_selling_rate" : { "type" : "double", "doc_values": true },
                    "did_buying_rate" : { "type" : "double", "doc_values": true },
                    "tp_src" : { "type" : "string", "index": "not_analyzed", "doc_values": true },
                    "src_number_pool_id" : { "type" : "integer", "doc_values": true },
                    "src_prefix" : { "type" : "string", "index": "not_analyzed", "doc_values": true }
                }
            }
        }
    }' > /tmp/es_curl_resp
    cat /tmp/es_curl_resp
    if grep 'error' /tmp/es_curl_resp &> /dev/null; then
        es_log "Index already exists"
    fi
    exit 0
fi
if [ "$ES_ARG" == "resync" ]; then
    host_name=$(hostname)

    # For development env
    if [ "$host_name" == "dev.kolmisoft.com" ] && [ -d /home/mor ]; then
        svn info /home/mor | grep 'm2' &> /dev/null
        if [ "$?" == "0" ]; then
            ln -fs /usr/src/m2/elasticsearch/m2_elasticsearch.sh /usr/bin/elasticsearch &> /dev/null
            /usr/src/m2/elasticsearch/m2_elasticsearch.sh index delete &> /dev/null
        else
            ln -fs /usr/src/mor/sh_scripts/elasticsearch/mor_elasticsearch.sh /usr/bin/elasticsearch &> /dev/null
            /usr/src/mor/sh_scripts/elasticsearch/mor_elasticsearch.sh index delete &> /dev/null
        fi
    fi

    if [ "$host_name" != "dev.kolmisoft.com" ]; then
        # New resync by hours
        /usr/src/m2/elasticsearch/es_resync_by_day.sh $ADDITIONAL_RESYNC_ARGS
    else
        # Old resync taking all data from calls table at once
        elasticsearch index delete
        sleep 2
        elasticsearch index create
        sleep 2
        elasticsearch sync start
        curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XPOST "http://$ES_HOST:9200/_aliases" -d '{
            "actions" : [
                { "add" : { "index" : "mor", "alias" : "m2" } }
            ]
        }' &> /dev/null
        es_log "Elasticsearch synchronization started..."
    fi
    exit 0
fi

if [ "$ES_DOUBLE_ARG" == "resync interval" ]; then
    FROM_DATE=$3
    TILL_DATE=$4
    FROM_DATE_LEN=${#FROM_DATE}
    TILL_DATE_LEN=${#TILL_DATE}

    if [ "$#" != "4" ]; then
        es_log "Incorrect number of arguments"
        exit 1
    fi
    if [ "$FROM_DATE_LEN" != "10" ] && [ "$FROM_DATE_LEN" != "19" ]; then
        es_log "Wrong date format: $FROM_DATE"
        exit 1
    fi
    if [ "$TILL_DATE_LEN" != "10" ] && [ "$TILL_DATE_LEN" != "19" ]; then
        es_log "Wrong date format: $TILL_DATE"
        exit 1
    fi
    if [ "$FROM_DATE_LEN" == "10" ]; then
        FROM_DATE="$FROM_DATE 00:00:00"
    fi
    if [ "$TILL_DATE_LEN" == "10" ]; then
        TILL_DATE="$TILL_DATE 23:59:59"
    fi
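
    # --------------------------------------------------------------------
    # Example (illustrative only): a date-only interval such as
    #
    #   elasticsearch resync interval 2018-10-01 2018-10-31
    #
    # is expanded above to FROM_DATE="2018-10-01 00:00:00" and
    # TILL_DATE="2018-10-31 23:59:59" before the matching documents are
    # deleted and the interval river below is created.
    # --------------------------------------------------------------------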
    es_log "Starting resync for interval $FROM_DATE - $TILL_DATE"

    # remove old river
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XDELETE "$ES_HOST:9200/_river/m2_interval_jdbc_river/?pretty" &> /dev/null
    sleep 3

    # remove calls in selected period
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XDELETE "http://$ES_HOST:9200/m2/calls/_query" -d "{ \"query\" : { \"range\" : { \"calldate\" : { \"gte\" : \"${FROM_DATE// /T}\", \"lte\" : \"${TILL_DATE// /T}\" } } } }"

    check_additional_fields "pdd"
    check_additional_fields "did_id"
    check_additional_fields "did_customer_id"
    check_additional_fields "did_vendor_id"
    check_additional_fields "did_selling_billsec"
    check_additional_fields "did_buying_billsec"
    check_additional_fields "did_selling_price"
    check_additional_fields "did_buying_price"
    check_additional_fields "did_buying_rate"
    check_additional_fields "did_selling_rate"
    check_additional_fields "tp_src"
    check_additional_fields "src_number_pool_id"
    check_additional_fields "src_prefix"

    # create new river (insert new calls)
    curl --silent --show-error --connect-timeout "$CURL_TIMEOUT" -XPUT "$ES_HOST:9200/_river/m2_interval_jdbc_river/_meta?pretty" -d "{
        \"type\" : \"jdbc\",
        \"threadpoolsize\" : 1,
        \"jdbc\" : [
            {
                \"url\" : \"jdbc:mysql://${ES_DB_HOST}:3306/${DB_NAME}\",
                \"user\" : \"$DB_USERNAME\",
                \"password\" : \"$DB_PASSWORD\",
                \"ignore_null_values\" : true,
                \"rounding\" : \"halfeven\",
                \"scale\" : 9,
                \"sql\" : [ { \"statement\" : \"/*ES interval sync*/ SELECT calls.id AS _id,calls.id AS id,calldate,src,dst,duration,billsec,disposition,dst_device_id,provider_id,provider_billsec,provider_price,user_id,user_billsec,user_price,calls.prefix,hangupcause,localized_dst,real_duration,real_billsec,server_id,IF(destinationgroup_id IS NULL,0,destinationgroup_id) AS destinationgroup_id,src_device_id,user_rate,provider_rate,user_id AS src_user_id, IF(managers.responsible_accountant_id IS NULL OR managers.responsible_accountant_id = -1, 0, managers.responsible_accountant_id) AS manager_id $ADDITIONAL_FIELDS FROM calls LEFT JOIN destinations ON destinations.prefix = calls.prefix LEFT JOIN users AS managers ON calls.user_id = managers.id WHERE disposition = 'ANSWERED' AND calls.calldate BETWEEN '$FROM_DATE' AND '$TILL_DATE'\" } ],
                \"index\" : \"${DB_NAME}\",
                \"type\" : \"calls\"
            },
            {
                \"url\" : \"jdbc:mysql://${ES_DB_HOST}:3306/${DB_NAME}\",
                \"user\" : \"$DB_USERNAME\",
                \"password\" : \"$DB_PASSWORD\",
                \"ignore_null_values\" : true,
                \"timezone\" : \"UTC\",
                \"rounding\" : \"halfeven\",
                \"scale\" : 9,
                \"sql\" : [ { \"statement\" : \"/*ES interval sync*/ SELECT calls.id AS _id,calls.id AS id,calldate,src,dst,duration,disposition,dst_device_id,provider_id,calls.user_id,calls.prefix,hangupcause,localized_dst,real_duration,calls.server_id,IF(destinationgroup_id IS NULL,0,destinationgroup_id) AS destinationgroup_id,src_device_id,user_rate,provider_rate,devices.user_id AS src_user_id, IF(managers.responsible_accountant_id IS NULL OR managers.responsible_accountant_id = -1, 0, managers.responsible_accountant_id) AS manager_id $ADDITIONAL_FIELDS FROM calls LEFT JOIN devices ON devices.id = calls.src_device_id LEFT JOIN destinations ON destinations.prefix = calls.prefix LEFT JOIN users AS managers ON calls.user_id = managers.id WHERE disposition != 'ANSWERED' AND calls.calldate BETWEEN '$FROM_DATE' AND '$TILL_DATE'\" } ],
                \"index\" : \"${DB_NAME}\",
                \"type\" : \"calls\"
            }
        ]
    }"

    exit 0
fi

es_log "Wrong command: $ES_ARG"
exit 1
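
# ----------------------------------------------------------------------------
# Illustrative follow-up checks (not executed; assuming the usual symlink and
# an ES 1.x setup as used above):
#
#   elasticsearch count period 2018-10-01 00:00:00 2018-10-31 23:59:59
#       rough progress of an interval resync, compared against the calls table
#   curl "$ES_HOST:9200/${DB_NAME}/_mapping?pretty"
#       review the mapping created by "index create"
# ----------------------------------------------------------------------------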