# psql_sync_mysql.py
  1. # encoding: utf-8
  2. import sys
  3. import pymysql.cursors
  4. import psycopg2
  5. import time
  6. reload(sys)
  7. sys.setdefaultencoding('utf-8')
  8. def mysql_connect(host, db, user, password):
  9. try:
  10. mysql_conn = pymysql.connect(host=host,
  11. user=user,
  12. password=password,
  13. db=db,
  14. charset='utf8mb4',
  15. cursorclass=pymysql.cursors.DictCursor)
  16. return mysql_conn
  17. except:
  18. print('连接mysql失败!')
  19. pass
  20. def postgres_connect(host, port, db, user, password):
  21. try:
  22. postgres_conn = psycopg2.connect(database=db, user=user, password=password, host=host, port=port)
  23. return postgres_conn
  24. except:
  25. print('连接Postgres失败!')
  26. pass
  27. if __name__ == '__main__':
  28. # 获取postgres连接
  29. postgres_conn = postgres_connect('127.0.0.1', '5432', 'nwaycc', 'postgres', 'Nway2017')
  30. # 获取mysql连接
  31. mysql_conn = mysql_connect('47.94.228.245', 'swdz_crm', 'dbuser', 'BXhKw6JRxMy9FTb2')
  32. # try:
  33. postgres_cur = postgres_conn.cursor()
  34. mysql_cur = mysql_conn.cursor()
  35. ip = '172.31.20.135'
  36. mysql_cur.execute(
  37. """select cdr_id from call_records where ip = %s order by id desc limit 1""",
  38. (ip)
  39. )
  40. last_id = mysql_cur.fetchone()
  41. last_id = last_id['cdr_id'] if last_id else 0
  42. # 获取要同步的通话详单
  43. postgres_cur.execute("""select caller,start_time,end_time,record_path,hangup_dispostion,call_id,id,term_status,intention from ai_cdr where id > %s order by id asc""",
  44. ([last_id]))
  45. call_records = postgres_cur.fetchall()
  46. for call_record in call_records:
  47. record_path = ip+':8088/recordings/'+call_record[3]
  48. mysql_cur.execute(
  49. """insert into call_records(phone, start_time, end_time, record_path, hangup_dispostion, call_id, cdr_id, term_status,intention, ip) value (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
  50. (call_record[0], call_record[1], call_record[2], record_path,
  51. call_record[4], call_record[5], call_record[6], call_record[7], call_record[8], ip)
  52. )
  53. mysql_conn.commit()
  54. # 获取通话过程
  55. postgres_cur.execute(
  56. """select ai_speak, people_speak, call_id, voice_path, flow_time, node_name from ai_flow_content where call_id = %s""",
  57. ([call_record[5]]))
  58. call_records_processes = postgres_cur.fetchall()
  59. for call_records_process in call_records_processes:
  60. str1 = call_records_process[0]
  61. str2 = call_records_process[3]
  62. str1 = str1.replace('/opt/fsgui/nway-sys', ip+':8088')
  63. str2 = str2.replace('/opt/fsgui/nway-sys', ip+':8088')
  64. mysql_cur.execute(
  65. """insert into call_records_process(ai_speak, people_speak, call_id, voice_path, flow_time, node_name, ip) value (%s, %s, %s, %s, %s, %s, %s)""",
  66. (str1, call_records_process[1], call_records_process[2], str2,
  67. call_records_process[4], call_records_process[5], ip)
  68. )
  69. mysql_conn.commit()
  70. mysql_conn.close()
  71. postgres_conn.close()
  72. # except:
  73. # print "Error: insert data error"
  74. # pass