Peter Sanchez: Adding a simple goroutine to process BaseURL metadata when previous attempts have failed
12 files changed, 177 insertions(+), 22 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.code.netlandish.com/~netlandish/links-dev/patches/109/mbox | git am -3
This allows 1 day between attempts and a maximum of 3 tries before
giving up on parsing that particular URL.

Changelog-changed: Failed metadata collection on base URLs will be
attempted a maximum of 3 times, at 1-day intervals.
---
This follows the fix that removed base URL parsing from bulk imports.
It allows bulk-imported base URLs to be parsed over time: groups of 2K
are processed every 30 minutes, with the worker pool capped at 5 while
parsing. These are simple limits that could/should be tuned with more
usage and data. (Two illustrative sketches of the retry gate and the
throttling pattern follow the patch.)

 api/graph/schema.resolvers.go                 |  8 +-
 cmd/links/main.go                             |  5 +
 cmd/links/parse.go                            | 94 +++++++++++++++++++
 cmd/migrations.go                             |  7 ++
 core/import.go                                |  1 -
 core/processors.go                            |  5 +-
 helpers.go                                    | 30 +++++-
 .../0004_add_parse_fields_baseurls.down.sql   |  2 +
 .../0004_add_parse_fields_baseurls.up.sql     |  2 +
 models/base_url.go                            | 23 ++++-
 models/models.go                              | 20 ++--
 models/schema.sql                             |  2 +
 12 files changed, 177 insertions(+), 22 deletions(-)
 create mode 100644 cmd/links/parse.go
 create mode 100644 migrations/0004_add_parse_fields_baseurls.down.sql
 create mode 100644 migrations/0004_add_parse_fields_baseurls.up.sql

diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go
index 7bdc796..c708a12 100644
--- a/api/graph/schema.resolvers.go
+++ b/api/graph/schema.resolvers.go
@@ -509,7 +509,7 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput)
     if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) {
         if visibility == models.OrgLinkVisibilityPublic {
-            srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+            srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
         }
     }
@@ -680,7 +680,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
         orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)}
         orgLink.URL = *input.URL
         if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic {
-            srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+            srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
         }
     }
@@ -706,7 +706,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
             if err != nil {
                 return nil, err
             }
-            srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+            srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
         }
         orgLink.Visibility = string(*input.Visibility)
     }
@@ -1022,7 +1022,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput)
     // We process the based link metadata after saving the current note
     if OrgLinkNote.BaseURLID.Valid && OrgLinkNote.Visibility == models.OrgLinkVisibilityPublic {
-        srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+        srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
     }

     if len(tags) > 0 {
diff --git a/cmd/links/main.go b/cmd/links/main.go
index 2d70c83..4372a40 100644
--- a/cmd/links/main.go
+++ b/cmd/links/main.go
@@ -1,6 +1,7 @@
 package main

 import (
+    "context"
     "fmt"
     "net/http"
     "net/url"
@@ -381,6 +382,10 @@ func run() error {
         e.Reverse("ses-feedback:endpoint"),
     )

+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    go parseBaseURLs(ctx, srv)
     srv.Run()

     return nil
diff --git a/cmd/links/parse.go b/cmd/links/parse.go
new file mode 100644
index 0000000..0cbe8a4
--- /dev/null
+++ b/cmd/links/parse.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "links/core"
+    "links/models"
+    "time"
+
+    sq "github.com/Masterminds/squirrel"
+    "netlandish.com/x/gobwebs/database"
+    "netlandish.com/x/gobwebs/server"
+    "netlandish.com/x/gobwebs/timezone"
+)
+
+func runParse(ctx context.Context, srv *server.Server) error {
+    sig := make(chan int, 5)
+    dayAgo := time.Now().AddDate(0, 0, -1).UTC()
+    opts := &database.FilterOptions{
+        Filter: sq.And{
+            sq.Eq{"b.public_ready": false},
+            sq.Lt{"b.parse_attempts": 3},
+            sq.Gt{"b.counter": 0}, // Should only be > 0 if there are public links
+            sq.Or{
+                sq.Eq{"b.last_parse_attempt": nil},
+                sq.LtOrEq{"b.last_parse_attempt": dayAgo},
+            },
+        },
+        OrderBy: "id", // oldest first
+        Limit:   2000, // TODO: Make this random or configurable?
+    }
+
+    burls, err := models.GetBaseURLs(ctx, opts)
+    if err != nil {
+        return err
+    }
+
+    parseCnt := 0
+    for _, burl := range burls {
+        srv.QueueTask("import", core.ParseBaseURLTask(srv, burl, sig))
+        parseCnt++
+        if parseCnt == 5 {
+        loop:
+            // Loop until at least one finishes or the context is canceled
+            for {
+                select {
+                case <-ctx.Done():
+                    srv.Logger().Printf("runParse: Context canceled.")
+                    return nil
+                case <-sig:
+                    parseCnt--
+                    break loop
+                }
+            }
+        }
+    }
+    srv.Logger().Printf("runParse: Finished run")
+    return nil
+}
+
+// This is a helper function to process base URLs that had errors
+// when fetching them.
+func parseBaseURLs(ctx context.Context, srv *server.Server) {
+    nctx, cancel := context.WithCancel(context.Background())
+    nctx = server.ServerContext(nctx, srv)
+    nctx = database.Context(nctx, srv.DB)
+    nctx = timezone.Context(nctx, "UTC")
+    defer cancel()
+
+    srv.Logger().Printf("parseBaseURLs: starting parser engine")
+    err := runParse(nctx, srv)
+    if err != nil {
+        srv.Logger().Printf("!! parseBaseURLs (first run): Error %v", err)
+        return
+    }
+
+    ticker := time.NewTicker(30 * time.Minute)
+    defer ticker.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            cancel()
+            fmt.Println("parseBaseURLs: Context canceled, shutting down.")
+            return
+        case <-ticker.C:
+            err = runParse(nctx, srv)
+            if err != nil {
+                srv.Logger().Printf("!! parseBaseURLs (loop run): Error %v", err)
+                return
+            }
+        }
+    }
+}
diff --git a/cmd/migrations.go b/cmd/migrations.go
index 4552650..fa2c179 100644
--- a/cmd/migrations.go
+++ b/cmd/migrations.go
@@ -45,5 +45,12 @@ func GetMigrations() []migrate.Migration {
             0,
             links.MigrateFS,
         ),
+        migrate.FSFileMigration(
+            "0004_add_parse_fields_baseurls",
+            "migrations/0004_add_parse_fields_baseurls.up.sql",
+            "migrations/0004_add_parse_fields_baseurls.down.sql",
+            0,
+            links.MigrateFS,
+        ),
     }
 }
diff --git a/core/import.go b/core/import.go
index 1797d51..759c5b0 100644
--- a/core/import.go
+++ b/core/import.go
@@ -469,7 +469,6 @@ func ImportFromPinBoard(ctx context.Context, path string,
             }
             totalCount += listlen
-            fmt.Println("Processed", totalCount, "entries...")
             //time.Sleep(3 * time.Second) // Let the parse url workers catch up
         } else {
             break // No more items to process
diff --git a/core/processors.go b/core/processors.go
index 5720d32..9c38218 100644
--- a/core/processors.go
+++ b/core/processors.go
@@ -84,7 +84,7 @@ func ImportBookmarksTask(c echo.Context, origin int, path string,
 }

 // ParseBaseURLTask task to parse html title tags.
-func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
+func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL, sig chan int) *work.Task {
     return work.NewTask(func(ctx context.Context) error {
         ctx = server.ServerContext(ctx, srv)
         ctx = database.Context(ctx, srv.DB)
@@ -105,6 +105,9 @@ func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
                 "Failed task ParseBaseURLTask %s after %d attempts: %v",
                 task.Metadata["id"], task.Attempts(), task.Result())
         }
+        if sig != nil {
+            sig <- 1
+        }
     })
 }
diff --git a/helpers.go b/helpers.go
index a5a8d47..b82b692 100644
--- a/helpers.go
+++ b/helpers.go
@@ -3,6 +3,7 @@ package links
 import (
     "bytes"
     "context"
+    "database/sql"
     "encoding/json"
     "encoding/xml"
     "errors"
@@ -402,16 +403,41 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
         return err
     }

+    baseURL.ParseAttempts += 1
+    baseURL.LastParseAttempt = sql.NullTime{Valid: true, Time: time.Now().UTC()}
+
     userAgent := BuildUserAgent(ctx)
     req.Header.Set("User-Agent", userAgent)

     resp, err := client.Do(req)
     if err != nil {
+        parseErr := models.BaseURLParseError{
+            Timestamp:    time.Now().UTC().Unix(),
+            Attempt:      baseURL.ParseAttempts,
+            ErrorMessage: err.Error(),
+        }
+        baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+        serr := baseURL.Store(ctx)
+        if serr != nil {
+            return serr
+        }
         return err
     }
     defer resp.Body.Close()

     if resp.StatusCode > 299 {
-        return fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+        err = fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+        parseErr := models.BaseURLParseError{
+            Timestamp:    time.Now().UTC().Unix(),
+            Attempt:      baseURL.ParseAttempts,
+            StatusCode:   resp.StatusCode,
+            ErrorMessage: err.Error(),
+        }
+        baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+        serr := baseURL.Store(ctx)
+        if serr != nil {
+            return serr
+        }
+        return err
     }

     hm := extract(resp.Body)
@@ -423,7 +449,7 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
         baseURL.Title = baseURL.Title[:150]
     }
     baseURL.PublicReady = true
-    baseURL.Data = models.BaseURLData{Meta: *hm}
+    baseURL.Data.Meta = *hm
     err = baseURL.Store(ctx)
     if err != nil {
         return err
diff --git a/migrations/0004_add_parse_fields_baseurls.down.sql b/migrations/0004_add_parse_fields_baseurls.down.sql
new file mode 100644
index 0000000..07fe6cb
--- /dev/null
+++ b/migrations/0004_add_parse_fields_baseurls.down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE base_urls DROP parse_attempts;
+ALTER TABLE base_urls DROP last_parse_attempt;
diff --git a/migrations/0004_add_parse_fields_baseurls.up.sql b/migrations/0004_add_parse_fields_baseurls.up.sql
new file mode 100644
index 0000000..396c803
--- /dev/null
+++ b/migrations/0004_add_parse_fields_baseurls.up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE base_urls ADD parse_attempts INT DEFAULT 0;
+ALTER TABLE base_urls ADD last_parse_attempt TIMESTAMPTZ;
diff --git a/models/base_url.go b/models/base_url.go
index b8a96ef..1442e05 100644
--- a/models/base_url.go
+++ b/models/base_url.go
@@ -25,9 +25,18 @@ type HTMLMeta struct {
     SiteName string `json:"site_name"`
 }

+// BaseURLParseError is an error record for failed parsing of BaseURL instances
+type BaseURLParseError struct {
+    Attempt      int    `json:"attempt"`
+    Timestamp    int64  `json:"timestamp"`
+    StatusCode   int    `json:"status_code"`
+    ErrorMessage string `json:"error_msg"`
+}
+
 // BaseURLData holds metadata for the BaseURL model
 type BaseURLData struct {
-    Meta HTMLMeta `json:"meta"`
+    Meta        HTMLMeta            `json:"meta"`
+    ParseErrors []BaseURLParseError `json:"parse_errors,omitempty"`
 }

 // Value ...
@@ -55,7 +64,7 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
     q := opts.GetBuilder(nil)
     rows, err := q.
         Columns("b.id", "b.url", "b.title", "b.counter", "b.data", "b.public_ready", "b.hash",
-            "b.created_on", "json_agg(t)::jsonb").
+            "b.parse_attempts", "b.last_parse_attempt", "b.created_on", "json_agg(t)::jsonb").
         From("base_urls b").
         LeftJoin("org_links ol ON ol.base_url_id = b.id").
         LeftJoin("tag_links tl ON tl.org_link_id = ol.id").
@@ -77,7 +86,8 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
         var url BaseURL
         var tags string
         if err = rows.Scan(&url.ID, &url.URL, &url.Title, &url.Counter,
-            &url.Data, &url.PublicReady, &url.Hash, &url.CreatedOn, &tags); err != nil {
+            &url.Data, &url.PublicReady, &url.Hash, &url.ParseAttempts,
+            &url.LastParseAttempt, &url.CreatedOn, &tags); err != nil {
             return err
         }
         re := regexp.MustCompile(`(,\s)?null,?`)
@@ -116,13 +126,14 @@ func (b *BaseURL) Load(ctx context.Context) error {
     tz := timezone.ForContext(ctx)
     err := database.WithTx(ctx, database.TxOptionsRO, func(tx *sql.Tx) error {
         err := sq.
-            Select("id", "title", "url", "counter", "data", "public_ready", "hash", "created_on").
+            Select("id", "title", "url", "counter", "data", "public_ready", "hash",
+                "parse_attempts", "last_parse_attempt", "created_on").
             From("base_urls").
             Where("id = ?", b.ID).
             PlaceholderFormat(sq.Dollar).
             RunWith(tx).
             ScanContext(ctx, &b.ID, &b.Title, &b.URL, &b.Counter, &b.Data,
-                &b.PublicReady, &b.Hash, &b.CreatedOn)
+                &b.PublicReady, &b.Hash, &b.ParseAttempts, &b.LastParseAttempt, &b.CreatedOn)
         if err != nil {
             if err == sql.ErrNoRows {
                 return nil
@@ -168,6 +179,8 @@ func (b *BaseURL) Store(ctx context.Context) error {
             Set("data", b.Data).
             Set("public_ready", b.PublicReady).
             Set("hash", b.Hash).
+            Set("parse_attempts", b.ParseAttempts).
+            Set("last_parse_attempt", b.LastParseAttempt).
             Where("id = ?", b.ID).
             Suffix(`RETURNING (updated_on)`).
             PlaceholderFormat(sq.Dollar).
diff --git a/models/models.go b/models/models.go
index 6728306..0a9e61a 100644
--- a/models/models.go
+++ b/models/models.go
@@ -54,15 +54,17 @@ type Organization struct {

 // BaseURL ...
 type BaseURL struct {
-    ID          int         `db:"id"`
-    Title       string      `db:"title"`
-    URL         string      `db:"url"`
-    Counter     int         `db:"counter"`
-    Data        BaseURLData `db:"data"`
-    PublicReady bool        `db:"public_ready"`
-    Hash        string      `db:"hash" json:"hash"`
-    CreatedOn   time.Time   `db:"created_on"`
-    UpdatedOn   time.Time   `db:"updated_on"`
+    ID               int          `db:"id"`
+    Title            string       `db:"title"`
+    URL              string       `db:"url"`
+    Counter          int          `db:"counter"`
+    Data             BaseURLData  `db:"data"`
+    PublicReady      bool         `db:"public_ready"`
+    Hash             string       `db:"hash" json:"hash"`
+    ParseAttempts    int          `db:"parse_attempts"`
+    LastParseAttempt sql.NullTime `db:"last_parse_attempt"`
+    CreatedOn        time.Time    `db:"created_on"`
+    UpdatedOn        time.Time    `db:"updated_on"`

     Tags []Tag `db:"-" json:"tags"`
 }
diff --git a/models/schema.sql b/models/schema.sql
index 16028d5..44d226c 100644
--- a/models/schema.sql
+++ b/models/schema.sql
@@ -151,6 +151,8 @@ CREATE TABLE base_urls (
     hash VARCHAR(128) UNIQUE NOT NULL,
     data JSONB DEFAULT '{}',
     counter INT DEFAULT 0,
+    parse_attempts INT DEFAULT 0,
+    last_parse_attempt TIMESTAMPTZ,
     created_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
     updated_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
 );
-- 
2.47.2
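For reviewers who want to sanity-check the retry gate outside the query
builder, here is a minimal, self-contained sketch of the eligibility rule
the squirrel filter in runParse encodes. The function name
eligibleForRetry, its signature, and the sample values are illustrative
only, not part of the patch; the real code evaluates this predicate in SQL.

    package main

    import (
        "database/sql"
        "fmt"
        "time"
    )

    // eligibleForRetry restates the runParse filter in plain Go (illustrative
    // only): not yet public-ready, fewer than 3 attempts, referenced by at
    // least one public link (counter > 0), and either never attempted or last
    // attempted a day or more ago.
    func eligibleForRetry(publicReady bool, attempts, counter int, last sql.NullTime, now time.Time) bool {
        if publicReady || attempts >= 3 || counter <= 0 {
            return false
        }
        if !last.Valid {
            return true // last_parse_attempt IS NULL: never attempted
        }
        // Last attempt must be at or before now minus one day.
        return !last.Time.After(now.AddDate(0, 0, -1))
    }

    func main() {
        now := time.Now().UTC()
        fresh := sql.NullTime{Valid: true, Time: now.Add(-2 * time.Hour)}
        stale := sql.NullTime{Valid: true, Time: now.AddDate(0, 0, -2)}

        fmt.Println(eligibleForRetry(false, 1, 4, sql.NullTime{}, now)) // true: never attempted
        fmt.Println(eligibleForRetry(false, 1, 4, fresh, now))          // false: attempted 2h ago
        fmt.Println(eligibleForRetry(false, 2, 4, stale, now))          // true: attempted 2 days ago
        fmt.Println(eligibleForRetry(false, 3, 4, stale, now))          // false: attempt cap reached
    }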
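The worker throttling in runParse is just a counter plus a buffered signal
channel. Below is a stripped-down model of that pattern under stated
assumptions: the sleep stands in for ParseBaseURLTask, and every name here
is hypothetical. One difference worth noting: the sketch drains remaining
workers before exiting, whereas runParse itself returns without draining
and relies on the channel's buffer of five (matching the in-flight cap) to
absorb the final sends without blocking.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        sig := make(chan int, 5) // buffer matches the in-flight cap so a finishing worker never blocks
        inFlight := 0
        for i := 0; i < 12; i++ {
            go func() {
                time.Sleep(100 * time.Millisecond) // stand-in for fetching and parsing one URL
                sig <- 1
            }()
            inFlight++
            // Once five tasks are in flight, wait for one to finish (or for
            // cancellation) before queuing the next, exactly as runParse does.
            if inFlight == 5 {
                select {
                case <-ctx.Done():
                    fmt.Println("canceled")
                    return
                case <-sig:
                    inFlight--
                }
            }
        }
        // Drain the remaining workers before exiting (the sketch's addition).
        for inFlight > 0 {
            <-sig
            inFlight--
        }
        fmt.Println("done")
    }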
Applied.

To git@git.code.netlandish.com:~netlandish/links
   fd727f9..02c85f0  master -> master