#!/usr/bin/env python3
"""
Full re-import of ALL legacy payments into ERPNext.

- Loads all legacy payments (542K+) and their payment_item invoice links
- Flushes existing Payment Entries + references -- only AFTER the legacy data
  and the ERPNext mappings have been loaded successfully, so a failing
  query can no longer leave ERPNext with its payments deleted
- Links payments to Sales Invoices via the payment_item table
- Naming: PE-{legacy_payment_id zero-padded to 10 digits}
- Invoice refs: SINV-{legacy_invoice_id zero-padded to 10 digits}

Usage:
    python3 reimport_payments.py

The script talks directly to the legacy MySQL database and to ERPNext's
PostgreSQL database (no bench/frappe entry point is implemented here). The
default hostnames ("legacy-db", "db") presumably only resolve inside the
docker network. Connection settings can be overridden with the LEGACY_DB_*
and ERP_DB_* environment variables.

NOTE(review): rows are written straight into "tabPayment Entry" and
"tabPayment Entry Reference" with docstatus=1. This script creates no GL
Entries and does not touch Sales Invoice outstanding amounts -- confirm
that this is handled elsewhere.
"""
import os
import sys
from datetime import datetime, timezone

import psycopg2
import psycopg2.extras
import pymysql

# NOTE(review): credentials are committed in plain text. Prefer supplying them
# via the environment and dropping the literal defaults.
LEGACY = {
    "host": os.environ.get("LEGACY_DB_HOST", "legacy-db"),
    "user": os.environ.get("LEGACY_DB_USER", "facturation"),
    "password": os.environ.get("LEGACY_DB_PASSWORD", "VD67owoj"),
    "database": os.environ.get("LEGACY_DB_NAME", "gestionclient"),
    "connect_timeout": 30,
    "read_timeout": 600,
}

PG = {
    "host": os.environ.get("ERP_DB_HOST", "db"),
    "port": int(os.environ.get("ERP_DB_PORT", "5432")),
    "user": os.environ.get("ERP_DB_USER", "postgres"),
    "password": os.environ.get("ERP_DB_PASSWORD", "123"),
    "dbname": os.environ.get("ERP_DB_NAME", "_eb65bdc0c4b1b2d6"),
}

ADMIN = "Administrator"
COMPANY = "TARGO"
PAID_FROM = "Comptes clients - T"  # Receivable
PAID_TO = "Banque - T"  # Bank

# Legacy type -> ERPNext mode_of_payment.
# NOTE(review): the "credit*" types only get a "Credit Note" mode here. They
# are otherwise imported exactly like ordinary bank receipts -- confirm.
MODE_MAP = {
    "ppa": "Bank Draft",
    "paiement direct": "Bank Transfer",
    "carte credit": "Credit Card",
    "cheque": "Cheque",
    "comptant": "Cash",
    "reversement": "Bank Transfer",
    "credit": "Credit Note",
    "credit targo": "Credit Note",
    "credit facture": "Credit Note",
}

BATCH_SIZE = 2000
PROGRESS_EVERY = 50000
MAX_ERROR_LOGS = 20  # cap per-row error messages so bad data cannot flood the log

now_ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")


def pe_name(legacy_id):
    """Return the ERPNext Payment Entry name for a legacy payment id."""
    return "PE-" + str(legacy_id).zfill(10)


def sinv_name(legacy_inv_id):
    """Return the ERPNext Sales Invoice name for a legacy invoice id."""
    return "SINV-" + str(legacy_inv_id).zfill(10)


def ts_to_date(t):
    """Convert a Unix timestamp (seconds) to a 'YYYY-MM-DD' date string in UTC.

    Returns None for missing, non-positive or unparseable timestamps.

    NOTE(review): the date is computed in UTC. If legacy timestamps were
    recorded in local time, late-evening payments may land on the next
    day -- confirm.
    """
    if not t:
        return None
    try:
        seconds = int(t)
        if seconds <= 0:
            return None
        return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d")
    except (TypeError, ValueError, OverflowError, OSError):
        # Non-numeric or out-of-range values are treated like a missing date.
        return None


def log(msg):
    """Print a progress message immediately (unbuffered)."""
    print(msg, flush=True)


def _load_legacy():
    """Fetch all legacy payments, plus their payment_item rows grouped by payment id."""
    mc = pymysql.connect(**LEGACY)
    try:
        with mc.cursor(pymysql.cursors.DictCursor) as cur:
            cur.execute("SELECT * FROM payment ORDER BY id")
            payments = cur.fetchall()
            log(f" {len(payments)} payments loaded")

            log(" Loading payment_items...")
            cur.execute(
                "SELECT payment_id, invoice_id, amount FROM payment_item "
                "ORDER BY payment_id, id"
            )
            all_items = cur.fetchall()
    finally:
        mc.close()

    items_by_pay = {}
    for row in all_items:
        items_by_pay.setdefault(row["payment_id"], []).append(row)
    log(f" {len(all_items)} payment-invoice links loaded")
    return payments, items_by_pay


def _load_mappings(pgc):
    """Return (legacy account id -> Customer name, set of imported legacy invoice ids)."""
    pgc.execute('SELECT legacy_account_id, name FROM "tabCustomer" WHERE legacy_account_id > 0')
    cust_map = {r[0]: r[1] for r in pgc.fetchall()}
    log(f" {len(cust_map)} customer mappings")

    # Only existence matters: SINV names are derived from the legacy id.
    pgc.execute('SELECT legacy_invoice_id FROM "tabSales Invoice" WHERE legacy_invoice_id > 0')
    inv_set = {r[0] for r in pgc.fetchall()}
    log(f" {len(inv_set)} invoice mappings")
    return cust_map, inv_set


def _ensure_legacy_column(pgc, pg):
    """Add "tabPayment Entry".legacy_payment_id if the column does not exist yet."""
    pgc.execute(
        """
        SELECT 1 FROM information_schema.columns
        WHERE table_name = 'tabPayment Entry' AND column_name = 'legacy_payment_id'
        """
    )
    if pgc.fetchone():
        log(" legacy_payment_id column exists")
        return
    pgc.execute('ALTER TABLE "tabPayment Entry" ADD COLUMN legacy_payment_id BIGINT DEFAULT 0')
    pg.commit()
    log(" Added legacy_payment_id column")


def _delete_existing(pgc, pg):
    """Delete ALL Payment Entries and their child rows, then commit."""
    pgc.execute('SELECT COUNT(*) FROM "tabPayment Entry"')
    existing_count = pgc.fetchone()[0]
    log(f" Deleting {existing_count} Payment Entries + references...")
    pgc.execute('DELETE FROM "tabPayment Entry Reference"')
    pgc.execute('DELETE FROM "tabPayment Entry Deduction"')
    pgc.execute('DELETE FROM "tabPayment Entry"')
    pg.commit()
    log(" Flushed.")


def _build_rows(p, cust_name, posting_date, items, inv_set):
    """Build the Payment Entry row and its reference rows for one legacy payment.

    Returns (pe_row, ref_rows), or None when the payment amount is zero.
    Raises TypeError/ValueError on malformed legacy values (e.g. a
    non-numeric amount). The caller counts those as errors.
    """
    amount = round(abs(float(p["amount"] or 0)), 2)
    if amount <= 0:
        return None

    legacy_id = p["id"]
    name = pe_name(legacy_id)
    pay_type = (p["type"] or "").strip().lower()
    mode = MODE_MAP.get(pay_type, "Bank Transfer")
    reference = (p["reference"] or "")[:140]
    memo = (p["memo"] or "")[:140]
    # Empty parts collapse to "" after strip(), so no special case is needed.
    remarks = f"{reference} {memo}".strip()

    refs = []
    total_allocated = 0.0
    for idx_j, item in enumerate(items):
        inv_legacy = item["invoice_id"]
        if not inv_legacy or inv_legacy not in inv_set:
            continue  # invoice was not imported into ERPNext
        ref_amt = round(abs(float(item["amount"] or 0)), 2)
        if ref_amt <= 0:
            continue
        total_allocated += ref_amt
        refs.append((
            # idx_j counts skipped items too, so names stay unique per payment.
            f"PER-{legacy_id}-{idx_j}",
            ADMIN, now_ts, now_ts, ADMIN,
            1,                      # docstatus
            idx_j + 1,              # idx
            "Sales Invoice",        # reference_doctype
            sinv_name(inv_legacy),  # reference_name
            ref_amt,                # allocated_amount
            1,                      # exchange_rate
            name,                   # parent
            "references",           # parentfield
            "Payment Entry",        # parenttype
        ))

    total_allocated = round(total_allocated, 2)
    # NOTE(review): legacy allocations can exceed the paid amount. The excess is
    # kept (total_allocated_amount > paid_amount) and only the unallocated amount
    # is clamped at 0. ERPNext's own validation may reject such documents -- confirm.
    unallocated = max(round(amount - total_allocated, 2), 0.0)

    pe_row = (
        name, ADMIN, now_ts, now_ts, ADMIN,
        1,                # docstatus
        0,                # idx
        legacy_id,        # legacy_payment_id
        "Receive",        # payment_type
        posting_date,
        COMPANY,
        "Customer",       # party_type
        cust_name,        # party
        PAID_FROM,        # paid_from
        "CAD",            # paid_from_account_currency
        PAID_TO,          # paid_to
        "CAD",            # paid_to_account_currency
        amount,           # paid_amount
        1,                # source_exchange_rate
        amount,           # base_paid_amount
        amount,           # received_amount
        1,                # target_exchange_rate
        amount,           # base_received_amount
        total_allocated,  # total_allocated_amount
        total_allocated,  # base_total_allocated_amount
        unallocated,      # unallocated_amount
        0.0,              # difference_amount
        mode,             # mode_of_payment
        reference,        # reference_no (already truncated to 140)
        posting_date,     # reference_date
        "Submitted",      # status
        remarks[:140],    # remarks
        "No",             # is_opening
    )
    return pe_row, refs


def _import_payments(pgc, pg, payments, items_by_pay, cust_map, inv_set):
    """Convert legacy payments to ERPNext rows and insert them in committed batches.

    Returns (ok, skip, err, ref_count).
    """
    ok = skip = err = ref_count = 0
    pe_batch = []
    per_batch = []
    total = len(payments)

    for i, p in enumerate(payments):
        # Checked at the top so skipped rows cannot suppress the progress line.
        if i and i % PROGRESS_EVERY == 0:
            log(f" ... {i}/{total} processed ({ok} ok, {skip} skip, {err} err)")

        cust_name = cust_map.get(p["account_id"])
        if not cust_name:
            skip += 1
            continue
        posting_date = ts_to_date(p["date_orig"])
        if not posting_date:
            skip += 1
            continue

        try:
            rows = _build_rows(p, cust_name, posting_date, items_by_pay.get(p["id"], []), inv_set)
        except (TypeError, ValueError) as exc:
            # Malformed legacy data: count it instead of aborting a
            # half-committed import.
            err += 1
            if err <= MAX_ERROR_LOGS:
                log(f" ! payment {p['id']}: {exc!r}")
            continue
        if rows is None:
            skip += 1
            continue

        pe_row, refs = rows
        pe_batch.append(pe_row)
        per_batch.extend(refs)
        ref_count += len(refs)
        ok += 1

        if len(pe_batch) >= BATCH_SIZE:
            _flush(pgc, pg, pe_batch, per_batch)
            pe_batch = []
            per_batch = []

    if pe_batch:
        _flush(pgc, pg, pe_batch, per_batch)
    pg.commit()
    return ok, skip, err, ref_count


def main():
    """Run the full re-import: load, then flush, then insert, then report."""
    log("=== FULL Payment Re-Import ===")
    log(f"Started at {now_ts}")

    # Everything that can fail while reading (legacy fetch, ERPNext mappings)
    # happens BEFORE the existing Payment Entries are deleted.
    log("\n[1/6] Loading legacy payments...")
    payments, items_by_pay = _load_legacy()

    pg = psycopg2.connect(**PG)
    pg.autocommit = False
    try:
        pgc = pg.cursor()

        log("\n[2/6] Loading ERPNext mappings...")
        cust_map, inv_set = _load_mappings(pgc)

        log("\n[3/6] Ensuring custom field...")
        _ensure_legacy_column(pgc, pg)

        log("\n[4/6] Flushing existing Payment Entries...")
        _delete_existing(pgc, pg)

        log("\n[5/6] Importing payments...")
        ok, skip, err, ref_count = _import_payments(
            pgc, pg, payments, items_by_pay, cust_map, inv_set
        )
    finally:
        # Closing with an open transaction discards uncommitted work (the
        # in-flight batch on failure). Earlier batches stay committed.
        pg.close()

    log("\n[6/6] Done!")
    log(f" Imported: {ok}")
    log(f" Skipped: {skip} (no customer match / no date / zero amount)")
    log(f" Errors: {err}")
    log(f" Invoice refs: {ref_count}")


def _flush(pgc, pg, pe_batch, per_batch):
    """Bulk-insert one batch of Payment Entry rows and their reference rows, then commit."""
    if pe_batch:
        psycopg2.extras.execute_values(
            pgc,
            """INSERT INTO "tabPayment Entry" (
                name, owner, creation, modified, modified_by, docstatus, idx,
                legacy_payment_id, payment_type, posting_date, company,
                party_type, party, paid_from, paid_from_account_currency,
                paid_to, paid_to_account_currency, paid_amount,
                source_exchange_rate, base_paid_amount, received_amount,
                target_exchange_rate, base_received_amount,
                total_allocated_amount, base_total_allocated_amount,
                unallocated_amount, difference_amount, mode_of_payment,
                reference_no, reference_date, status, remarks, is_opening
            ) VALUES %s""",
            pe_batch,
            page_size=BATCH_SIZE,
        )
    if per_batch:
        psycopg2.extras.execute_values(
            pgc,
            """INSERT INTO "tabPayment Entry Reference" (
                name, owner, creation, modified, modified_by, docstatus, idx,
                reference_doctype, reference_name, allocated_amount,
                exchange_rate, parent, parentfield, parenttype
            ) VALUES %s""",
            per_batch,
            page_size=BATCH_SIZE * 3,
        )
    pg.commit()


if __name__ == "__main__":
    main()