From: Peter Sanchez <peter@netlandish.com>
To: ~netlandish/links-dev@lists.code.netlandish.com
Cc: Peter Sanchez <peter@netlandish.com>
Subject: [PATCH links] Moving imports into an async task using a dedicated queue with a larger buffer and more workers to process the base URLs efficiently.
Date: Fri, 28 Feb 2025 19:27:21 -0600 Message-ID: <20250301012823.10991-1-peter@netlandish.com> X-Mailer: git-send-email 2.47.2 MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Fixes: https://todo.code.netlandish.com/~netlandish/links/97 Changelog-changed: Pinboard import now uses streaming json decoding to avoid loading large files completely into memory. --- This is a larger patch than I planned. As I kept digging into it more and more I found other tweaks and issues to work around. Anyway I've tested this a bit but I need a large pinboard file to really put it to work. accounts/utils.go | 4 +- api/graph/schema.resolvers.go | 24 ++- cmd/api/main.go | 2 +- cmd/global.go | 2 +- cmd/links/main.go | 5 +- cmd/list/main.go | 2 +- cmd/server.go | 12 +- cmd/short/main.go | 2 +- config.example.ini | 11 +- core/import.go | 170 +++++++++++++------- core/inputs.go | 57 ++++--- core/processors.go | 71 ++++++++ core/routes.go | 24 ++- helpers.go | 10 ++ models/base_url.go | 8 +- templates/email_import_complete_html.html | 5 + templates/email_import_complete_subject.txt | 1 + templates/email_import_complete_text.txt | 6 + 18 files changed, 291 insertions(+), 125 deletions(-) create mode 100644 templates/email_import_complete_html.html create mode 100644 templates/email_import_complete_subject.txt create mode 100644 templates/email_import_complete_text.txt diff --git a/accounts/utils.go b/accounts/utils.go index 745313c..c653eec 100644 --- a/accounts/utils.go +++ b/accounts/utils.go @@ -29,7 +29,7 @@ func SendVerificationEmail(ctx context.Context, conf *accounts.Confirmation, use } // NOTE: improve this lt := localizer.GetLocalizer("") - pd := localizer.NewPageData(lt.Translate("Links - Confirm account email")) + pd := localizer.NewPageData(lt.Translate("LinkTaco - Confirm account email")) pd.Data["please_confirm"] = lt.Translate("Please confirm your account") pd.Data["to_confirm"] = lt.Translate("To confirm your email address and complete your Links registration, please click the link below.") pd.Data["confirm"] = lt.Translate("Confirm") @@ -88,6 +88,6 @@ func ResendVerificationConf(ctx context.Context, user gobwebs.User) (bool, error linkUser := user.(*models.User) lt := localizer.GetLocalizer(linkUser.Settings.Account.DefaultLang) - return true, fmt.Errorf(lt.Translate("We sent you a new confirmation email. Please click the link in that " + + return true, fmt.Errorf("%s", lt.Translate("We sent you a new confirmation email. 
Please click the link in that "+ "email to confirm your account.")) } diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go index bd8d6ed..60ca18d 100644 --- a/api/graph/schema.resolvers.go +++ b/api/graph/schema.resolvers.go @@ -299,13 +299,8 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput) } } - baseURL, err := url.Parse(input.URL) - if err != nil { - return nil, fmt.Errorf("%s", lt.Translate("Error parsin url: %s", err)) - } - baseURL.Fragment = "" BaseURL := &models.BaseURL{ - URL: baseURL.String(), + URL: links.StripURLFragment(input.URL), } err = BaseURL.Store(ctx) if err != nil { @@ -313,7 +308,9 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput) } if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) { - srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + if visibility == models.OrgLinkVisibilityPublic { + srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + } } userID := int(user.ID) @@ -472,13 +469,8 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi WithCode(valid.ErrValidationCode) return nil, nil } - baseURL, err := url.Parse(*input.URL) - if err != nil { - return nil, fmt.Errorf("%s", lt.Translate("Error parsin url: %s", err)) - } - baseURL.Fragment = "" BaseURL := &models.BaseURL{ - URL: baseURL.String(), + URL: links.StripURLFragment(*input.URL), } err = BaseURL.Store(ctx) if err != nil { @@ -487,7 +479,9 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)} orgLink.URL = *input.URL - srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic { + srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL)) + } } if input.Visibility != nil { @@ -812,7 +806,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput) Title: input.Title, OrgID: org.ID, Description: gcore.StripHtmlTags(input.Description), - BaseURLID: sql.NullInt64{Valid: BaseURL.ID > 0, Int64: int64(BaseURL.ID)}, + BaseURLID: sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)}, Visibility: string(input.Visibility), Starred: input.Starred, URL: noteURL, diff --git a/cmd/api/main.go b/cmd/api/main.go index 85ec98f..c9f0a7b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -82,7 +82,7 @@ func run() error { return fmt.Errorf("unable to load storage service: %v", err) } - eSize, gSize, err := cmd.LoadWorkerQueueSizes(config) + eSize, gSize, _, err := cmd.LoadWorkerQueueSizes(config) if err != nil { return err } diff --git a/cmd/global.go b/cmd/global.go index f788bc5..1b07240 100644 --- a/cmd/global.go +++ b/cmd/global.go @@ -47,7 +47,7 @@ func RunMigrations(t *testing.T, db *sql.DB) { func GetWorkerPoolCount(queue *work.Queue) int { var pSize int switch queue.Name() { - case "general": + case "general", "import": pSize = 10 case "invoice": pSize = 3 diff --git a/cmd/links/main.go b/cmd/links/main.go index c911962..2d70c83 100644 --- a/cmd/links/main.go +++ b/cmd/links/main.go @@ -160,7 +160,7 @@ func run() error { return fmt.Errorf("Unknown storage service configured") } - eSize, gSize, err := cmd.LoadWorkerQueueSizes(config) + eSize, gSize, iSize, err := cmd.LoadWorkerQueueSizes(config) if err != nil { return err } @@ -183,6 +183,7 @@ func run() error { sq := email.NewServiceQueue(e.Logger, esvc, eq, 
&mailChecker) wq := work.NewQueue("general", gSize) wqi := work.NewQueue("invoice", gSize) + imq := work.NewQueue("import", iSize) mwConf := &server.MiddlewareConfig{ Sessions: true, @@ -216,7 +217,7 @@ func run() error { WithStorage(storesvc). DefaultMiddlewareWithConfig(mwConf). SetWorkerPoolFunc(cmd.GetWorkerPoolCount). - WithQueues(eq, wq, wqi). + WithQueues(eq, wq, wqi, imq). WithMiddleware( database.Middleware(db), core.RemoteIPMiddleware, diff --git a/cmd/list/main.go b/cmd/list/main.go index dc78a1c..3b43752 100644 --- a/cmd/list/main.go +++ b/cmd/list/main.go @@ -59,7 +59,7 @@ func run() error { } } - eSize, gSize, err := cmd.LoadWorkerQueueSizes(config) + eSize, gSize, _, err := cmd.LoadWorkerQueueSizes(config) if err != nil { return err } diff --git a/cmd/server.go b/cmd/server.go index f64597d..0d416ba 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -208,22 +208,22 @@ func LoadAutoTLS(config *config.Config, db *sql.DB, service string) *autocert.Ma } // LoadWorkerQueueSizes ... -func LoadWorkerQueueSizes(config *config.Config) (int, int, error) { +func LoadWorkerQueueSizes(config *config.Config) (int, int, int, error) { var ( - eSize, gSize int = 512, 512 - err error + eSize, gSize, iSize int = 512, 512, 1024 + err error ) if tStr, ok := config.File.Get("links", "email-queue-size"); ok { eSize, err = strconv.Atoi(tStr) if err != nil { - return eSize, gSize, fmt.Errorf("links:email-queue-size invalid") + return eSize, gSize, iSize, fmt.Errorf("links:email-queue-size invalid") } } if tStr, ok := config.File.Get("links", "general-queue-size"); ok { gSize, err = strconv.Atoi(tStr) if err != nil { - return eSize, gSize, fmt.Errorf("links:general-queue-size invalid") + return eSize, gSize, iSize, fmt.Errorf("links:general-queue-size invalid") } } - return eSize, gSize, nil + return eSize, gSize, iSize, nil } diff --git a/cmd/short/main.go b/cmd/short/main.go index 34bf3e2..f9b961f 100644 --- a/cmd/short/main.go +++ b/cmd/short/main.go @@ -57,7 +57,7 @@ func run() error { } } - eSize, gSize, err := cmd.LoadWorkerQueueSizes(config) + eSize, gSize, _, err := cmd.LoadWorkerQueueSizes(config) if err != nil { return err } diff --git a/config.example.ini b/config.example.ini index 7ed3145..c26190a 100644 --- a/config.example.ini +++ b/config.example.ini @@ -92,6 +92,10 @@ ses-secret-key=YOUR_AWS_SECRET_KEY # Defaults to: ./ root-directory=./ +# Value that can be used to set a custom temp directory. +# Defaults to: /tmp +tmp-directory=/tmp + # If storage is s3, set the appropriate variables here # s3 works with any s3 compatible service (ie, minio, etc.) endpoint=s3.amazonaws.com @@ -182,12 +186,15 @@ rate-limit-burst=40 # How long (in minutes) of inactivity does the limit record live for rate-limit-expire=3 -# How many email queue workers. Defaults to 512 +# How large is the email queue buffer. Defaults to 512 email-queue-size = 512 -# How many general queue workers. Defaults to 512 +# How large is the general queue buffer. Defaults to 512 general-queue-size = 512 +# How large is the import queue buffer. 
Defaults to 1024 +import-queue-size = 1024 + # How many short links can free accounts create per month short-limit-month = 10 diff --git a/core/import.go b/core/import.go index ae8e14e..d58cfab 100644 --- a/core/import.go +++ b/core/import.go @@ -5,11 +5,12 @@ import ( "database/sql" "encoding/json" "fmt" + "io" "links" "links/models" - "mime/multipart" "net/url" "strings" + "time" sq "github.com/Masterminds/squirrel" "github.com/labstack/echo/v4" @@ -17,6 +18,7 @@ import ( "golang.org/x/net/html" "netlandish.com/x/gobwebs/database" "netlandish.com/x/gobwebs/server" + "netlandish.com/x/gobwebs/storage" ) // Define import source, used in template and handler @@ -201,7 +203,7 @@ func (i importAdapter) GetType() int { } func processBaseURLs(obj importObj, tmpURLMap map[string]bool, urlList []string, c echo.Context) { - importedURL, err := url.ParseRequestURI(obj.GetURL()) + importedURL, err := url.Parse(obj.GetURL()) if err != nil { return } @@ -216,25 +218,33 @@ func processBaseURLs(obj importObj, tmpURLMap map[string]bool, urlList []string, } // Get a list of base url to make a query // of existing ones - urlList = append(urlList, obj.GetURL()) - tmpURLMap[obj.GetURL()] = true + strippedURL := links.StripURLFragment(obj.GetURL()) + urlList = append(urlList, strippedURL) + tmpURLMap[strippedURL] = true } // Get a importAdapter with a list of pinBoardObj, HTMLObj and FirefoxObj // and create the base url objects in the db // returns a map containing the base url as index and its id as value -func importBaseURLs(c echo.Context, objAdapter *importAdapter) (map[string]int, error) { +func importBaseURLs(ctx context.Context, objAdapter *importAdapter) (map[string]int, error) { urlList := make([]string, 0) tmpURLMap := make(map[string]bool) // temporary map to store non existing url baseURLMap := make(map[string]int) + srv := server.ForContext(ctx) + + // TODO: Rethink this process. 
Total hack for the moment all to use a URL + // helper + gctx := &server.Context{ + Server: srv, + } switch objAdapter.GetType() { case pinBoardType: for _, obj := range objAdapter.GetPinBoards() { - processBaseURLs(obj, tmpURLMap, urlList, c) + processBaseURLs(obj, tmpURLMap, urlList, gctx) } case htmlType: for _, obj := range objAdapter.GetHTMLLinks() { - processBaseURLs(obj, tmpURLMap, urlList, c) + processBaseURLs(obj, tmpURLMap, urlList, gctx) } } @@ -242,7 +252,6 @@ func importBaseURLs(c echo.Context, objAdapter *importAdapter) (map[string]int, opts := &database.FilterOptions{ Filter: sq.Eq{"b.url": urlList}, } - ctx := c.Request().Context() baseURLs, err := models.GetBaseURLs(ctx, opts) if err != nil { return nil, err @@ -265,13 +274,14 @@ func importBaseURLs(c echo.Context, objAdapter *importAdapter) (map[string]int, return nil, err } baseURLMap[baseURL.URL] = baseURL.ID + srv.QueueTask("import", ParseBaseURLTask(srv, baseURL)) } return baseURLMap, nil } func processOrgLinks(obj importObj, baseURLMap map[string]int, org *models.Organization, user *models.User, billEnabled bool) *models.OrgLink { - baseID, ok := baseURLMap[obj.GetURL()] + baseID, ok := baseURLMap[links.StripURLFragment(obj.GetURL())] if !ok { return nil } @@ -368,65 +378,91 @@ func importOrgLinks(ctx context.Context, objAdapter *importAdapter, baseURLMap m return nil } -func ImportFromPinBoard(c echo.Context, src multipart.File, +func getTmpFileStorage(ctx context.Context) storage.Service { + srv := server.ForContext(ctx) + tmpDir, ok := srv.Config.File.Get("storage", "tmp-directory") + if !ok { + tmpDir = "/tmp" + } + return storage.NewFileSystemService(tmpDir) +} + +func ImportFromPinBoard(ctx context.Context, path string, org *models.Organization, user *models.User) error { - var pinBoardList []*pinBoardObj - err := json.NewDecoder(src).Decode(&pinBoardList) + fs := getTmpFileStorage(ctx) + fobj, err := fs.GetObject(ctx, path) if err != nil { return err } - var listlen, start, end int - step := 100 - - adapter := &importAdapter{ - elementType: pinBoardType, - start: start, - end: end, - pinBoards: pinBoardList, + dcode := json.NewDecoder(fobj.Content) + _, err = dcode.Token() + if err != nil { + return fmt.Errorf("Error parsing json: %w", err) } - gctx := c.(*server.Context) - billEnabled := links.BillingEnabled(gctx.Server.Config) + var totalCount int + step := 100 + srv := server.ForContext(ctx) + billEnabled := links.BillingEnabled(srv.Config) - listlen = len(pinBoardList) - for start < listlen { - if end+step > listlen { - end = listlen - } else { - end += step - } - adapter.start = start - adapter.end = end - baseURLMap, err := importBaseURLs(c, adapter) - if err != nil { - return err + for { + var pinBoardList []*pinBoardObj + for dcode.More() { + var pbObj *pinBoardObj + err := dcode.Decode(&pbObj) + if err != nil { + srv.Logger().Printf("Error decoding json object in pinboard import: %v", err) + continue + } + pinBoardList = append(pinBoardList, pbObj) + if len(pinBoardList) == step { + break + } } - err = importOrgLinks( - c.Request().Context(), - adapter, - baseURLMap, - org, - user, - billEnabled, - ) - if err != nil { - return err - } + listlen := len(pinBoardList) + if listlen > 0 { + adapter := &importAdapter{ + elementType: pinBoardType, + start: 0, + end: listlen, + pinBoards: pinBoardList, + } - start += step + totalCount += listlen + + baseURLMap, err := importBaseURLs(ctx, adapter) + if err != nil { + return err + } + + err = importOrgLinks( + ctx, + adapter, + baseURLMap, + org, + user, + 
billEnabled, + ) + if err != nil { + return err + } + time.Sleep(3 * time.Second) // Let the parse url workers catch up + } else { + break // No more items to process + } } - if listlen > 0 { + if totalCount > 0 { mdata := make(map[string]any) mdata["org_id"] = org.ID err := models.RecordAuditLog( - c.Request().Context(), + ctx, int(user.ID), - c.RealIP(), + "internal task", models.LOG_BOOKMARK_IMPORTED, - fmt.Sprintf("Imported %d Pinboard bookmarks into organization %s.", listlen, org.Slug), + fmt.Sprintf("Imported %d Pinboard bookmarks into organization %s.", totalCount, org.Slug), mdata, ) if err != nil { @@ -434,6 +470,11 @@ func ImportFromPinBoard(c echo.Context, src multipart.File, } } + // XXX Pass some sort of flag to call this, in the future + //err = fs.DeleteObject(ctx, path) + //if err != nil { + // return err + //} return nil } @@ -443,7 +484,7 @@ type htmlObjData struct { } // Parse import html files. Used for chrome and safari -func getLinksFromHTML(src multipart.File) map[string]htmlObjData { +func getLinksFromHTML(src io.Reader) map[string]htmlObjData { tkn := html.NewTokenizer(src) vals := make(map[string]htmlObjData) var ( @@ -500,9 +541,14 @@ func getLinksFromHTML(src multipart.File) map[string]htmlObjData { } // Get an html -func ImportFromHTML(c echo.Context, src multipart.File, +func ImportFromHTML(ctx context.Context, path string, org *models.Organization, user *models.User) error { - linksSrc := getLinksFromHTML(src) + fs := getTmpFileStorage(ctx) + fobj, err := fs.GetObject(ctx, path) + if err != nil { + return err + } + linksSrc := getLinksFromHTML(fobj.Content) var htmlList []*htmlObj for i, v := range linksSrc { l := &htmlObj{ @@ -523,8 +569,8 @@ func ImportFromHTML(c echo.Context, src multipart.File, htmlList: htmlList, } - gctx := c.(*server.Context) - billEnabled := links.BillingEnabled(gctx.Server.Config) + srv := server.ForContext(ctx) + billEnabled := links.BillingEnabled(srv.Config) listlen = len(htmlList) for start < listlen { @@ -535,13 +581,13 @@ func ImportFromHTML(c echo.Context, src multipart.File, } adapter.start = start adapter.end = end - baseURLMap, err := importBaseURLs(c, adapter) + baseURLMap, err := importBaseURLs(ctx, adapter) if err != nil { return err } err = importOrgLinks( - c.Request().Context(), + ctx, adapter, baseURLMap, org, @@ -559,9 +605,9 @@ func ImportFromHTML(c echo.Context, src multipart.File, mdata := make(map[string]any) mdata["org_id"] = org.ID err := models.RecordAuditLog( - c.Request().Context(), + ctx, int(user.ID), - c.RealIP(), + "internal task", models.LOG_BOOKMARK_IMPORTED, fmt.Sprintf("Imported %d bookmarks into organization %s.", listlen, org.Slug), mdata, @@ -570,5 +616,11 @@ func ImportFromHTML(c echo.Context, src multipart.File, return err } } + + // XXX Pass some sort of flag to call this, in the future + //err = fs.DeleteObject(ctx, path) + //if err != nil { + // return err + //} return nil } diff --git a/core/inputs.go b/core/inputs.go index 2355d62..6a17cb7 100644 --- a/core/inputs.go +++ b/core/inputs.go @@ -4,13 +4,16 @@ import ( "errors" "fmt" "links/internal/localizer" - "mime/multipart" "net/http" "net/url" "path/filepath" "strings" + "time" "github.com/labstack/echo/v4" + "github.com/segmentio/ksuid" + "netlandish.com/x/gobwebs/server" + "netlandish.com/x/gobwebs/storage" "netlandish.com/x/gobwebs/validate" ) @@ -162,9 +165,9 @@ func (d *DomainForm) Validate(c echo.Context) error { } type ImportForm struct { - File string `form:"file"` - Origin int `form:"origin" validate:"oneof=0 1 2 3"` - src 
multipart.File `form:"-"` + File string `form:"file"` + Origin int `form:"origin" validate:"oneof=0 1 2 3"` + FilePath string `form:"-"` } func (i *ImportForm) Validate(c echo.Context) error { @@ -181,33 +184,51 @@ func (i *ImportForm) Validate(c echo.Context) error { return err } - f, err := c.FormFile("file") + lt := localizer.GetSessionLocalizer(c) + if i.Origin < pinBoardOrigin || i.Origin > firefoxOrigin { + err = fmt.Errorf("%s", lt.Translate("Invalid origin source for importer.")) + return validate.InputErrors{"Origin": err} + } + + file, hdr, err := c.Request().FormFile("file") if err != nil { if errors.Is(err, http.ErrMissingFile) { - lt := localizer.GetSessionLocalizer(c) - err = fmt.Errorf(lt.Translate("The file is required")) + err = fmt.Errorf("%s", lt.Translate("The file is required")) return validate.InputErrors{"File": err} } return err } - src, err := f.Open() - if err != nil { - return err - } - defer src.Close() - ext := strings.ToLower(filepath.Ext(f.Filename)) + defer file.Close() + + ext := strings.ToLower(filepath.Ext(hdr.Filename)) if i.Origin == safariOrigin || i.Origin == chromeOrigin || i.Origin == firefoxOrigin { if ext != ".html" { - lt := localizer.GetSessionLocalizer(c) - err = fmt.Errorf(lt.Translate("The file submitted for this source should be html")) + err = fmt.Errorf("%s", lt.Translate("The file submitted for this source should be html")) return validate.InputErrors{"File": err} } } else if i.Origin == pinBoardOrigin && ext != ".json" { - lt := localizer.GetSessionLocalizer(c) - err = fmt.Errorf(lt.Translate("The file submitted for this source should be json")) + err = fmt.Errorf("%s", lt.Translate("The file submitted for this source should be json")) + return validate.InputErrors{"File": err} + } + + gctx := c.(*server.Context) + tmpDir, ok := gctx.Server.Config.File.Get("storage", "tmp-directory") + if !ok { + tmpDir = "/tmp" + } + path := fmt.Sprintf("import/%d/%s/%s%s", + gctx.User.GetID(), + time.Now().UTC().Format("2006-01-02"), + ksuid.New().String(), + ext, + ) + fs := storage.NewFileSystemService(tmpDir) + err = fs.PutObject(c.Request().Context(), path, file) + if err != nil { + c.Logger().Printf("Error writing tmp file for user bookmark import: %v", err) return validate.InputErrors{"File": err} } - i.src = src + i.FilePath = path return nil } diff --git a/core/processors.go b/core/processors.go index 954be9b..3f4355c 100644 --- a/core/processors.go +++ b/core/processors.go @@ -3,15 +3,86 @@ package core import ( "context" "links" + "links/internal/localizer" "links/models" work "git.sr.ht/~sircmpwn/dowork" + "github.com/labstack/echo/v4" "netlandish.com/x/gobwebs" "netlandish.com/x/gobwebs/database" + "netlandish.com/x/gobwebs/email" "netlandish.com/x/gobwebs/server" "netlandish.com/x/gobwebs/timezone" + "netlandish.com/x/gobwebs/validate" ) +// ImportBookmarksTask task to parse html title tags. 
+func ImportBookmarksTask(c echo.Context, origin int, path string, + org *models.Organization, user *models.User) *work.Task { + lt := localizer.GetLocalizer("") + gctx := c.(*server.Context) + return work.NewTask(func(ctx context.Context) error { + ctx = server.ServerContext(ctx, gctx.Server) + ctx = database.Context(ctx, gctx.Server.DB) + ctx = timezone.Context(ctx, "UTC") + + var err error + switch origin { + case pinBoardOrigin: + // Run task to import from Pinboard + err = ImportFromPinBoard(ctx, path, org, user) + default: + err = ImportFromHTML(ctx, path, org, user) + } + + if err != nil { + return err + } + + server := server.ForContext(ctx) + tmpl := server.Echo().Renderer.(*validate.Template).Templates() + tmap := email.TMap{ + "email_import_complete": []string{ + "email_import_complete_subject.txt", + "email_import_complete_text.txt", + "email_import_complete_html.html", + }, + } + // NOTE: improve this + pd := localizer.NewPageData(lt.Translate("LinkTaco - Your bookmark import is complete")) + pd.Data["hi"] = lt.Translate("Hi there,") + pd.Data["complete"] = lt.Translate("Just wanted to let you know that your bookmark import has completed successfully!") + pd.Data["team"] = lt.Translate("- LinkTaco Team") + + helper := email.NewHelper(server.Email, tmap, tmpl) + err = helper.Send( + "email_import_complete", + server.Config.DefaultFromEmail, + gctx.User.GetEmail(), + gobwebs.Map{"pd": pd}, + ) + if err != nil { + server.Logger().Printf("Error sending import complete email: %v", err) + } + return nil + }).Retries(3).Before(func(ctx context.Context, task *work.Task) { + gobwebs.TaskIDWork(task) + gctx.Server.Logger().Printf( + "Running task ImportBookmarksTask %s for the %d attempt.", + task.Metadata["id"], task.Attempts()) + }).After(func(ctx context.Context, task *work.Task) { + if task.Result() == nil { + gctx.Server.Logger().Printf( + "Completed task ImportBookmarksTask %s after %d attempts.", + task.Metadata["id"], task.Attempts()) + } else { + gctx.Server.Logger().Printf( + "Failed task ImportBookmarksTask %s after %d attempts: %v", + task.Metadata["id"], task.Attempts(), task.Result()) + } + }) +} + // ParseBaseURLTask task to parse html title tags. 
func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task { return work.NewTask(func(ctx context.Context) error { diff --git a/core/routes.go b/core/routes.go index 8338503..231e35a 100644 --- a/core/routes.go +++ b/core/routes.go @@ -3018,6 +3018,12 @@ func (s *Service) ImportData(c echo.Context) error { "isFree": org.IsFreeAccount(), } if req.Method == http.MethodPost { + // Set file limit to 200MB for import handler + err := c.Request().ParseMultipartForm(200 << 20) + if err != nil { + return fmt.Errorf("Unable to set file upload limit: %w", err) + } + if err := form.Validate(c); err != nil { switch err.(type) { case validate.InputErrors: @@ -3028,22 +3034,14 @@ func (s *Service) ImportData(c echo.Context) error { return err } } - switch form.Origin { - case pinBoardOrigin: - err = ImportFromPinBoard(c, form.src, org, user) - case safariOrigin: - err = ImportFromHTML(c, form.src, org, user) - case chromeOrigin: - err = ImportFromHTML(c, form.src, org, user) - case firefoxOrigin: - err = ImportFromHTML(c, form.src, org, user) - default: - // Should not be reached since this is validated in the form - return fmt.Errorf("Invalid origin source for importer") - } + + err = gctx.Server.QueueTask("import", + ImportBookmarksTask(c, form.Origin, form.FilePath, org, user)) if err != nil { return err } + messages.Success(c, lt.Translate( + "Your bookmark import is being processed. We will notify you once it's complete.")) return c.Redirect(http.StatusMovedPermanently, c.Echo().Reverse("core:org_link_list", org.Slug)) } diff --git a/helpers.go b/helpers.go index 125698a..c917207 100644 --- a/helpers.go +++ b/helpers.go @@ -1180,3 +1180,13 @@ func SanitizeUTF8(input string) string { } return b.String() } + +// StripURLFragment will simply return a URL without any fragment options +func StripURLFragment(furl string) string { + baseURL, err := url.Parse(furl) + if err != nil { + return furl + } + baseURL.Fragment = "" + return baseURL.String() +} diff --git a/models/base_url.go b/models/base_url.go index a70ea87..b8a96ef 100644 --- a/models/base_url.go +++ b/models/base_url.go @@ -150,14 +150,14 @@ func (b *BaseURL) Store(ctx context.Context) error { ON CONFLICT (url) DO UPDATE SET url = NULL WHERE FALSE - RETURNING id, url, created_on + RETURNING id, url, public_ready, created_on ) - SELECT id, url, created_on FROM ins + SELECT id, url, public_ready, created_on FROM ins UNION ALL - SELECT id, url, created_on FROM base_urls + SELECT id, url, public_ready, created_on FROM base_urls WHERE url = $1 LIMIT 1;`, b.URL, b.Hash) - err := row.Scan(&b.ID, &b.URL, &b.CreatedOn) + err := row.Scan(&b.ID, &b.URL, &b.PublicReady, &b.CreatedOn) if err != nil { return err } diff --git a/templates/email_import_complete_html.html b/templates/email_import_complete_html.html new file mode 100644 index 0000000..f0ecc54 --- /dev/null +++ b/templates/email_import_complete_html.html @@ -0,0 +1,5 @@ +{{template "email_base" .}} +
{{.pd.Data.hi}}
+{{.pd.Data.complete}}
+{{.pd.Data.team}}
+{{template "email_base_footer" .}} diff --git a/templates/email_import_complete_subject.txt b/templates/email_import_complete_subject.txt new file mode 100644 index 0000000..4d3b16b --- /dev/null +++ b/templates/email_import_complete_subject.txt @@ -0,0 +1 @@ +{{.pd.Title}} diff --git a/templates/email_import_complete_text.txt b/templates/email_import_complete_text.txt new file mode 100644 index 0000000..7e7ae8d --- /dev/null +++ b/templates/email_import_complete_text.txt @@ -0,0 +1,6 @@ +{{.pd.Data.hi}} + +{{.pd.Data.complete}} + +{{.pd.Data.team}} + -- 2.47.2
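
For reference, the streaming decode approach boils down to the following minimal, self-contained sketch. It mirrors what ImportFromPinBoard now does: consume the opening array token, then decode objects one at a time and hand them off in batches of 100 so the export file is never fully loaded into memory. The bookmark type and processBatch function below are placeholders for illustration only, not code from this patch.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
)

// bookmark is a stand-in for the fields a Pinboard export item carries.
type bookmark struct {
	Href        string `json:"href"`
	Description string `json:"description"`
}

// processBatch stands in for the importBaseURLs/importOrgLinks steps.
func processBatch(items []*bookmark) error {
	fmt.Printf("processing %d items\n", len(items))
	return nil
}

// streamImport decodes a JSON array of bookmarks in batches of step items,
// so the whole export is never held in memory at once.
func streamImport(r io.Reader, step int) error {
	dec := json.NewDecoder(r)
	// Consume the opening '[' of the array.
	if _, err := dec.Token(); err != nil {
		return fmt.Errorf("error parsing json: %w", err)
	}
	for {
		var batch []*bookmark
		for dec.More() {
			var b *bookmark
			if err := dec.Decode(&b); err != nil {
				log.Printf("skipping malformed object: %v", err)
				continue
			}
			batch = append(batch, b)
			if len(batch) == step {
				break
			}
		}
		if len(batch) == 0 {
			return nil // no more items to process
		}
		if err := processBatch(batch); err != nil {
			return err
		}
	}
}

func main() {
	f, err := os.Open("pinboard_export.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if err := streamImport(f, 100); err != nil {
		log.Fatal(err)
	}
}

In the patch itself, each decoded batch goes through importBaseURLs/importOrgLinks, and the whole job runs as ImportBookmarksTask on the new "import" work queue, so the request handler only stores the upload in the tmp directory and returns immediately.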