~netlandish/links-dev

This thread contains a patchset. You're looking at the original emails, but you may wish to use the patch review UI. Review patch
1

[PATCH links] Moving imports into an async task using a special queue that has a larger buffer and more workers to process the base URLs efficiently.

Details
Message ID
<20250301012823.10991-1-peter@netlandish.com>
Sender timestamp
1740770841
DKIM signature
missing
Download raw message
Patch: +291 -125
Fixes: https://todo.code.netlandish.com/~netlandish/links/97
Changelog-changed: Pinboard import now uses streaming json decoding to
  avoid loading large files completely into memory.
---
This is a larger patch than I planned. As I kept digging into it, I
found more tweaks to make and issues to work around. Anyway, I've
tested this a bit, but I need a large Pinboard file to really put it to
work.

 accounts/utils.go                           |   4 +-
 api/graph/schema.resolvers.go               |  24 ++-
 cmd/api/main.go                             |   2 +-
 cmd/global.go                               |   2 +-
 cmd/links/main.go                           |   5 +-
 cmd/list/main.go                            |   2 +-
 cmd/server.go                               |  12 +-
 cmd/short/main.go                           |   2 +-
 config.example.ini                          |  11 +-
 core/import.go                              | 170 +++++++++++++-------
 core/inputs.go                              |  57 ++++---
 core/processors.go                          |  71 ++++++++
 core/routes.go                              |  24 ++-
 helpers.go                                  |  10 ++
 models/base_url.go                          |   8 +-
 templates/email_import_complete_html.html   |   5 +
 templates/email_import_complete_subject.txt |   1 +
 templates/email_import_complete_text.txt    |   6 +
 18 files changed, 291 insertions(+), 125 deletions(-)
 create mode 100644 templates/email_import_complete_html.html
 create mode 100644 templates/email_import_complete_subject.txt
 create mode 100644 templates/email_import_complete_text.txt

diff --git a/accounts/utils.go b/accounts/utils.go
index 745313c..c653eec 100644
--- a/accounts/utils.go
+++ b/accounts/utils.go
@@ -29,7 +29,7 @@ func SendVerificationEmail(ctx context.Context, conf *accounts.Confirmation, use
	}
	// NOTE: improve this
	lt := localizer.GetLocalizer("")
	pd := localizer.NewPageData(lt.Translate("Links - Confirm account email"))
	pd := localizer.NewPageData(lt.Translate("LinkTaco - Confirm account email"))
	pd.Data["please_confirm"] = lt.Translate("Please confirm your account")
	pd.Data["to_confirm"] = lt.Translate("To confirm your email address and complete your Links registration, please click the link below.")
	pd.Data["confirm"] = lt.Translate("Confirm")
@@ -88,6 +88,6 @@ func ResendVerificationConf(ctx context.Context, user gobwebs.User) (bool, error

	linkUser := user.(*models.User)
	lt := localizer.GetLocalizer(linkUser.Settings.Account.DefaultLang)
	return true, fmt.Errorf(lt.Translate("We sent you a new confirmation email. Please click the link in that " +
	return true, fmt.Errorf("%s", lt.Translate("We sent you a new confirmation email. Please click the link in that "+
		"email to confirm your account."))
}
diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go
index bd8d6ed..60ca18d 100644
--- a/api/graph/schema.resolvers.go
+++ b/api/graph/schema.resolvers.go
@@ -299,13 +299,8 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput)
		}
	}

	baseURL, err := url.Parse(input.URL)
	if err != nil {
		return nil, fmt.Errorf("%s", lt.Translate("Error parsin url: %s", err))
	}
	baseURL.Fragment = ""
	BaseURL := &models.BaseURL{
		URL: baseURL.String(),
		URL: links.StripURLFragment(input.URL),
	}
	err = BaseURL.Store(ctx)
	if err != nil {
@@ -313,7 +308,9 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput)
	}

	if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) {
		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
		if visibility == models.OrgLinkVisibilityPublic {
			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
		}
	}

	userID := int(user.ID)
@@ -472,13 +469,8 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
				WithCode(valid.ErrValidationCode)
			return nil, nil
		}
		baseURL, err := url.Parse(*input.URL)
		if err != nil {
			return nil, fmt.Errorf("%s", lt.Translate("Error parsin url: %s", err))
		}
		baseURL.Fragment = ""
		BaseURL := &models.BaseURL{
			URL: baseURL.String(),
			URL: links.StripURLFragment(*input.URL),
		}
		err = BaseURL.Store(ctx)
		if err != nil {
@@ -487,7 +479,9 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi

		orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)}
		orgLink.URL = *input.URL
		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
		if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic {
			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
		}
	}

	if input.Visibility != nil {
@@ -812,7 +806,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput)
		Title:       input.Title,
		OrgID:       org.ID,
		Description: gcore.StripHtmlTags(input.Description),
		BaseURLID:   sql.NullInt64{Valid: BaseURL.ID > 0, Int64: int64(BaseURL.ID)},
		BaseURLID:   sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)},
		Visibility:  string(input.Visibility),
		Starred:     input.Starred,
		URL:         noteURL,
diff --git a/cmd/api/main.go b/cmd/api/main.go
index 85ec98f..c9f0a7b 100644
--- a/cmd/api/main.go
+++ b/cmd/api/main.go
@@ -82,7 +82,7 @@ func run() error {
		return fmt.Errorf("unable to load storage service: %v", err)
	}

	eSize, gSize, err := cmd.LoadWorkerQueueSizes(config)
	eSize, gSize, _, err := cmd.LoadWorkerQueueSizes(config)
	if err != nil {
		return err
	}
diff --git a/cmd/global.go b/cmd/global.go
index f788bc5..1b07240 100644
--- a/cmd/global.go
+++ b/cmd/global.go
@@ -47,7 +47,7 @@ func RunMigrations(t *testing.T, db *sql.DB) {
func GetWorkerPoolCount(queue *work.Queue) int {
	var pSize int
	switch queue.Name() {
	case "general":
	case "general", "import":
		pSize = 10
	case "invoice":
		pSize = 3
diff --git a/cmd/links/main.go b/cmd/links/main.go
index c911962..2d70c83 100644
--- a/cmd/links/main.go
+++ b/cmd/links/main.go
@@ -160,7 +160,7 @@ func run() error {
		return fmt.Errorf("Unknown storage service configured")
	}

	eSize, gSize, err := cmd.LoadWorkerQueueSizes(config)
	eSize, gSize, iSize, err := cmd.LoadWorkerQueueSizes(config)
	if err != nil {
		return err
	}
@@ -183,6 +183,7 @@ func run() error {
	sq := email.NewServiceQueue(e.Logger, esvc, eq, &mailChecker)
	wq := work.NewQueue("general", gSize)
	wqi := work.NewQueue("invoice", gSize)
	imq := work.NewQueue("import", iSize)

	mwConf := &server.MiddlewareConfig{
		Sessions:       true,
@@ -216,7 +217,7 @@ func run() error {
		WithStorage(storesvc).
		DefaultMiddlewareWithConfig(mwConf).
		SetWorkerPoolFunc(cmd.GetWorkerPoolCount).
		WithQueues(eq, wq, wqi).
		WithQueues(eq, wq, wqi, imq).
		WithMiddleware(
			database.Middleware(db),
			core.RemoteIPMiddleware,
diff --git a/cmd/list/main.go b/cmd/list/main.go
index dc78a1c..3b43752 100644
--- a/cmd/list/main.go
+++ b/cmd/list/main.go
@@ -59,7 +59,7 @@ func run() error {
		}
	}

	eSize, gSize, err := cmd.LoadWorkerQueueSizes(config)
	eSize, gSize, _, err := cmd.LoadWorkerQueueSizes(config)
	if err != nil {
		return err
	}
diff --git a/cmd/server.go b/cmd/server.go
index f64597d..0d416ba 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -208,22 +208,22 @@ func LoadAutoTLS(config *config.Config, db *sql.DB, service string) *autocert.Ma
}

// LoadWorkerQueueSizes ...
func LoadWorkerQueueSizes(config *config.Config) (int, int, error) {
func LoadWorkerQueueSizes(config *config.Config) (int, int, int, error) {
	var (
		eSize, gSize int = 512, 512
		err          error
		eSize, gSize, iSize int = 512, 512, 1024
		err                 error
	)
	if tStr, ok := config.File.Get("links", "email-queue-size"); ok {
		eSize, err = strconv.Atoi(tStr)
		if err != nil {
			return eSize, gSize, fmt.Errorf("links:email-queue-size invalid")
			return eSize, gSize, iSize, fmt.Errorf("links:email-queue-size invalid")
		}
	}
	if tStr, ok := config.File.Get("links", "general-queue-size"); ok {
		gSize, err = strconv.Atoi(tStr)
		if err != nil {
			return eSize, gSize, fmt.Errorf("links:general-queue-size invalid")
			return eSize, gSize, iSize, fmt.Errorf("links:general-queue-size invalid")
		}
	}
	return eSize, gSize, nil
	return eSize, gSize, iSize, nil
}
diff --git a/cmd/short/main.go b/cmd/short/main.go
index 34bf3e2..f9b961f 100644
--- a/cmd/short/main.go
+++ b/cmd/short/main.go
@@ -57,7 +57,7 @@ func run() error {
		}
	}

	eSize, gSize, err := cmd.LoadWorkerQueueSizes(config)
	eSize, gSize, _, err := cmd.LoadWorkerQueueSizes(config)
	if err != nil {
		return err
	}
diff --git a/config.example.ini b/config.example.ini
index 7ed3145..c26190a 100644
--- a/config.example.ini
+++ b/config.example.ini
@@ -92,6 +92,10 @@ ses-secret-key=YOUR_AWS_SECRET_KEY
# Defaults to: ./
root-directory=./

# Value that can be used to set a custom temp directory.
# Defaults to: /tmp
tmp-directory=/tmp

# If storage is s3, set the appropriate variables here
# s3 works with any s3 compatible service (ie, minio, etc.)
endpoint=s3.amazonaws.com
@@ -182,12 +186,15 @@ rate-limit-burst=40
# How long (in minutes) of inactivity does the limit record live for
rate-limit-expire=3

# How many email queue workers. Defaults to 512
# How large is the email queue buffer. Defaults to 512
email-queue-size = 512

# How many general queue workers. Defaults to 512
# How large is the general queue buffer. Defaults to 512
general-queue-size = 512

# How large is the import queue buffer. Defaults to 1024
import-queue-size = 1024

# How many short links can free accounts create per month
short-limit-month = 10

diff --git a/core/import.go b/core/import.go
index ae8e14e..d58cfab 100644
--- a/core/import.go
+++ b/core/import.go
@@ -5,11 +5,12 @@ import (
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"links"
	"links/models"
	"mime/multipart"
	"net/url"
	"strings"
	"time"

	sq "github.com/Masterminds/squirrel"
	"github.com/labstack/echo/v4"
@@ -17,6 +18,7 @@ import (
	"golang.org/x/net/html"
	"netlandish.com/x/gobwebs/database"
	"netlandish.com/x/gobwebs/server"
	"netlandish.com/x/gobwebs/storage"
)

// Define import source, used in template and handler
@@ -201,7 +203,7 @@ func (i importAdapter) GetType() int {
}

func processBaseURLs(obj importObj, tmpURLMap map[string]bool, urlList []string, c echo.Context) {
	importedURL, err := url.ParseRequestURI(obj.GetURL())
	importedURL, err := url.Parse(obj.GetURL())
	if err != nil {
		return
	}
@@ -216,25 +218,33 @@ func processBaseURLs(obj importObj, tmpURLMap map[string]bool, urlList []string,
	}
	// Get a list of base url to make a query
	// of existing ones
	urlList = append(urlList, obj.GetURL())
	tmpURLMap[obj.GetURL()] = true
	strippedURL := links.StripURLFragment(obj.GetURL())
	urlList = append(urlList, strippedURL)
	tmpURLMap[strippedURL] = true
}

// Get a importAdapter with a list of pinBoardObj, HTMLObj and FirefoxObj
// and create the base url objects in the db
// returns a map containing the base url as index and its id as value
func importBaseURLs(c echo.Context, objAdapter *importAdapter) (map[string]int, error) {
func importBaseURLs(ctx context.Context, objAdapter *importAdapter) (map[string]int, error) {
	urlList := make([]string, 0)
	tmpURLMap := make(map[string]bool) // temporary map to store non existing url
	baseURLMap := make(map[string]int)
	srv := server.ForContext(ctx)

	// TODO: Rethink this process. Total hack for the moment all to use a URL
	// helper
	gctx := &server.Context{
		Server: srv,
	}
	switch objAdapter.GetType() {
	case pinBoardType:
		for _, obj := range objAdapter.GetPinBoards() {
			processBaseURLs(obj, tmpURLMap, urlList, c)
			processBaseURLs(obj, tmpURLMap, urlList, gctx)
		}
	case htmlType:
		for _, obj := range objAdapter.GetHTMLLinks() {
			processBaseURLs(obj, tmpURLMap, urlList, c)
			processBaseURLs(obj, tmpURLMap, urlList, gctx)
		}
	}

@@ -242,7 +252,6 @@ func importBaseURLs(c echo.Context, objAdapter *importAdapter) (map[string]int,
	opts := &database.FilterOptions{
		Filter: sq.Eq{"b.url": urlList},
	}
	ctx := c.Request().Context()
	baseURLs, err := models.GetBaseURLs(ctx, opts)
	if err != nil {
		return nil, err
@@ -265,13 +274,14 @@ func importBaseURLs(c echo.Context, objAdapter *importAdapter) (map[string]int,
			return nil, err
		}
		baseURLMap[baseURL.URL] = baseURL.ID
		srv.QueueTask("import", ParseBaseURLTask(srv, baseURL))
	}
	return baseURLMap, nil
}

func processOrgLinks(obj importObj, baseURLMap map[string]int,
	org *models.Organization, user *models.User, billEnabled bool) *models.OrgLink {
	baseID, ok := baseURLMap[obj.GetURL()]
	baseID, ok := baseURLMap[links.StripURLFragment(obj.GetURL())]
	if !ok {
		return nil
	}
@@ -368,65 +378,91 @@ func importOrgLinks(ctx context.Context, objAdapter *importAdapter, baseURLMap m
	return nil
}

func ImportFromPinBoard(c echo.Context, src multipart.File,
// getTmpFileStorage returns a filesystem-backed storage service rooted at the
// directory configured under [storage] tmp-directory. When that option is not
// set, "/tmp" is used instead. The configuration is read from the server
// attached to ctx.
func getTmpFileStorage(ctx context.Context) storage.Service {
	root := "/tmp"
	if dir, found := server.ForContext(ctx).Config.File.Get("storage", "tmp-directory"); found {
		root = dir
	}
	return storage.NewFileSystemService(root)
}

func ImportFromPinBoard(ctx context.Context, path string,
	org *models.Organization, user *models.User) error {
	var pinBoardList []*pinBoardObj
	err := json.NewDecoder(src).Decode(&pinBoardList)
	fs := getTmpFileStorage(ctx)
	fobj, err := fs.GetObject(ctx, path)
	if err != nil {
		return err
	}

	var listlen, start, end int
	step := 100

	adapter := &importAdapter{
		elementType: pinBoardType,
		start:       start,
		end:         end,
		pinBoards:   pinBoardList,
	dcode := json.NewDecoder(fobj.Content)
	_, err = dcode.Token()
	if err != nil {
		return fmt.Errorf("Error parsing json: %w", err)
	}

	gctx := c.(*server.Context)
	billEnabled := links.BillingEnabled(gctx.Server.Config)
	var totalCount int
	step := 100
	srv := server.ForContext(ctx)
	billEnabled := links.BillingEnabled(srv.Config)

	listlen = len(pinBoardList)
	for start < listlen {
		if end+step > listlen {
			end = listlen
		} else {
			end += step
		}
		adapter.start = start
		adapter.end = end
		baseURLMap, err := importBaseURLs(c, adapter)
		if err != nil {
			return err
	for {
		var pinBoardList []*pinBoardObj
		for dcode.More() {
			var pbObj *pinBoardObj
			err := dcode.Decode(&pbObj)
			if err != nil {
				srv.Logger().Printf("Error decoding json object in pinboard import: %v", err)
				continue
			}
			pinBoardList = append(pinBoardList, pbObj)
			if len(pinBoardList) == step {
				break
			}
		}

		err = importOrgLinks(
			c.Request().Context(),
			adapter,
			baseURLMap,
			org,
			user,
			billEnabled,
		)
		if err != nil {
			return err
		}
		listlen := len(pinBoardList)
		if listlen > 0 {
			adapter := &importAdapter{
				elementType: pinBoardType,
				start:       0,
				end:         listlen,
				pinBoards:   pinBoardList,
			}

		start += step
			totalCount += listlen

			baseURLMap, err := importBaseURLs(ctx, adapter)
			if err != nil {
				return err
			}

			err = importOrgLinks(
				ctx,
				adapter,
				baseURLMap,
				org,
				user,
				billEnabled,
			)
			if err != nil {
				return err
			}
			time.Sleep(3 * time.Second) // Let the parse url workers catch up
		} else {
			break // No more items to process
		}
	}

	if listlen > 0 {
	if totalCount > 0 {
		mdata := make(map[string]any)
		mdata["org_id"] = org.ID
		err := models.RecordAuditLog(
			c.Request().Context(),
			ctx,
			int(user.ID),
			c.RealIP(),
			"internal task",
			models.LOG_BOOKMARK_IMPORTED,
			fmt.Sprintf("Imported %d Pinboard bookmarks into organization %s.", listlen, org.Slug),
			fmt.Sprintf("Imported %d Pinboard bookmarks into organization %s.", totalCount, org.Slug),
			mdata,
		)
		if err != nil {
@@ -434,6 +470,11 @@ func ImportFromPinBoard(c echo.Context, src multipart.File,
		}
	}

	// XXX Pass some sort of flag to call this, in the future
	//err = fs.DeleteObject(ctx, path)
	//if err != nil {
	//    return err
	//}
	return nil
}

@@ -443,7 +484,7 @@ type htmlObjData struct {
}

// Parse import html files. Used for chrome and safari
func getLinksFromHTML(src multipart.File) map[string]htmlObjData {
func getLinksFromHTML(src io.Reader) map[string]htmlObjData {
	tkn := html.NewTokenizer(src)
	vals := make(map[string]htmlObjData)
	var (
@@ -500,9 +541,14 @@ func getLinksFromHTML(src multipart.File) map[string]htmlObjData {
}

// Get an html
func ImportFromHTML(c echo.Context, src multipart.File,
func ImportFromHTML(ctx context.Context, path string,
	org *models.Organization, user *models.User) error {
	linksSrc := getLinksFromHTML(src)
	fs := getTmpFileStorage(ctx)
	fobj, err := fs.GetObject(ctx, path)
	if err != nil {
		return err
	}
	linksSrc := getLinksFromHTML(fobj.Content)
	var htmlList []*htmlObj
	for i, v := range linksSrc {
		l := &htmlObj{
@@ -523,8 +569,8 @@ func ImportFromHTML(c echo.Context, src multipart.File,
		htmlList:    htmlList,
	}

	gctx := c.(*server.Context)
	billEnabled := links.BillingEnabled(gctx.Server.Config)
	srv := server.ForContext(ctx)
	billEnabled := links.BillingEnabled(srv.Config)

	listlen = len(htmlList)
	for start < listlen {
@@ -535,13 +581,13 @@ func ImportFromHTML(c echo.Context, src multipart.File,
		}
		adapter.start = start
		adapter.end = end
		baseURLMap, err := importBaseURLs(c, adapter)
		baseURLMap, err := importBaseURLs(ctx, adapter)
		if err != nil {
			return err
		}

		err = importOrgLinks(
			c.Request().Context(),
			ctx,
			adapter,
			baseURLMap,
			org,
@@ -559,9 +605,9 @@ func ImportFromHTML(c echo.Context, src multipart.File,
		mdata := make(map[string]any)
		mdata["org_id"] = org.ID
		err := models.RecordAuditLog(
			c.Request().Context(),
			ctx,
			int(user.ID),
			c.RealIP(),
			"internal task",
			models.LOG_BOOKMARK_IMPORTED,
			fmt.Sprintf("Imported %d bookmarks into organization %s.", listlen, org.Slug),
			mdata,
@@ -570,5 +616,11 @@ func ImportFromHTML(c echo.Context, src multipart.File,
			return err
		}
	}

	// XXX Pass some sort of flag to call this, in the future
	//err = fs.DeleteObject(ctx, path)
	//if err != nil {
	//    return err
	//}
	return nil
}
diff --git a/core/inputs.go b/core/inputs.go
index 2355d62..6a17cb7 100644
--- a/core/inputs.go
+++ b/core/inputs.go
@@ -4,13 +4,16 @@ import (
	"errors"
	"fmt"
	"links/internal/localizer"
	"mime/multipart"
	"net/http"
	"net/url"
	"path/filepath"
	"strings"
	"time"

	"github.com/labstack/echo/v4"
	"github.com/segmentio/ksuid"
	"netlandish.com/x/gobwebs/server"
	"netlandish.com/x/gobwebs/storage"
	"netlandish.com/x/gobwebs/validate"
)

@@ -162,9 +165,9 @@ func (d *DomainForm) Validate(c echo.Context) error {
}

type ImportForm struct {
	File   string         `form:"file"`
	Origin int            `form:"origin" validate:"oneof=0 1 2 3"`
	src    multipart.File `form:"-"`
	File     string `form:"file"`
	Origin   int    `form:"origin" validate:"oneof=0 1 2 3"`
	FilePath string `form:"-"`
}

func (i *ImportForm) Validate(c echo.Context) error {
@@ -181,33 +184,51 @@ func (i *ImportForm) Validate(c echo.Context) error {
		return err
	}

	f, err := c.FormFile("file")
	lt := localizer.GetSessionLocalizer(c)
	if i.Origin < pinBoardOrigin || i.Origin > firefoxOrigin {
		err = fmt.Errorf("%s", lt.Translate("Invalid origin source for importer."))
		return validate.InputErrors{"Origin": err}
	}

	file, hdr, err := c.Request().FormFile("file")
	if err != nil {
		if errors.Is(err, http.ErrMissingFile) {
			lt := localizer.GetSessionLocalizer(c)
			err = fmt.Errorf(lt.Translate("The file is required"))
			err = fmt.Errorf("%s", lt.Translate("The file is required"))
			return validate.InputErrors{"File": err}
		}
		return err
	}
	src, err := f.Open()
	if err != nil {
		return err
	}
	defer src.Close()
	ext := strings.ToLower(filepath.Ext(f.Filename))
	defer file.Close()

	ext := strings.ToLower(filepath.Ext(hdr.Filename))
	if i.Origin == safariOrigin || i.Origin == chromeOrigin || i.Origin == firefoxOrigin {
		if ext != ".html" {
			lt := localizer.GetSessionLocalizer(c)
			err = fmt.Errorf(lt.Translate("The file submitted for this source should be html"))
			err = fmt.Errorf("%s", lt.Translate("The file submitted for this source should be html"))
			return validate.InputErrors{"File": err}
		}
	} else if i.Origin == pinBoardOrigin && ext != ".json" {
		lt := localizer.GetSessionLocalizer(c)
		err = fmt.Errorf(lt.Translate("The file submitted for this source should be json"))
		err = fmt.Errorf("%s", lt.Translate("The file submitted for this source should be json"))
		return validate.InputErrors{"File": err}
	}

	gctx := c.(*server.Context)
	tmpDir, ok := gctx.Server.Config.File.Get("storage", "tmp-directory")
	if !ok {
		tmpDir = "/tmp"
	}
	path := fmt.Sprintf("import/%d/%s/%s%s",
		gctx.User.GetID(),
		time.Now().UTC().Format("2006-01-02"),
		ksuid.New().String(),
		ext,
	)
	fs := storage.NewFileSystemService(tmpDir)
	err = fs.PutObject(c.Request().Context(), path, file)
	if err != nil {
		c.Logger().Printf("Error writing tmp file for user bookmark import: %v", err)
		return validate.InputErrors{"File": err}
	}
	i.src = src
	i.FilePath = path
	return nil
}

diff --git a/core/processors.go b/core/processors.go
index 954be9b..3f4355c 100644
--- a/core/processors.go
+++ b/core/processors.go
@@ -3,15 +3,86 @@ package core
import (
	"context"
	"links"
	"links/internal/localizer"
	"links/models"

	work "git.sr.ht/~sircmpwn/dowork"
	"github.com/labstack/echo/v4"
	"netlandish.com/x/gobwebs"
	"netlandish.com/x/gobwebs/database"
	"netlandish.com/x/gobwebs/email"
	"netlandish.com/x/gobwebs/server"
	"netlandish.com/x/gobwebs/timezone"
	"netlandish.com/x/gobwebs/validate"
)

// ImportBookmarksTask builds the background task that imports a previously
// uploaded bookmark file into org on behalf of user and, once the import
// succeeds, emails the requesting user that it has completed.
//
// origin selects the importer: pinBoardOrigin runs ImportFromPinBoard, every
// other value runs ImportFromHTML. path is handed to the importer unchanged.
// An importer error is returned from the task, which is configured with
// Retries(3). A failure to send the notification email is only logged and
// does not fail the task.
func ImportBookmarksTask(c echo.Context, origin int, path string,
	org *models.Organization, user *models.User) *work.Task {
	lt := localizer.GetLocalizer("")
	gctx := c.(*server.Context)

	// Read everything needed from the request context now, while the request
	// is still being handled. The task body runs on a queue worker, possibly
	// after the handler has returned, so it should not reach back into the
	// request-scoped context.
	srv := gctx.Server
	toEmail := gctx.User.GetEmail()

	return work.NewTask(func(ctx context.Context) error {
		ctx = server.ServerContext(ctx, srv)
		ctx = database.Context(ctx, srv.DB)
		ctx = timezone.Context(ctx, "UTC")

		var err error
		switch origin {
		case pinBoardOrigin:
			// Run task to import from Pinboard
			err = ImportFromPinBoard(ctx, path, org, user)
		default:
			err = ImportFromHTML(ctx, path, org, user)
		}

		if err != nil {
			return err
		}

		tmpl := srv.Echo().Renderer.(*validate.Template).Templates()
		tmap := email.TMap{
			"email_import_complete": []string{
				"email_import_complete_subject.txt",
				"email_import_complete_text.txt",
				"email_import_complete_html.html",
			},
		}
		// NOTE: improve this
		pd := localizer.NewPageData(lt.Translate("LinkTaco - Your bookmark import is complete"))
		pd.Data["hi"] = lt.Translate("Hi there,")
		pd.Data["complete"] = lt.Translate("Just wanted to let you know that your bookmark import has completed successfully!")
		pd.Data["team"] = lt.Translate("- LinkTaco Team")

		helper := email.NewHelper(srv.Email, tmap, tmpl)
		err = helper.Send(
			"email_import_complete",
			srv.Config.DefaultFromEmail,
			toEmail,
			gobwebs.Map{"pd": pd},
		)
		if err != nil {
			// The import itself succeeded; a notification failure must not
			// mark the task as failed (and trigger a retry of the import).
			srv.Logger().Printf("Error sending import complete email: %v", err)
		}
		return nil
	}).Retries(3).Before(func(ctx context.Context, task *work.Task) {
		gobwebs.TaskIDWork(task)
		srv.Logger().Printf(
			"Running task ImportBookmarksTask %s for the %d attempt.",
			task.Metadata["id"], task.Attempts())
	}).After(func(ctx context.Context, task *work.Task) {
		if task.Result() == nil {
			srv.Logger().Printf(
				"Completed task ImportBookmarksTask %s after %d attempts.",
				task.Metadata["id"], task.Attempts())
		} else {
			srv.Logger().Printf(
				"Failed task ImportBookmarksTask %s after %d attempts: %v",
				task.Metadata["id"], task.Attempts(), task.Result())
		}
	})
}

// ParseBaseURLTask task to parse html title tags.
func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
	return work.NewTask(func(ctx context.Context) error {
diff --git a/core/routes.go b/core/routes.go
index 8338503..231e35a 100644
--- a/core/routes.go
+++ b/core/routes.go
@@ -3018,6 +3018,12 @@ func (s *Service) ImportData(c echo.Context) error {
		"isFree":     org.IsFreeAccount(),
	}
	if req.Method == http.MethodPost {
		// Set file limit to 200MB for import handler
		err := c.Request().ParseMultipartForm(200 << 20)
		if err != nil {
			return fmt.Errorf("Unable to set file upload limit: %w", err)
		}

		if err := form.Validate(c); err != nil {
			switch err.(type) {
			case validate.InputErrors:
@@ -3028,22 +3034,14 @@ func (s *Service) ImportData(c echo.Context) error {
				return err
			}
		}
		switch form.Origin {
		case pinBoardOrigin:
			err = ImportFromPinBoard(c, form.src, org, user)
		case safariOrigin:
			err = ImportFromHTML(c, form.src, org, user)
		case chromeOrigin:
			err = ImportFromHTML(c, form.src, org, user)
		case firefoxOrigin:
			err = ImportFromHTML(c, form.src, org, user)
		default:
			// Should not be reached since this is validated in the form
			return fmt.Errorf("Invalid origin source for importer")
		}

		err = gctx.Server.QueueTask("import",
			ImportBookmarksTask(c, form.Origin, form.FilePath, org, user))
		if err != nil {
			return err
		}
		messages.Success(c, lt.Translate(
			"Your bookmark import is being processed. We will notify you once it's complete."))
		return c.Redirect(http.StatusMovedPermanently, c.Echo().Reverse("core:org_link_list", org.Slug))

	}
diff --git a/helpers.go b/helpers.go
index 125698a..c917207 100644
--- a/helpers.go
+++ b/helpers.go
@@ -1180,3 +1180,13 @@ func SanitizeUTF8(input string) string {
	}
	return b.String()
}

// StripURLFragment returns furl without its "#fragment" part. The value is
// parsed with url.Parse and re-serialized, so the result is the parser's
// string form of the URL. If furl cannot be parsed it is returned unchanged.
func StripURLFragment(furl string) string {
	parsed, err := url.Parse(furl)
	if err == nil {
		parsed.Fragment = ""
		return parsed.String()
	}
	return furl
}
diff --git a/models/base_url.go b/models/base_url.go
index a70ea87..b8a96ef 100644
--- a/models/base_url.go
+++ b/models/base_url.go
@@ -150,14 +150,14 @@ func (b *BaseURL) Store(ctx context.Context) error {
				   ON     CONFLICT (url) DO UPDATE
				   SET    url = NULL
				   WHERE  FALSE
				   RETURNING id, url, created_on
				   RETURNING id, url, public_ready, created_on
				)
				SELECT id, url, created_on FROM ins
				SELECT id, url, public_ready, created_on FROM ins
				UNION  ALL
				SELECT id, url, created_on FROM base_urls
				SELECT id, url, public_ready, created_on FROM base_urls
				WHERE  url = $1
				LIMIT  1;`, b.URL, b.Hash)
			err := row.Scan(&b.ID, &b.URL, &b.CreatedOn)
			err := row.Scan(&b.ID, &b.URL, &b.PublicReady, &b.CreatedOn)
			if err != nil {
				return err
			}
diff --git a/templates/email_import_complete_html.html b/templates/email_import_complete_html.html
new file mode 100644
index 0000000..f0ecc54
--- /dev/null
+++ b/templates/email_import_complete_html.html
@@ -0,0 +1,5 @@
{{template "email_base" .}}
<p style="font-family: sans-serif; font-size: 14px; font-weight: normal; margin: 0; margin-bottom: 15px;">{{.pd.Data.hi}}</p>
<p style="font-family: sans-serif; font-size: 14px; font-weight: normal; margin: 0; margin-bottom: 15px;">{{.pd.Data.complete}}</p>
<p style="font-family: sans-serif; font-size: 14px; font-weight: normal; margin: 0; margin-bottom: 15px;">{{.pd.Data.team}}</p>
{{template "email_base_footer" .}}
diff --git a/templates/email_import_complete_subject.txt b/templates/email_import_complete_subject.txt
new file mode 100644
index 0000000..4d3b16b
--- /dev/null
+++ b/templates/email_import_complete_subject.txt
@@ -0,0 +1 @@
{{.pd.Title}}
diff --git a/templates/email_import_complete_text.txt b/templates/email_import_complete_text.txt
new file mode 100644
index 0000000..7e7ae8d
--- /dev/null
+++ b/templates/email_import_complete_text.txt
@@ -0,0 +1,6 @@
{{.pd.Data.hi}}

{{.pd.Data.complete}}

{{.pd.Data.team}}

-- 
2.47.2
Details
Message ID
<D84MGX25M25D.8HCJB67AIJF7@netlandish.com>
In-Reply-To
<20250301012823.10991-1-peter@netlandish.com> (view parent)
Sender timestamp
1740779447
DKIM signature
missing
Download raw message
Applied.

To git@git.code.netlandish.com:~netlandish/links
   ad6d7d7..63b52bd  master -> master
Reply to thread Export thread (mbox)