import { createClient } from "https://esm.sh/@supabase/supabase-js@2"; const corsHeaders = { "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "authorization, x-client-info, apikey, content-type, x-supabase-client-platform, x-supabase-client-platform-version, x-supabase-client-runtime, x-supabase-client-runtime-version", }; const INITIAL_BATCH_SIZE = 1000; const MIN_BATCH_SIZE = 100; const MAX_BATCH_SIZE = 2000; const BATCH_GROW_FACTOR = 1.25; // Increase by 25% on success const BATCH_SHRINK_FACTOR = 0.5; // Halve on error const MAX_ROWS_PER_CALL = 50000; const MAX_RETRIES = 3; const RETRY_DELAY_MS = 2000; const TIME_LIMIT_MS = 110000; const LOG_INTERVAL = 5000; // ─── GIN Index Definitions (to drop/recreate) ────────────────────────────── const GIN_INDEXES = [ { name: "idx_addr_formatee_trgm", def: `CREATE INDEX idx_addr_formatee_trgm ON public.addresses USING gin (adresse_formatee gin_trgm_ops)` }, { name: "idx_addr_municipalite_trgm", def: `CREATE INDEX idx_addr_municipalite_trgm ON public.addresses USING gin (nom_municipalite gin_trgm_ops)` }, { name: "idx_addr_odonyme_court_trgm", def: `CREATE INDEX idx_addr_odonyme_court_trgm ON public.addresses USING gin (odonyme_recompose_court gin_trgm_ops)` }, { name: "idx_addr_odonyme_long_trgm", def: `CREATE INDEX idx_addr_odonyme_long_trgm ON public.addresses USING gin (odonyme_recompose_long gin_trgm_ops)` }, { name: "idx_addr_odonyme_normal_trgm", def: `CREATE INDEX idx_addr_odonyme_normal_trgm ON public.addresses USING gin (odonyme_recompose_normal gin_trgm_ops)` }, { name: "idx_addresses_adresse_formatee_trgm", def: `CREATE INDEX idx_addresses_adresse_formatee_trgm ON public.addresses USING gin (adresse_formatee gin_trgm_ops)` }, { name: "idx_addresses_num_odonyme_trgm", def: `CREATE INDEX idx_addresses_num_odonyme_trgm ON public.addresses USING gin (((COALESCE(numero_municipal, ''::text) || ' '::text) || COALESCE(odonyme_recompose_normal, ''::text))) gin_trgm_ops)` }, ]; // ─── CSV Parsing helpers ───────────────────────────────────────────────────── function parseLine(line: string, delimiter: string): string[] { if (delimiter === "\t" || delimiter === ";") { return line.split(delimiter).map((v) => v.trim().replace(/^"|"$/g, "")); } const values: string[] = []; let current = ""; let inQuotes = false; for (let i = 0; i < line.length; i++) { const char = line[i]; if (char === '"') { if (inQuotes && i + 1 < line.length && line[i + 1] === '"') { current += '"'; i++; } else { inQuotes = !inQuotes; } } else if (char === delimiter && !inQuotes) { values.push(current.trim()); current = ""; } else { current += char; } } values.push(current.trim()); return values; } function normalizeHeader(name: string): string { return name .toLowerCase() .normalize("NFD") .replace(/[\u0300-\u036f]/g, "") .replace(/[_\s]+/g, "_") .replace(/[^a-z0-9_]/g, "") .trim(); } const FIBER_HEADER_MAP: Record = { id_placemark: "id_placemark", idplacemark: "id_placemark", id_appart: "id_appart", idappart: "id_appart", uuidadresse: "uuidadresse", idgouvqc: "uuidadresse", id_gouv_qc: "uuidadresse", nom: "nom", civique: "civique", appartement: "appartement", appt: "appartement", apt: "appartement", rue: "rue", ville: "ville", code_postal: "code_postal", codepostal: "code_postal", cp: "code_postal", lien_googlemap: "lien_googlemap", liengooglemap: "lien_googlemap", googlemap: "lien_googlemap", zone_tarifaire: "zone_tarifaire", zonetarifaire: "zone_tarifaire", zone: "zone_tarifaire", max_speed: "max_speed", maxspeed: "max_speed", vitesse: "max_speed", vitesse_max: "max_speed", commentaire: "commentaire", commentaires: "commentaire", comment: "commentaire", lien_map_targo: "_skip", lienmaptargo: "_skip", }; // ─── Row Mapping ───────────────────────────────────────────────────────────── function mapAddressRow(r: Record) { return { identifiant_unique_adresse: r.identifiant_unique_adresse, date_diffusion_version: r.date_diffusion_version || null, date_creation: r.date_creation || null, date_modification: r.date_modification || null, date_fin: r.date_fin || null, numero_municipal: r.numero_municipal || null, numero_municipal_suffixe: r.numero_municipal_suffixe || null, numero_unite: r.numero_unite || null, type_unite: r.type_unite || null, code_postal: r.code_postal || null, seqodo: r.seqodo || null, generique_odonyme: r.generique_odonyme || null, particule_odonyme: r.particule_odonyme || null, specifique_odonyme: r.specifique_odonyme || null, point_cardinal_odonyme: r.point_cardinal_odonyme || null, odonyme_recompose_normal: r.odonyme_recompose_normal || null, odonyme_recompose_court: r.odonyme_recompose_court || null, odonyme_recompose_long: r.odonyme_recompose_long || null, discriminant_odonyme: r.discriminant_odonyme || null, odonyme_officiel_ctop: r.odonyme_officiel_ctop || null, adresse_formatee: r.adresse_formatee || null, qualite_positionnement_geometrique: r.qualite_positionnement_geometrique || null, etat: r.etat || null, code_utilisation: r.code_utilisation || null, categorie: r.categorie || null, nombre_unite: r.nombre_unite ? parseInt(r.nombre_unite) : null, numero_lot: r.numero_lot || null, matricule_unite_evaluation: r.matricule_unite_evaluation || null, nosqnocivq: r.nosqnocivq || null, caracteristique_adresse: r.caracteristique_adresse || null, code_region_administrative: r.code_region_administrative || null, nom_region_administrative: r.nom_region_administrative || null, code_mrc: r.code_mrc || null, nom_mrc: r.nom_mrc || null, code_municipalite: r.code_municipalite || null, nom_municipalite: r.nom_municipalite || null, nom_municipalite_complet: r.nom_municipalite_complet || null, code_arrondissement: r.code_arrondissement || null, nom_arrondissement: r.nom_arrondissement || null, code_communaute_metropolitaine: r.code_communaute_metropolitaine || null, nom_communaute_metropolitaine: r.nom_communaute_metropolitaine || null, identifiant_reseau_routier: r.identifiant_reseau_routier || null, cote_route: r.cote_route || null, longitude: r.longitude ? parseFloat(r.longitude) : null, latitude: r.latitude ? parseFloat(r.latitude) : null, }; } function mapFiberRow(r: Record) { if (!r.uuidadresse) return null; return { id_placemark: r.id_placemark || null, id_appart: r.id_appart || null, uuidadresse: r.uuidadresse, nom: r.nom || null, civique: r.civique || null, appartement: r.appartement || null, rue: r.rue || null, ville: r.ville || null, code_postal: r.code_postal || null, lien_googlemap: r.lien_googlemap || null, zone_tarifaire: r.zone_tarifaire ? parseInt(r.zone_tarifaire) : 1, max_speed: r.max_speed ? parseInt(r.max_speed) : 0, commentaire: r.commentaire || null, }; } // ─── Upsert with retry ────────────────────────────────────────────────────── async function insertWithRetry( supabase: any, table: string, batch: any[], ): Promise<{ error: any; timedOut: boolean }> { for (let attempt = 0; attempt < MAX_RETRIES; attempt++) { const { error } = await supabase.from(table).insert(batch); if (!error) return { error: null, timedOut: false }; // On duplicate key, skip silently (data already exists) if (error.code === "23505") return { error: null, timedOut: false }; const isTimeout = !!error.message?.includes("statement timeout"); if (isTimeout && attempt < MAX_RETRIES - 1) { await new Promise((r) => setTimeout(r, RETRY_DELAY_MS * (attempt + 1))); continue; } return { error, timedOut: isTimeout }; } return { error: { message: "Max retries exceeded" }, timedOut: true }; } // ─── Index Management ─────────────────────────────────────────────────────── async function dropGinIndexes(supabase: any, log: (msg: string) => void) { log("🗑️ Dropping GIN trigram indexes for faster import…"); for (const idx of GIN_INDEXES) { const { error } = await supabase.rpc("exec_sql", { query: `DROP INDEX IF EXISTS public.${idx.name}` }); if (error) { // Try via direct SQL if rpc doesn't exist log(`⚠️ Could not drop ${idx.name} via rpc: ${error.message}`); } else { log(` ✅ Dropped ${idx.name}`); } } } async function createGinIndexes(supabase: any, log: (msg: string) => void) { log("🔨 Recreating GIN trigram indexes (this may take a while)…"); for (const idx of GIN_INDEXES) { const { error } = await supabase.rpc("exec_sql", { query: idx.def }); if (error) { log(`⚠️ Could not create ${idx.name}: ${error.message}`); } else { log(` ✅ Created ${idx.name}`); } } } // ─── Main Handler ──────────────────────────────────────────────────────────── Deno.serve(async (req) => { if (req.method === "OPTIONS") { return new Response(null, { headers: corsHeaders }); } const serviceClient = createClient( Deno.env.get("SUPABASE_URL")!, Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!, { db: { schema: "public" }, global: { headers: { "x-statement-timeout": "120000" } } } ); // Find the oldest active import job const { data: jobs, error: jobErr } = await serviceClient .from("import_jobs") .select("*") .in("status", ["running", "queued"]) .order("created_at", { ascending: true }) .limit(1); if (jobErr || !jobs || jobs.length === 0) { return new Response(JSON.stringify({ message: "No active jobs" }), { headers: { ...corsHeaders, "Content-Type": "application/json" }, }); } const job = jobs[0]; const logLines: string[] = [...(job.logs || [])].slice(-300); function log(msg: string) { const ts = new Date().toISOString().slice(11, 19); logLines.push(`[${ts}] ${msg}`); console.log(`[process-import-job] ${msg}`); } const updateJob = async (updates: Record) => { await serviceClient .from("import_jobs") .update({ ...updates, logs: logLines.slice(-500), updated_at: new Date().toISOString() }) .eq("id", job.id); }; await updateJob({ status: "running" }); const url = job.source_url; if (!url) { log("❌ No source URL"); await updateJob({ status: "error", error_messages: ["No source URL"] }); return new Response(JSON.stringify({ error: "No source URL" }), { headers: { ...corsHeaders, "Content-Type": "application/json" }, }); } const type = job.type as "addresses" | "fiber"; const byteOffset: number = job.byte_offset || 0; let totalInserted = job.total_inserted || 0; let totalErrors = job.total_errors || 0; const errorMessages: string[] = [...(job.error_messages || [])]; const startTime = Date.now(); // Detect if this is a direct CSV (not ZIP) — only CSV supports Range const isZip = url.toLowerCase().includes(".zip"); const useRange = !isZip && byteOffset > 0; // Drop GIN indexes on addresses table for faster upserts (only on first chunk or when explicitly needed) const indexesDropped = job.indexes_dropped === true; if (type === "addresses" && !indexesDropped) { await dropGinIndexes(serviceClient, log); await updateJob({ indexes_dropped: true }); } try { log(`🚀 Job ${job.id.slice(0, 8)}… type=${type} byteOffset=${byteOffset.toLocaleString()} totalInserted=${totalInserted.toLocaleString()}`); // Download the source file, using Range header for CSV resume const fetchHeaders: Record = {}; if (useRange) { fetchHeaders["Range"] = `bytes=${byteOffset}-`; log(`📥 Resuming from byte ${byteOffset.toLocaleString()}`); } else { log("📥 Downloading source file…"); } const resp = await fetch(url, { headers: fetchHeaders }); if (!resp.ok || !resp.body) { throw new Error(`Download failed: ${resp.status} ${resp.statusText}`); } const rangeSupported = useRange && resp.status === 206; if (useRange && !rangeSupported) { log(`⚠️ Server returned ${resp.status} instead of 206 — Range not supported, will skip rows`); } log(`📥 Download started (status ${resp.status})`); let csvStream: ReadableStream; if (isZip) { csvStream = await decompressZipStream(resp.body, log); } else { csvStream = resp.body; } // Process CSV from stream const streamReader = csvStream.getReader(); const decoder = new TextDecoder("utf-8", { fatal: false }); const knownHeaders = job.headers_cache as string[] | null; let headers: string[] = knownHeaders || []; let delimiter = ","; let leftover = ""; let totalRowsRead = 0; let processedRows = 0; let inserted = 0; let errors = 0; let pendingRows: any[] = []; let done = false; let stoppedByLimit = false; let bytesConsumed = 0; let currentBatchSize = job.current_batch_size || INITIAL_BATCH_SIZE; let consecutiveSuccesses = 0; const needsHeaderDetection = headers.length === 0; if (!needsHeaderDetection) { log(`📋 Using cached headers (${headers.length} cols) | batch=${currentBatchSize}`); } const flushBatch = async (batch: any[]) => { if (batch.length === 0) return; const table = type === "addresses" ? "addresses" : "fiber_availability"; const { error, timedOut } = await insertWithRetry(serviceClient, table, batch); if (error) { const msg = error.message || "Unknown error"; if (timedOut) { // Shrink batch size on timeout — re-queue these rows by NOT counting them as errors const oldSize = currentBatchSize; currentBatchSize = Math.max(MIN_BATCH_SIZE, Math.floor(currentBatchSize * BATCH_SHRINK_FACTOR)); consecutiveSuccesses = 0; log(`⚠️ Timeout on batch of ${batch.length} — shrinking batch ${oldSize} → ${currentBatchSize}`); // Split and retry the failed batch in smaller chunks for (let i = 0; i < batch.length; i += currentBatchSize) { const subBatch = batch.slice(i, i + currentBatchSize); const retry = await insertWithRetry(serviceClient, table, subBatch); if (retry.error) { errors += subBatch.length; if (errorMessages.length < 20) errorMessages.push(retry.error.message || msg); } else { inserted += subBatch.length; } } } else { log(`❌ Batch error (${batch.length} rows): ${msg}`); errors += batch.length; if (errorMessages.length < 20) errorMessages.push(msg); } } else { inserted += batch.length; consecutiveSuccesses++; // Grow batch size after 3 consecutive successes if (consecutiveSuccesses >= 3 && currentBatchSize < MAX_BATCH_SIZE) { const oldSize = currentBatchSize; currentBatchSize = Math.min(MAX_BATCH_SIZE, Math.floor(currentBatchSize * BATCH_GROW_FACTOR)); if (currentBatchSize !== oldSize) { log(`🚀 3 successes — growing batch ${oldSize} → ${currentBatchSize}`); } consecutiveSuccesses = 0; } } }; const skipRows = (!rangeSupported && !isZip) ? (job.skip_rows || 0) : 0; if (skipRows > 0) { log(`⏩ Will skip ${skipRows.toLocaleString()} rows (no Range support)`); } while (true) { if (Date.now() - startTime > TIME_LIMIT_MS) { stoppedByLimit = true; break; } if (processedRows >= MAX_ROWS_PER_CALL) { stoppedByLimit = true; break; } const { done: streamDone, value } = await streamReader.read(); const chunk = value ? decoder.decode(value, { stream: !streamDone }) : ""; if (value) bytesConsumed += value.byteLength; const text = leftover + chunk; const lines = text.split(/\r?\n/); leftover = streamDone ? "" : (lines.pop() || ""); for (const line of lines) { const trimmed = line.trim(); if (!trimmed) continue; if (headers.length === 0) { const cleanLine = trimmed.charCodeAt(0) === 0xfeff ? trimmed.slice(1) : trimmed; const tabCount = (cleanLine.match(/\t/g) || []).length; const semiCount = (cleanLine.match(/;/g) || []).length; const commaCount = (cleanLine.match(/,/g) || []).length; delimiter = tabCount >= semiCount && tabCount >= commaCount ? "\t" : semiCount >= commaCount ? ";" : ","; const candidateHeaders = parseLine(cleanLine, delimiter); if (candidateHeaders.length < 3) continue; headers = candidateHeaders; log(`📋 Headers (${headers.length}): ${headers.slice(0, 5).join(", ")}…`); continue; } // Auto-detect delimiter for Range resume if (needsHeaderDetection === false && totalRowsRead === 0 && processedRows === 0) { const tabCount = (trimmed.match(/\t/g) || []).length; const semiCount = (trimmed.match(/;/g) || []).length; const commaCount = (trimmed.match(/,/g) || []).length; delimiter = tabCount >= semiCount && tabCount >= commaCount ? "\t" : semiCount >= commaCount ? ";" : ","; } totalRowsRead++; if (skipRows > 0 && totalRowsRead <= skipRows) { if (totalRowsRead % 500000 === 0) { log(`⏩ Skipping… ${totalRowsRead.toLocaleString()} / ${skipRows.toLocaleString()}`); } continue; } const values = parseLine(trimmed, delimiter); const row: Record = {}; if (type === "fiber") { headers.forEach((h, idx) => { const norm = normalizeHeader(h); const dbCol = FIBER_HEADER_MAP[norm]; if (dbCol && dbCol !== "_skip") { row[dbCol] = values[idx] || ""; } }); } else { headers.forEach((h, idx) => { row[h] = values[idx] || ""; }); } const mapped = type === "addresses" ? mapAddressRow(row) : mapFiberRow(row); if (mapped) { pendingRows.push(mapped); processedRows++; } if (pendingRows.length >= currentBatchSize) { await flushBatch(pendingRows); pendingRows = []; } // Log progress at regular intervals if (processedRows > 0 && processedRows % LOG_INTERVAL === 0) { const elapsed = ((Date.now() - startTime) / 1000).toFixed(0); const rate = (processedRows / ((Date.now() - startTime) / 1000)).toFixed(0); log(`📊 ${processedRows.toLocaleString()} rows (+${inserted.toLocaleString()} ok, ${errors} err) | ${rate} rows/s | batch=${currentBatchSize} | ${elapsed}s`); // Persist progress periodically await updateJob({ total_inserted: totalInserted + inserted, total_errors: totalErrors + errors, byte_offset: byteOffset + bytesConsumed, }); } if (processedRows >= MAX_ROWS_PER_CALL || Date.now() - startTime > TIME_LIMIT_MS) { stoppedByLimit = true; break; } } if (stoppedByLimit) break; if (streamDone) { done = true; break; } } // Flush remaining if (pendingRows.length > 0) { await flushBatch(pendingRows); } if (!done) { try { streamReader.cancel(); } catch {} } const newByteOffset = byteOffset + bytesConsumed; const nextSkipRows = (job.skip_rows || 0) + processedRows; totalInserted += inserted; totalErrors += errors; const isComplete = done && !stoppedByLimit; const elapsed = ((Date.now() - startTime) / 1000).toFixed(1); const rate = (processedRows / (parseFloat(elapsed) || 1)).toFixed(0); if (isComplete) { log(`✅ COMPLETE: ${processedRows.toLocaleString()} rows this run (${rate}/s), total: ${totalInserted.toLocaleString()}, errors: ${totalErrors} (${elapsed}s)`); // Recreate GIN indexes now that import is done if (type === "addresses") { await createGinIndexes(serviceClient, log); } await updateJob({ status: "done", byte_offset: newByteOffset, skip_rows: nextSkipRows, total_inserted: totalInserted, total_errors: totalErrors, error_messages: errorMessages.slice(-20), indexes_dropped: false, }); } else { log(`⏸️ Chunk: +${processedRows.toLocaleString()} rows (${rate}/s), inserted=${inserted.toLocaleString()}, batch=${currentBatchSize}, byteOffset=${newByteOffset.toLocaleString()} (${elapsed}s)`); await updateJob({ status: "running", byte_offset: newByteOffset, skip_rows: nextSkipRows, total_inserted: totalInserted, total_errors: totalErrors, error_messages: errorMessages.slice(-20), headers_cache: headers, current_batch_size: currentBatchSize, }); } return new Response( JSON.stringify({ success: true, jobId: job.id, processed: processedRows, inserted, totalInserted, done: isComplete, byteOffset: newByteOffset, }), { headers: { ...corsHeaders, "Content-Type": "application/json" } } ); } catch (err: any) { log(`💥 Fatal: ${err.message}`); console.error("[process-import-job] Error:", err); await updateJob({ status: "paused", error_messages: [...errorMessages, err.message].slice(-20), }); return new Response(JSON.stringify({ error: err.message }), { status: 500, headers: { ...corsHeaders, "Content-Type": "application/json" }, }); } }); // ─── Streaming ZIP Decompression (legacy) ─────────────────────────────────── async function decompressZipStream( zipStream: ReadableStream, log: (msg: string) => void ): Promise> { const reader = zipStream.getReader(); let buffer = new Uint8Array(0); async function readBytes(n: number): Promise { while (buffer.length < n) { const { done, value } = await reader.read(); if (done) throw new Error("Unexpected end of ZIP stream"); const newBuf = new Uint8Array(buffer.length + value.length); newBuf.set(buffer); newBuf.set(value, buffer.length); buffer = newBuf; } const result = buffer.slice(0, n); buffer = buffer.slice(n); return result; } const header = await readBytes(30); const view = new DataView(header.buffer, header.byteOffset, header.byteLength); const sig = view.getUint32(0, true); if (sig !== 0x04034b50) { throw new Error(`Invalid ZIP local file header signature: 0x${sig.toString(16)}`); } const method = view.getUint16(8, true); const compressedSize = view.getUint32(18, true); const filenameLen = view.getUint16(26, true); const extraLen = view.getUint16(28, true); const nameExtra = await readBytes(filenameLen + extraLen); const filename = new TextDecoder().decode(nameExtra.slice(0, filenameLen)); log(`📄 ZIP entry: "${filename}" method=${method === 8 ? "DEFLATE" : method === 0 ? "STORED" : method} size=${compressedSize > 0 ? (compressedSize / 1024 / 1024).toFixed(1) + "MB" : "streaming"}`); const remainingStream = new ReadableStream({ async pull(controller) { if (buffer.length > 0) { controller.enqueue(buffer); buffer = new Uint8Array(0); return; } const { done, value } = await reader.read(); if (done) { controller.close(); return; } controller.enqueue(value); }, cancel() { reader.cancel(); }, }); if (method === 0) { return remainingStream; } else if (method === 8) { log("🔓 Streaming DEFLATE decompression..."); return remainingStream.pipeThrough(new DecompressionStream("deflate-raw")); } else { throw new Error(`Unsupported ZIP compression method: ${method}`); } }