import { createClient } from "https://esm.sh/@supabase/supabase-js@2";

/** CORS headers attached to every response, including the OPTIONS preflight. */
const corsHeaders = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers":
    "authorization, x-client-info, apikey, content-type, x-supabase-client-platform, x-supabase-client-platform-version, x-supabase-client-runtime, x-supabase-client-runtime-version",
};

/** Number of mapped rows accumulated before one upsert call is issued. */
const BATCH_SIZE = 200;
/** Number of mapped rows after which one invocation stops and reports `next_skip`. */
const MAX_ROWS_PER_CALL = 50000;
/** Maximum number of upsert attempts for a single batch. */
const MAX_RETRIES = 3;
/** Base retry delay; the actual wait is this value times the attempt number. */
const RETRY_DELAY_MS = 2000;
/** Elapsed-time budget (ms) after which the streaming loop stops. */
const TIME_LIMIT_MS = 120000;

// Log lines collected for the current request and returned to the frontend.
// NOTE(review): this is module-level state that each request resets; if the
// runtime ever serves two requests concurrently in one isolate they would
// share this array. Confirm the deployment model before relying on it.
let logLines: string[] = [];

/** Records a timestamped line for the response payload and mirrors it to the console. */
function log(msg: string): void {
  const ts = new Date().toISOString().slice(11, 19);
  const line = `[${ts}] ${msg}`;
  logLines.push(line);
  console.log(`[import-csv-url] ${msg}`);
}

/** Builds a JSON response carrying the CORS headers. */
function jsonResponse(body: unknown, status = 200): Response {
  return new Response(JSON.stringify(body), {
    status,
    headers: { ...corsHeaders, "Content-Type": "application/json" },
  });
}

/** Coerces a client-supplied value to a non-negative integer row count (0 when unusable). */
function toRowCount(value: unknown): number {
  const n = Number(value);
  return Number.isFinite(n) && n > 0 ? Math.floor(n) : 0;
}

/**
 * Upserts one batch, retrying only when the error message mentions
 * "statement timeout". Any other error is returned immediately.
 *
 * @param supabase Client exposing `from(table).upsert(rows, options)`.
 * @param table Target table name.
 * @param batch Rows to upsert.
 * @param onConflict Column(s) passed as the `onConflict` option.
 * @param ignoreDuplicates When true, `ignoreDuplicates: true` is added to the options.
 * @returns `{ error: null }` on success, otherwise `{ error }` with the last error.
 */
async function upsertWithRetry(
  // deno-lint-ignore no-explicit-any
  supabase: any,
  table: string,
  batch: unknown[],
  onConflict: string,
  ignoreDuplicates = false
): Promise<{ error: { message?: string } | null }> {
  const opts: { onConflict: string; ignoreDuplicates?: boolean } = { onConflict };
  if (ignoreDuplicates) opts.ignoreDuplicates = true;

  for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
    const { error } = await supabase.from(table).upsert(batch, opts);
    if (!error) return { error: null };

    const canRetry = attempt < MAX_RETRIES - 1;
    if (error.message?.includes("statement timeout") && canRetry) {
      log(`⚠️ Statement timeout on batch (attempt ${attempt + 1}/${MAX_RETRIES}), retrying...`);
      // Linear back-off: 1x, 2x, ... the base delay.
      await new Promise((r) => setTimeout(r, RETRY_DELAY_MS * (attempt + 1)));
      continue;
    }
    return { error };
  }
  return { error: { message: "Max retries exceeded" } };
}

/**
 * Splits one CSV line into trimmed field values.
 *
 * Tab- and semicolon-delimited lines are split naively (quotes do not protect
 * the delimiter; one leading and one trailing `"` are stripped per field).
 * Any other delimiter uses a quote-aware scan where `""` inside quotes yields
 * a literal `"`.
 */
function parseLine(line: string, delimiter: string): string[] {
  if (delimiter === "\t" || delimiter === ";") {
    return line.split(delimiter).map((v) => v.trim().replace(/^"|"$/g, ""));
  }

  const values: string[] = [];
  let current = "";
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const char = line[i];
    if (char === '"') {
      if (inQuotes && i + 1 < line.length && line[i + 1] === '"') {
        // Escaped quote inside a quoted field.
        current += '"';
        i++;
      } else {
        inQuotes = !inQuotes;
      }
    } else if (char === delimiter && !inQuotes) {
      values.push(current.trim());
      current = "";
    } else {
      current += char;
    }
  }
  values.push(current.trim());
  return values;
}

/**
 * Picks the delimiter with the most occurrences in a header line.
 * Ties are resolved in favor of tab, then semicolon, then comma.
 */
function detectDelimiter(headerLine: string): string {
  const tabCount = (headerLine.match(/\t/g) || []).length;
  const semiCount = (headerLine.match(/;/g) || []).length;
  const commaCount = (headerLine.match(/,/g) || []).length;
  if (tabCount >= semiCount && tabCount >= commaCount) return "\t";
  return semiCount >= commaCount ? ";" : ",";
}

/**
 * Normalizes a header name into a lookup key: trimmed, lowercased, diacritics
 * removed, punctuation dropped, runs of whitespace/underscores collapsed to a
 * single `_`, and no leading or trailing `_`.
 */
function normalizeHeader(name: string): string {
  return name
    .trim()
    .toLowerCase()
    .normalize("NFD")
    .replace(/[\u0300-\u036f]/g, "")
    // Drop punctuation before collapsing so "a - b" becomes "a_b", not "a__b".
    .replace(/[^a-z0-9_\s]/g, "")
    .replace(/[_\s]+/g, "_")
    .replace(/^_+|_+$/g, "");
}

/**
 * Maps normalized fiber CSV header names to destination column names.
 * The value `_skip` marks a recognized header whose data is not imported.
 */
const FIBER_HEADER_MAP: Record<string, string> = {
  id_placemark: "id_placemark",
  idplacemark: "id_placemark",
  id_appart: "id_appart",
  idappart: "id_appart",
  uuidadresse: "uuidadresse",
  idgouvqc: "uuidadresse",
  id_gouv_qc: "uuidadresse",
  nom: "nom",
  civique: "civique",
  appartement: "appartement",
  appt: "appartement",
  apt: "appartement",
  rue: "rue",
  ville: "ville",
  code_postal: "code_postal",
  codepostal: "code_postal",
  cp: "code_postal",
  lien_googlemap: "lien_googlemap",
  liengooglemap: "lien_googlemap",
  googlemap: "lien_googlemap",
  zone_tarifaire: "zone_tarifaire",
  zonetarifaire: "zone_tarifaire",
  zone: "zone_tarifaire",
  max_speed: "max_speed",
  maxspeed: "max_speed",
  vitesse: "max_speed",
  vitesse_max: "max_speed",
  commentaire: "commentaire",
  commentaires: "commentaire",
  comment: "commentaire",
  lien_map_targo: "_skip",
  lienmaptargo: "_skip",
};

/**
 * Converts a raw address CSV row (keyed by the file's own header names) into
 * the object sent to the `addresses` table. Empty strings become `null`,
 * except `identifiant_unique_adresse`, which is passed through unchanged.
 */
function mapAddressRow(r: Record<string, string>) {
  return {
    identifiant_unique_adresse: r.identifiant_unique_adresse,
    date_diffusion_version: r.date_diffusion_version || null,
    date_creation: r.date_creation || null,
    date_modification: r.date_modification || null,
    date_fin: r.date_fin || null,
    numero_municipal: r.numero_municipal || null,
    numero_municipal_suffixe: r.numero_municipal_suffixe || null,
    numero_unite: r.numero_unite || null,
    type_unite: r.type_unite || null,
    code_postal: r.code_postal || null,
    seqodo: r.seqodo || null,
    generique_odonyme: r.generique_odonyme || null,
    particule_odonyme: r.particule_odonyme || null,
    specifique_odonyme: r.specifique_odonyme || null,
    point_cardinal_odonyme: r.point_cardinal_odonyme || null,
    odonyme_recompose_normal: r.odonyme_recompose_normal || null,
    odonyme_recompose_court: r.odonyme_recompose_court || null,
    odonyme_recompose_long: r.odonyme_recompose_long || null,
    discriminant_odonyme: r.discriminant_odonyme || null,
    odonyme_officiel_ctop: r.odonyme_officiel_ctop || null,
    adresse_formatee: r.adresse_formatee || null,
    qualite_positionnement_geometrique: r.qualite_positionnement_geometrique || null,
    etat: r.etat || null,
    code_utilisation: r.code_utilisation || null,
    categorie: r.categorie || null,
    nombre_unite: r.nombre_unite ? parseInt(r.nombre_unite, 10) : null,
    numero_lot: r.numero_lot || null,
    matricule_unite_evaluation: r.matricule_unite_evaluation || null,
    nosqnocivq: r.nosqnocivq || null,
    caracteristique_adresse: r.caracteristique_adresse || null,
    code_region_administrative: r.code_region_administrative || null,
    nom_region_administrative: r.nom_region_administrative || null,
    code_mrc: r.code_mrc || null,
    nom_mrc: r.nom_mrc || null,
    code_municipalite: r.code_municipalite || null,
    nom_municipalite: r.nom_municipalite || null,
    nom_municipalite_complet: r.nom_municipalite_complet || null,
    code_arrondissement: r.code_arrondissement || null,
    nom_arrondissement: r.nom_arrondissement || null,
    code_communaute_metropolitaine: r.code_communaute_metropolitaine || null,
    nom_communaute_metropolitaine: r.nom_communaute_metropolitaine || null,
    identifiant_reseau_routier: r.identifiant_reseau_routier || null,
    cote_route: r.cote_route || null,
    longitude: r.longitude ? parseFloat(r.longitude) : null,
    latitude: r.latitude ? parseFloat(r.latitude) : null,
  };
}

/**
 * Converts a fiber row (already keyed by destination column names) into the
 * object sent to `fiber_availability`.
 *
 * @returns `null` when the row has no `uuidadresse`; such rows are not imported.
 */
function mapFiberRow(r: Record<string, string>) {
  if (!r.uuidadresse) return null;
  return {
    id_placemark: r.id_placemark || null,
    id_appart: r.id_appart || null,
    uuidadresse: r.uuidadresse,
    nom: r.nom || null,
    civique: r.civique || null,
    appartement: r.appartement || null,
    rue: r.rue || null,
    ville: r.ville || null,
    code_postal: r.code_postal || null,
    lien_googlemap: r.lien_googlemap || null,
    // Empty values fall back to 1 and 0 respectively.
    zone_tarifaire: r.zone_tarifaire ? parseInt(r.zone_tarifaire, 10) : 1,
    max_speed: r.max_speed ? parseInt(r.max_speed, 10) : 0,
    commentaire: r.commentaire || null,
  };
}

/**
 * HTTP entry point. Requires a valid bearer token belonging to a user with an
 * `admin` row in `user_roles`, then streams a CSV (inline `csv_text` or fetched
 * from `url`) into `addresses` or `fiber_availability` in batches.
 *
 * The call stops early when the row or time limit is hit; the response then
 * has `done: false` and `next_skip` holds the number of data rows to pass as
 * `skip_rows` on the following call.
 */
Deno.serve(async (req) => {
  // Reset logs for each request
  logLines = [];

  if (req.method === "OPTIONS") {
    return new Response(null, { headers: corsHeaders });
  }

  try {
    // --- Auth check ---
    const authHeader = req.headers.get("Authorization");
    if (!authHeader) {
      return jsonResponse({ error: "Unauthorized" }, 401);
    }

    const authClient = createClient(
      Deno.env.get("SUPABASE_URL")!,
      Deno.env.get("SUPABASE_ANON_KEY")!,
      { global: { headers: { Authorization: authHeader } } }
    );
    const token = authHeader.replace("Bearer ", "");
    const { data: claimsData, error: authError } = await authClient.auth.getClaims(token);
    if (authError || !claimsData?.claims?.sub) {
      return jsonResponse({ error: "Unauthorized: invalid token" }, 401);
    }

    // The service-role client is used for the role lookup and for all writes.
    const serviceClient = createClient(
      Deno.env.get("SUPABASE_URL")!,
      Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")!
    );
    const { data: roleRow } = await serviceClient
      .from("user_roles")
      .select("role")
      .eq("user_id", claimsData.claims.sub)
      .eq("role", "admin")
      .maybeSingle();
    if (!roleRow) {
      return jsonResponse({ error: "Forbidden: admin role required" }, 403);
    }

    // --- Request parameters ---
    const { url, type, skip_rows = 0, csv_text } = await req.json();
    if (!type || !["addresses", "fiber"].includes(type)) {
      return jsonResponse({ error: "Missing 'type' (addresses|fiber)" }, 400);
    }
    // Coerce to a number so that comparisons and the next_skip arithmetic
    // cannot degrade into string concatenation.
    const skipRows = toRowCount(skip_rows);

    log(`🚀 Import ${type} — skip_rows=${skipRows}`);

    // --- Input stream ---
    let csvStream: ReadableStream<Uint8Array>;
    if (typeof csv_text === "string" && csv_text.length > 0) {
      // CSV text sent directly from browser (e.g. after client-side ZIP decompression)
      log(`📄 Receiving CSV text from client (${(csv_text.length / 1024 / 1024).toFixed(1)} MB)`);
      const encoded = new TextEncoder().encode(csv_text);
      csvStream = new ReadableStream<Uint8Array>({
        start(controller) {
          // Hand the text over in 512KB views; subarray avoids copying the buffer.
          const chunkSize = 512 * 1024;
          for (let offset = 0; offset < encoded.length; offset += chunkSize) {
            controller.enqueue(encoded.subarray(offset, offset + chunkSize));
          }
          controller.close();
        },
      });
    } else if (typeof url === "string" && url.length > 0) {
      // Direct CSV URL (no ZIP support server-side)
      log(`📥 Downloading from URL: ${url.slice(0, 80)}...`);
      const fetchResponse = await fetch(url);
      if (!fetchResponse.ok || !fetchResponse.body) {
        const errMsg = `Failed to fetch URL: ${fetchResponse.status} ${fetchResponse.statusText}`;
        log(`❌ ${errMsg}`);
        return jsonResponse({ error: errMsg, logs: logLines }, 400);
      }
      csvStream = fetchResponse.body;
    } else {
      return jsonResponse({ error: "Missing 'url' or 'csv_text'" }, 400);
    }

    // --- Streaming import ---
    const streamReader = csvStream.getReader();
    const decoder = new TextDecoder("utf-8", { fatal: false });
    const startTime = Date.now();

    const table = type === "addresses" ? "addresses" : "fiber_availability";
    const onConflict = type === "addresses" ? "identifiant_unique_adresse" : "uuidadresse";
    const ignoreDups = type === "fiber";

    let headers: string[] = [];
    // For fiber imports: destination column per header index, resolved once.
    let fiberColumns: (string | undefined)[] = [];
    let delimiter = ",";
    let leftover = "";
    let totalRowsRead = 0; // every non-blank data row seen, including skipped ones
    let processedRows = 0; // rows that produced a record to upsert
    let inserted = 0;
    let errors = 0;
    const errorMessages: string[] = [];
    let pendingRows: Record<string, unknown>[] = [];
    let done = false;
    let stoppedByLimit = false;

    const flushBatch = async (batch: Record<string, unknown>[]) => {
      if (batch.length === 0) return;
      const { error } = await upsertWithRetry(serviceClient, table, batch, onConflict, ignoreDups);
      if (error) {
        const msg = error.message || "Unknown error";
        log(`❌ Batch error (${batch.length} rows): ${msg}`);
        errors += batch.length;
        if (errorMessages.length < 10) errorMessages.push(msg);
      } else {
        inserted += batch.length;
      }
    };

    while (true) {
      if (Date.now() - startTime > TIME_LIMIT_MS || processedRows >= MAX_ROWS_PER_CALL) {
        stoppedByLimit = true;
        break;
      }

      const { done: streamDone, value } = await streamReader.read();
      const chunk = decoder.decode(value, { stream: !streamDone });
      const lines = (leftover + chunk).split(/\r?\n/);
      // The last piece may be an incomplete line; keep it for the next chunk
      // unless the stream has ended.
      leftover = streamDone ? "" : (lines.pop() ?? "");

      for (const rawLine of lines) {
        const trimmed = rawLine.trim();
        if (!trimmed) continue;

        if (headers.length === 0) {
          // Detection and the "at least 3 columns" test use the trimmed line;
          // lines with fewer columns are treated as preamble and ignored.
          delimiter = detectDelimiter(trimmed);
          if (parseLine(trimmed, delimiter).length < 3) continue;

          // Column positions come from the untrimmed line so they line up with
          // data rows, which are also parsed untrimmed (see below).
          headers = parseLine(rawLine, delimiter);
          log(`📋 Headers (${headers.length}): ${headers.slice(0, 5).join(", ")}...`);

          if (type === "fiber") {
            fiberColumns = headers.map((h) => FIBER_HEADER_MAP[normalizeHeader(h)]);
            const mappingSummary = headers.map((h, idx) => {
              const col = fiberColumns[idx];
              return col ? `${h}→${col}` : `${h}→❌`;
            });
            log(`🔗 Mapping: ${mappingSummary.join(" | ")}`);
          }
          continue;
        }

        totalRowsRead++;
        if (totalRowsRead <= skipRows) {
          if (totalRowsRead % 100000 === 0) {
            log(`⏩ Skipping... ${totalRowsRead.toLocaleString()} / ${skipRows.toLocaleString()}`);
          }
          continue;
        }

        // Parse the untrimmed line: String.prototype.trim() also strips tabs,
        // which would drop an empty first column of a tab-delimited row and
        // shift every following value one column to the left.
        const values = parseLine(rawLine, delimiter);
        const row: Record<string, string> = {};
        if (type === "fiber") {
          fiberColumns.forEach((dbCol, idx) => {
            if (dbCol && dbCol !== "_skip") {
              row[dbCol] = values[idx] || "";
            }
          });
        } else {
          headers.forEach((h, idx) => {
            row[h] = values[idx] || "";
          });
        }

        const mapped = type === "addresses" ? mapAddressRow(row) : mapFiberRow(row);
        if (mapped) {
          pendingRows.push(mapped);
          processedRows++;
        }

        if (pendingRows.length >= BATCH_SIZE) {
          await flushBatch(pendingRows);
          pendingRows = [];
          // Log progress every 10k rows
          if (processedRows % 10000 === 0) {
            const elapsed = ((Date.now() - startTime) / 1000).toFixed(0);
            log(`📊 Progress: ${processedRows.toLocaleString()} rows processed, ${inserted.toLocaleString()} inserted (${elapsed}s)`);
          }
        }

        if (processedRows >= MAX_ROWS_PER_CALL || Date.now() - startTime > TIME_LIMIT_MS) {
          stoppedByLimit = true;
          break;
        }
      }

      if (stoppedByLimit) break;
      if (streamDone) {
        done = true;
        break;
      }
    }

    if (pendingRows.length > 0) {
      await flushBatch(pendingRows);
    }

    if (!done) {
      // Best effort: cancel() returns a promise, so a rejection must be
      // handled on the promise rather than with a synchronous try/catch.
      await streamReader.cancel().catch(() => {});
    }

    // Resume offset is counted in data rows read, the same unit the skip
    // phase uses. Rows that mapped to null are included, so they are not
    // re-read on the next call.
    const nextSkip = Math.max(skipRows, totalRowsRead);
    const isComplete = done && !stoppedByLimit;
    const elapsed = ((Date.now() - startTime) / 1000).toFixed(1);

    if (isComplete) {
      log(`✅ Import COMPLETE: ${processedRows.toLocaleString()} rows, ${inserted.toLocaleString()} inserted, ${errors} errors (${elapsed}s)`);
    } else {
      log(`⏸️ Chunk done: ${processedRows.toLocaleString()} rows this chunk, next_skip=${nextSkip} (${elapsed}s)`);
    }

    return jsonResponse({
      success: true,
      type,
      processed: processedRows,
      inserted,
      errors,
      errorMessages,
      next_skip: nextSkip,
      done: isComplete,
      logs: logLines,
    });
  } catch (err: unknown) {
    const message = err instanceof Error ? err.message : String(err);
    log(`💥 Fatal error: ${message}`);
    console.error("[import-csv-url] Error:", err);
    return jsonResponse({ error: message, logs: logLines }, 500);
  }
});