From: Peter Sanchez <peter@netlandish.com>
To: ~netlandish/links-dev@lists.code.netlandish.com
Cc: Peter Sanchez <peter@netlandish.com>
Subject: [PATCH links] Add a simple goroutine to process BaseURL metadata that previously failed
Date: Thu, 6 Mar 2025 19:27:03 -0600
Message-ID: <20250307012821.11139-1-peter@netlandish.com>
X-Mailer: git-send-email 2.47.2
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit

This allows 1 day between attempts and a maximum of 3 tries before
giving up on parsing that particular URL.

Changelog-changed: Failed metadata collection on base URLs will be
retried a maximum of 3 times, at 1-day intervals.
---
This is the fix for removing base URL parsing from bulk imports. It
allows bulk-imported base URLs to be parsed (in time). Each run only
parses groups of 2K every 30 minutes and limits the worker pool to 5
while parsing. These are simple limitations that could/should be
tweaked with more usage and data. A rough sketch of the selection
query is included after these notes.
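For review purposes, the new runParse() in cmd/links/parse.go selects
candidate rows roughly as in the SQL below. This is a sketch only: the
actual query is built with squirrel inside models.GetBaseURLs (which
also joins org_links/tag_links and aggregates tags), and the 1-day
cutoff is computed in Go rather than with an interval expression.

    -- Illustrative approximation of the runParse selection
    SELECT b.*
    FROM base_urls b
    WHERE b.public_ready = false         -- metadata not parsed yet
      AND b.parse_attempts < 3           -- give up after 3 tries
      AND b.counter > 0                  -- only URLs with public links
      AND (b.last_parse_attempt IS NULL  -- never attempted, or
           OR b.last_parse_attempt <= now() - interval '1 day')
    ORDER BY b.id                        -- oldest first
    LIMIT 2000;                          -- one batch per 30-minute tick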
 api/graph/schema.resolvers.go                |  8 +-
 cmd/links/main.go                            |  5 +
 cmd/links/parse.go                           | 94 +++++++++++++++++++
 cmd/migrations.go                            |  7 ++
 core/import.go                               |  1 -
 core/processors.go                           |  5 +-
 helpers.go                                   | 30 +++++-
 .../0004_add_parse_fields_baseurls.down.sql  |  2 +
 .../0004_add_parse_fields_baseurls.up.sql    |  2 +
 models/base_url.go                           | 23 ++++-
 models/models.go                             | 20 ++--
 models/schema.sql                            |  2 +
 12 files changed, 177 insertions(+), 22 deletions(-)
 create mode 100644 cmd/links/parse.go
 create mode 100644 migrations/0004_add_parse_fields_baseurls.down.sql
 create mode 100644 migrations/0004_add_parse_fields_baseurls.up.sql

diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go
index 7bdc796..c708a12 100644
--- a/api/graph/schema.resolvers.go
+++ b/api/graph/schema.resolvers.go
@@ -509,7 +509,7 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput)
 
 	if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) {
 		if visibility == models.OrgLinkVisibilityPublic {
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 		}
 	}
 
@@ -680,7 +680,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
 		orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)}
 		orgLink.URL = *input.URL
 		if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic {
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 		}
 	}
 
@@ -706,7 +706,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
 			if err != nil {
 				return nil, err
 			}
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 		}
 		orgLink.Visibility = string(*input.Visibility)
 	}
@@ -1022,7 +1022,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput)
 
 	// We process the based link metadata after saving the current note
 	if OrgLinkNote.BaseURLID.Valid && OrgLinkNote.Visibility == models.OrgLinkVisibilityPublic {
-		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 	}
 
 	if len(tags) > 0 {
diff --git a/cmd/links/main.go b/cmd/links/main.go
index 2d70c83..4372a40 100644
--- a/cmd/links/main.go
+++ b/cmd/links/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -381,6 +382,10 @@ func run() error {
 		e.Reverse("ses-feedback:endpoint"),
 	)
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go parseBaseURLs(ctx, srv)
 	srv.Run()
 
 	return nil
diff --git a/cmd/links/parse.go b/cmd/links/parse.go
new file mode 100644
index 0000000..0cbe8a4
--- /dev/null
+++ b/cmd/links/parse.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"links/core"
+	"links/models"
+	"time"
+
+	sq "github.com/Masterminds/squirrel"
+	"netlandish.com/x/gobwebs/database"
+	"netlandish.com/x/gobwebs/server"
+	"netlandish.com/x/gobwebs/timezone"
+)
+
+func runParse(ctx context.Context, srv *server.Server) error {
+	sig := make(chan int, 5)
+	dayAgo := time.Now().AddDate(0, 0, -1).UTC()
+	opts := &database.FilterOptions{
+		Filter: sq.And{
+			sq.Eq{"b.public_ready": false},
+			sq.Lt{"b.parse_attempts": 3},
+			sq.Gt{"b.counter": 0}, // Should only be > 0 if there are public links
+			sq.Or{
+				sq.Eq{"b.last_parse_attempt": nil},
+				sq.LtOrEq{"b.last_parse_attempt": dayAgo},
+			},
+		},
+		OrderBy: "id", // oldest first
+		Limit:   2000, // TODO: Make this random or configurable?
+	}
+
+	burls, err := models.GetBaseURLs(ctx, opts)
+	if err != nil {
+		return err
+	}
+
+	parseCnt := 0
+	for _, burl := range burls {
+		srv.QueueTask("import", core.ParseBaseURLTask(srv, burl, sig))
+		parseCnt++
+		if parseCnt == 5 {
+		loop:
+			// Loop until at least one finishes or the context is canceled
+			for {
+				select {
+				case <-ctx.Done():
+					srv.Logger().Printf("runParse: Context canceled.")
+					return nil
+				case <-sig:
+					parseCnt--
+					break loop
+				}
+			}
+		}
+	}
+	srv.Logger().Printf("runParse: Finished run")
+	return nil
+}
+
+// This is a helper function to process base URLs that had errors
+// when fetching them.
+func parseBaseURLs(ctx context.Context, srv *server.Server) {
+	nctx, cancel := context.WithCancel(context.Background())
+	nctx = server.ServerContext(nctx, srv)
+	nctx = database.Context(nctx, srv.DB)
+	nctx = timezone.Context(nctx, "UTC")
+	defer cancel()
+
+	srv.Logger().Printf("parseBaseURLs: starting parser engine")
+	err := runParse(nctx, srv)
+	if err != nil {
+		srv.Logger().Printf("!! parseBaseURLs (first run): Error %v", err)
+		return
+	}
+
+	ticker := time.NewTicker(30 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			cancel()
+			fmt.Println("parseBaseURLs: Context canceled, shutting down.")
+			return
+		case <-ticker.C:
+			err = runParse(nctx, srv)
+			if err != nil {
+				srv.Logger().Printf("!! parseBaseURLs (loop run): Error %v", err)
+				return
+			}
+		}
+	}
+}
diff --git a/cmd/migrations.go b/cmd/migrations.go
index 4552650..fa2c179 100644
--- a/cmd/migrations.go
+++ b/cmd/migrations.go
@@ -45,5 +45,12 @@ func GetMigrations() []migrate.Migration {
 			0,
 			links.MigrateFS,
 		),
+		migrate.FSFileMigration(
+			"0004_add_parse_fields_baseurls",
+			"migrations/0004_add_parse_fields_baseurls.up.sql",
+			"migrations/0004_add_parse_fields_baseurls.down.sql",
+			0,
+			links.MigrateFS,
+		),
 	}
 }
diff --git a/core/import.go b/core/import.go
index 1797d51..759c5b0 100644
--- a/core/import.go
+++ b/core/import.go
@@ -469,7 +469,6 @@
 			}
 
 			totalCount += listlen
-			fmt.Println("Processed", totalCount, "entries...")
 			//time.Sleep(3 * time.Second) // Let the parse url workers catch up
 		} else {
 			break // No more items to process
diff --git a/core/processors.go b/core/processors.go
index 5720d32..9c38218 100644
--- a/core/processors.go
+++ b/core/processors.go
@@ -84,7 +84,7 @@ func ImportBookmarksTask(c echo.Context, origin int, path string,
 }
 
 // ParseBaseURLTask task to parse html title tags.
-func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
+func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL, sig chan int) *work.Task {
 	return work.NewTask(func(ctx context.Context) error {
 		ctx = server.ServerContext(ctx, srv)
 		ctx = database.Context(ctx, srv.DB)
@@ -105,6 +105,9 @@ func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
 				"Failed task ParseBaseURLTask %s after %d attempts: %v",
 				task.Metadata["id"], task.Attempts(), task.Result())
 		}
+		if sig != nil {
+			sig <- 1
+		}
 	})
 }
 
diff --git a/helpers.go b/helpers.go
index a5a8d47..b82b692 100644
--- a/helpers.go
+++ b/helpers.go
@@ -3,6 +3,7 @@ package links
 import (
 	"bytes"
 	"context"
+	"database/sql"
 	"encoding/json"
 	"encoding/xml"
 	"errors"
@@ -402,16 +403,41 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
 		return err
 	}
 
+	baseURL.ParseAttempts += 1
+	baseURL.LastParseAttempt = sql.NullTime{Valid: true, Time: time.Now().UTC()}
+
 	userAgent := BuildUserAgent(ctx)
 	req.Header.Set("User-Agent", userAgent)
 
 	resp, err := client.Do(req)
 	if err != nil {
+		parseErr := models.BaseURLParseError{
+			Timestamp:    time.Now().UTC().Unix(),
+			Attempt:      baseURL.ParseAttempts,
+			ErrorMessage: err.Error(),
+		}
+		baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+		serr := baseURL.Store(ctx)
+		if serr != nil {
+			return serr
+		}
 		return err
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode > 299 {
-		return fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+		err = fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+		parseErr := models.BaseURLParseError{
+			Timestamp:    time.Now().UTC().Unix(),
+			Attempt:      baseURL.ParseAttempts,
+			StatusCode:   resp.StatusCode,
+			ErrorMessage: err.Error(),
+		}
+		baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+		serr := baseURL.Store(ctx)
+		if serr != nil {
+			return serr
+		}
+		return err
 	}
 
 	hm := extract(resp.Body)
@@ -423,7 +449,7 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
 		baseURL.Title = baseURL.Title[:150]
 	}
 	baseURL.PublicReady = true
-	baseURL.Data = models.BaseURLData{Meta: *hm}
+	baseURL.Data.Meta = *hm
 	err = baseURL.Store(ctx)
 	if err != nil {
 		return err
diff --git a/migrations/0004_add_parse_fields_baseurls.down.sql b/migrations/0004_add_parse_fields_baseurls.down.sql
new file mode 100644
index 0000000..07fe6cb
--- /dev/null
+++ b/migrations/0004_add_parse_fields_baseurls.down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE base_urls DROP parse_attempts;
+ALTER TABLE base_urls DROP last_parse_attempt;
diff --git a/migrations/0004_add_parse_fields_baseurls.up.sql b/migrations/0004_add_parse_fields_baseurls.up.sql
new file mode 100644
index 0000000..396c803
--- /dev/null
+++ b/migrations/0004_add_parse_fields_baseurls.up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE base_urls ADD parse_attempts INT DEFAULT 0;
+ALTER TABLE base_urls ADD last_parse_attempt TIMESTAMPTZ;
diff --git a/models/base_url.go b/models/base_url.go
index b8a96ef..1442e05 100644
--- a/models/base_url.go
+++ b/models/base_url.go
@@ -25,9 +25,18 @@ type HTMLMeta struct {
 	SiteName string `json:"site_name"`
 }
 
+// BaseURLParseError is an error record for failed parsing of BaseURL instances
+type BaseURLParseError struct {
+	Attempt      int    `json:"attempt"`
+	Timestamp    int64  `json:"timestamp"`
+	StatusCode   int    `json:"status_code"`
+	ErrorMessage string `json:"error_msg"`
+}
+
 // BaseURLData holds metadata for the BaseURL model
 type BaseURLData struct {
-	Meta HTMLMeta `json:"meta"`
+	Meta        HTMLMeta            `json:"meta"`
+	ParseErrors []BaseURLParseError `json:"parse_errors,omitempty"`
 }
 
 // Value ...
@@ -55,7 +64,7 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
 	q := opts.GetBuilder(nil)
 	rows, err := q.
 		Columns("b.id", "b.url", "b.title", "b.counter", "b.data", "b.public_ready", "b.hash",
-			"b.created_on", "json_agg(t)::jsonb").
+			"b.parse_attempts", "b.last_parse_attempt", "b.created_on", "json_agg(t)::jsonb").
 		From("base_urls b").
 		LeftJoin("org_links ol ON ol.base_url_id = b.id").
 		LeftJoin("tag_links tl ON tl.org_link_id = ol.id").
@@ -77,7 +86,8 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
 		var url BaseURL
 		var tags string
 		if err = rows.Scan(&url.ID, &url.URL, &url.Title, &url.Counter,
-			&url.Data, &url.PublicReady, &url.Hash, &url.CreatedOn, &tags); err != nil {
+			&url.Data, &url.PublicReady, &url.Hash, &url.ParseAttempts,
+			&url.LastParseAttempt, &url.CreatedOn, &tags); err != nil {
 			return err
 		}
 		re := regexp.MustCompile(`(,\s)?null,?`)
@@ -116,13 +126,14 @@ func (b *BaseURL) Load(ctx context.Context) error {
 	tz := timezone.ForContext(ctx)
 	err := database.WithTx(ctx, database.TxOptionsRO, func(tx *sql.Tx) error {
 		err := sq.
-			Select("id", "title", "url", "counter", "data", "public_ready", "hash", "created_on").
+			Select("id", "title", "url", "counter", "data", "public_ready", "hash",
+				"parse_attempts", "last_parse_attempt", "created_on").
 			From("base_urls").
 			Where("id = ?", b.ID).
 			PlaceholderFormat(sq.Dollar).
 			RunWith(tx).
 			ScanContext(ctx, &b.ID, &b.Title, &b.URL, &b.Counter, &b.Data,
-				&b.PublicReady, &b.Hash, &b.CreatedOn)
+				&b.PublicReady, &b.Hash, &b.ParseAttempts, &b.LastParseAttempt, &b.CreatedOn)
 		if err != nil {
 			if err == sql.ErrNoRows {
 				return nil
@@ -168,6 +179,8 @@ func (b *BaseURL) Store(ctx context.Context) error {
 			Set("data", b.Data).
 			Set("public_ready", b.PublicReady).
 			Set("hash", b.Hash).
+			Set("parse_attempts", b.ParseAttempts).
+			Set("last_parse_attempt", b.LastParseAttempt).
 			Where("id = ?", b.ID).
 			Suffix(`RETURNING (updated_on)`).
 			PlaceholderFormat(sq.Dollar).
diff --git a/models/models.go b/models/models.go
index 6728306..0a9e61a 100644
--- a/models/models.go
+++ b/models/models.go
@@ -54,15 +54,17 @@ type Organization struct {
 
 // BaseURL ...
 type BaseURL struct {
-	ID          int         `db:"id"`
-	Title       string      `db:"title"`
-	URL         string      `db:"url"`
-	Counter     int         `db:"counter"`
-	Data        BaseURLData `db:"data"`
-	PublicReady bool        `db:"public_ready"`
-	Hash        string      `db:"hash" json:"hash"`
-	CreatedOn   time.Time   `db:"created_on"`
-	UpdatedOn   time.Time   `db:"updated_on"`
+	ID               int          `db:"id"`
+	Title            string       `db:"title"`
+	URL              string       `db:"url"`
+	Counter          int          `db:"counter"`
+	Data             BaseURLData  `db:"data"`
+	PublicReady      bool         `db:"public_ready"`
+	Hash             string       `db:"hash" json:"hash"`
+	ParseAttempts    int          `db:"parse_attempts"`
+	LastParseAttempt sql.NullTime `db:"last_parse_attempt"`
+	CreatedOn        time.Time    `db:"created_on"`
+	UpdatedOn        time.Time    `db:"updated_on"`
 
 	Tags []Tag `db:"-" json:"tags"`
 }
diff --git a/models/schema.sql b/models/schema.sql
index 16028d5..44d226c 100644
--- a/models/schema.sql
+++ b/models/schema.sql
@@ -151,6 +151,8 @@ CREATE TABLE base_urls (
 	hash VARCHAR(128) UNIQUE NOT NULL,
 	data JSONB DEFAULT '{}',
 	counter INT DEFAULT 0,
+	parse_attempts INT DEFAULT 0,
+	last_parse_attempt TIMESTAMPTZ,
 	created_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
 	updated_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
 );
-- 
2.47.2