~netlandish/links-dev

links: Adding a simple goroutine to process BaseURL metadata when previous attempts have failed. v1 APPLIED

Peter Sanchez (1):
 Adding a simple goroutine to process BaseURL metadata when previous attempts have failed.

 12 files changed, 177 insertions(+), 22 deletions(-)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.code.netlandish.com/~netlandish/links-dev/patches/109/mbox | git am -3

[PATCH links] Adding a simple goroutine to process BaseURL metadata when previous attempts have failed.

This enforces a 1-day wait between attempts and a maximum of 3 tries
before giving up on parsing that particular URL.

Changelog-changed: Failed metadata collection on base URLs will be
  retried a maximum of 3 times at 1-day intervals.
---
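For reference, the retry gate described above distills to the filter
built in runParse. Below is a standalone Go sketch (illustrative only:
it bypasses gobwebs' FilterOptions and reuses the patch's column names
with the squirrel builder just to print the SQL the selection criteria
generate):

package main

import (
	"fmt"
	"time"

	sq "github.com/Masterminds/squirrel"
)

func main() {
	// Same gate as runParse: not yet parsed, fewer than 3 attempts,
	// referenced by at least one public link, and either never tried
	// or last tried at least a day ago.
	dayAgo := time.Now().AddDate(0, 0, -1).UTC()
	query, args, err := sq.Select("b.id", "b.url").
		From("base_urls b").
		Where(sq.And{
			sq.Eq{"b.public_ready": false},
			sq.Lt{"b.parse_attempts": 3},
			sq.Gt{"b.counter": 0},
			sq.Or{
				sq.Eq{"b.last_parse_attempt": nil}, // renders IS NULL
				sq.LtOrEq{"b.last_parse_attempt": dayAgo},
			},
		}).
		OrderBy("id").
		Limit(2000).
		PlaceholderFormat(sq.Dollar).
		ToSql()
	if err != nil {
		panic(err)
	}
	fmt.Println(query, args)
}
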
This is the follow-up fix for removing base URL parsing from bulk
imports. It allows bulk-imported base URLs to be parsed (in time).

It only parses batches of 2,000 every 30 minutes and limits the worker
pool to 5 while parsing (see the throttle sketch below). These are
simple limits that could/should be tuned as usage and data grow.
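
The throttle itself is just a counting pattern around a signal channel.
Here is a minimal, self-contained sketch of that pattern, assuming a
fakeTask stand-in for the queued ParseBaseURLTask (names and timings
are hypothetical):

package main

import (
	"context"
	"fmt"
	"time"
)

// fakeTask stands in for a queued ParseBaseURLTask: do some work,
// then signal the dispatcher that a worker slot is free.
func fakeTask(id int, sig chan int) {
	time.Sleep(10 * time.Millisecond) // pretend to fetch and parse
	fmt.Println("task done:", id)
	sig <- 1
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sig := make(chan int, 5) // buffered: finishing tasks never block
	inFlight := 0
	for id := 0; id < 20; id++ {
		go fakeTask(id, sig) // the patch queues a server task here
		inFlight++
		// Once 5 tasks are in flight, wait for one to finish (or for
		// cancellation) before dispatching the next.
		if inFlight == 5 {
			select {
			case <-ctx.Done():
				return
			case <-sig:
				inFlight--
			}
		}
	}
	// Unlike runParse, drain the stragglers so the demo exits cleanly.
	for ; inFlight > 0; inFlight-- {
		<-sig
	}
}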

 api/graph/schema.resolvers.go                 |  8 +-
 cmd/links/main.go                             |  5 +
 cmd/links/parse.go                            | 94 +++++++++++++++++++
 cmd/migrations.go                             |  7 ++
 core/import.go                                |  1 -
 core/processors.go                            |  5 +-
 helpers.go                                    | 30 +++++-
 .../0004_add_parse_fields_baseurls.down.sql   |  2 +
 .../0004_add_parse_fields_baseurls.up.sql     |  2 +
 models/base_url.go                            | 23 ++++-
 models/models.go                              | 20 ++--
 models/schema.sql                             |  2 +
 12 files changed, 177 insertions(+), 22 deletions(-)
 create mode 100644 cmd/links/parse.go
 create mode 100644 migrations/0004_add_parse_fields_baseurls.down.sql
 create mode 100644 migrations/0004_add_parse_fields_baseurls.up.sql

diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go
index 7bdc796..c708a12 100644
--- a/api/graph/schema.resolvers.go
+++ b/api/graph/schema.resolvers.go
@@ -509,7 +509,7 @@ func (r *mutationResolver) AddLink(ctx context.Context, input *model.LinkInput)
 
 	if input.Override == nil || (*input.Override && input.ParseBaseURL != nil && *input.ParseBaseURL) {
 		if visibility == models.OrgLinkVisibilityPublic {
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 		}
 	}
 
@@ -680,7 +680,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
 		orgLink.BaseURLID = sql.NullInt64{Valid: true, Int64: int64(BaseURL.ID)}
 		orgLink.URL = *input.URL
 		if input.Visibility != nil && string(*input.Visibility) == models.OrgLinkVisibilityPublic {
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 		}
 	}
 
@@ -706,7 +706,7 @@ func (r *mutationResolver) UpdateLink(ctx context.Context, input *model.UpdateLi
 			if err != nil {
 				return nil, err
 			}
-			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+			srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 		}
 		orgLink.Visibility = string(*input.Visibility)
 	}
@@ -1022,7 +1022,7 @@ func (r *mutationResolver) AddNote(ctx context.Context, input *model.NoteInput)
 
 	// We process the based link metadata after saving the current note
 	if OrgLinkNote.BaseURLID.Valid && OrgLinkNote.Visibility == models.OrgLinkVisibilityPublic {
-		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL))
+		srv.QueueTask("general", core.ParseBaseURLTask(srv, BaseURL, nil))
 	}
 
 	if len(tags) > 0 {
diff --git a/cmd/links/main.go b/cmd/links/main.go
index 2d70c83..4372a40 100644
--- a/cmd/links/main.go
+++ b/cmd/links/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -381,6 +382,10 @@ func run() error {
 		e.Reverse("ses-feedback:endpoint"),
 	)
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go parseBaseURLs(ctx, srv)
 	srv.Run()
 
 	return nil
diff --git a/cmd/links/parse.go b/cmd/links/parse.go
new file mode 100644
index 0000000..0cbe8a4
--- /dev/null
+++ b/cmd/links/parse.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"links/core"
+	"links/models"
+	"time"
+
+	sq "github.com/Masterminds/squirrel"
+	"netlandish.com/x/gobwebs/database"
+	"netlandish.com/x/gobwebs/server"
+	"netlandish.com/x/gobwebs/timezone"
+)
+
+func runParse(ctx context.Context, srv *server.Server) error {
+	sig := make(chan int, 5)
+	dayAgo := time.Now().AddDate(0, 0, -1).UTC()
+	opts := &database.FilterOptions{
+		Filter: sq.And{
+			sq.Eq{"b.public_ready": false},
+			sq.Lt{"b.parse_attempts": 3},
+			sq.Gt{"b.counter": 0}, // Should only be > 0 if there are public links
+			sq.Or{
+				sq.Eq{"b.last_parse_attempt": nil},
+				sq.LtOrEq{"b.last_parse_attempt": dayAgo},
+			},
+		},
+		OrderBy: "id", // oldest first
+		Limit:   2000, // TODO: Make this random or configurable?
+	}
+
+	burls, err := models.GetBaseURLs(ctx, opts)
+	if err != nil {
+		return err
+	}
+
+	parseCnt := 0
+	for _, burl := range burls {
+		srv.QueueTask("import", core.ParseBaseURLTask(srv, burl, sig))
+		parseCnt++
+		if parseCnt == 5 {
+		loop:
+			// Loop until at least one finishes or the context is canceled
+			for {
+				select {
+				case <-ctx.Done():
+					srv.Logger().Printf("runParse: Context canceled.")
+					return nil
+				case <-sig:
+					parseCnt--
+					break loop
+				}
+			}
+		}
+	}
+	srv.Logger().Printf("runParse: Finished run")
+	return nil
+}
+
+// This is a helper function to process base URLs that had errors
+// when fetching them.
+func parseBaseURLs(ctx context.Context, srv *server.Server) {
+	nctx, cancel := context.WithCancel(context.Background())
+	nctx = server.ServerContext(nctx, srv)
+	nctx = database.Context(nctx, srv.DB)
+	nctx = timezone.Context(nctx, "UTC")
+	defer cancel()
+
+	srv.Logger().Printf("parseBaseURLs: starting parser engine")
+	err := runParse(nctx, srv)
+	if err != nil {
+		srv.Logger().Printf("!! parseBaseURLs (first run): Error %v", err)
+		return
+	}
+
+	ticker := time.NewTicker(30 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			cancel()
+			fmt.Println("parseBaseURLs: Context canceled, shutting down.")
+			return
+		case <-ticker.C:
+			err = runParse(nctx, srv)
+			if err != nil {
+				srv.Logger().Printf("!! parseBaseURLs (loop run): Error %v", err)
+				return
+			}
+		}
+	}
+}
diff --git a/cmd/migrations.go b/cmd/migrations.go
index 4552650..fa2c179 100644
--- a/cmd/migrations.go
+++ b/cmd/migrations.go
@@ -45,5 +45,12 @@ func GetMigrations() []migrate.Migration {
 			0,
 			links.MigrateFS,
 		),
+		migrate.FSFileMigration(
+			"0004_add_parse_fields_baseurls",
+			"migrations/0004_add_parse_fields_baseurls.up.sql",
+			"migrations/0004_add_parse_fields_baseurls.down.sql",
+			0,
+			links.MigrateFS,
+		),
 	}
 }
diff --git a/core/import.go b/core/import.go
index 1797d51..759c5b0 100644
--- a/core/import.go
+++ b/core/import.go
@@ -469,7 +469,6 @@ func ImportFromPinBoard(ctx context.Context, path string,
 			}
 
 			totalCount += listlen
 			fmt.Println("Processed", totalCount, "entries...")
-			//time.Sleep(3 * time.Second) // Let the parse url workers catch up
 		} else {
 			break // No more items to process
diff --git a/core/processors.go b/core/processors.go
index 5720d32..9c38218 100644
--- a/core/processors.go
+++ b/core/processors.go
@@ -84,7 +84,7 @@ func ImportBookmarksTask(c echo.Context, origin int, path string,
 }
 
 // ParseBaseURLTask task to parse html title tags.
-func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
+func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL, sig chan int) *work.Task {
 	return work.NewTask(func(ctx context.Context) error {
 		ctx = server.ServerContext(ctx, srv)
 		ctx = database.Context(ctx, srv.DB)
@@ -105,6 +105,9 @@ func ParseBaseURLTask(srv *server.Server, baseURL *models.BaseURL) *work.Task {
 				"Failed task ParseBaseURLTask %s after %d attempts: %v",
 				task.Metadata["id"], task.Attempts(), task.Result())
 		}
+		if sig != nil {
+			sig <- 1
+		}
 	})
 }
 
diff --git a/helpers.go b/helpers.go
index a5a8d47..b82b692 100644
--- a/helpers.go
+++ b/helpers.go
@@ -3,6 +3,7 @@ package links
 import (
 	"bytes"
 	"context"
+	"database/sql"
 	"encoding/json"
 	"encoding/xml"
 	"errors"
@@ -402,16 +403,41 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
 		return err
 	}
 
+	baseURL.ParseAttempts += 1
+	baseURL.LastParseAttempt = sql.NullTime{Valid: true, Time: time.Now().UTC()}
+
 	userAgent := BuildUserAgent(ctx)
 	req.Header.Set("User-Agent", userAgent)
 	resp, err := client.Do(req)
 	if err != nil {
+		parseErr := models.BaseURLParseError{
+			Timestamp:    time.Now().UTC().Unix(),
+			Attempt:      baseURL.ParseAttempts,
+			ErrorMessage: err.Error(),
+		}
+		baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+		serr := baseURL.Store(ctx)
+		if serr != nil {
+			return serr
+		}
 		return err
 	}
 	defer resp.Body.Close()
 
 	if resp.StatusCode > 299 {
-		return fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+		err = fmt.Errorf("Failed to fetch %s, status code: %d", baseURL.URL, resp.StatusCode)
+		parseErr := models.BaseURLParseError{
+			Timestamp:    time.Now().UTC().Unix(),
+			Attempt:      baseURL.ParseAttempts,
+			StatusCode:   resp.StatusCode,
+			ErrorMessage: err.Error(),
+		}
+		baseURL.Data.ParseErrors = append(baseURL.Data.ParseErrors, parseErr)
+		serr := baseURL.Store(ctx)
+		if serr != nil {
+			return serr
+		}
+		return err
+	}
 
 	hm := extract(resp.Body)
@@ -423,7 +449,7 @@ func ParseBaseURL(ctx context.Context, baseURL *models.BaseURL) error {
 		baseURL.Title = baseURL.Title[:150]
 	}
 	baseURL.PublicReady = true
-	baseURL.Data = models.BaseURLData{Meta: *hm}
+	baseURL.Data.Meta = *hm
 	err = baseURL.Store(ctx)
 	if err != nil {
 		return err
diff --git a/migrations/0004_add_parse_fields_baseurls.down.sql b/migrations/0004_add_parse_fields_baseurls.down.sql
new file mode 100644
index 0000000..07fe6cb
--- /dev/null
+++ b/migrations/0004_add_parse_fields_baseurls.down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE base_urls DROP parse_attempts;
+ALTER TABLE base_urls DROP last_parse_attempt;
diff --git a/migrations/0004_add_parse_fields_baseurls.up.sql b/migrations/0004_add_parse_fields_baseurls.up.sql
new file mode 100644
index 0000000..396c803
--- /dev/null
+++ b/migrations/0004_add_parse_fields_baseurls.up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE base_urls ADD parse_attempts INT DEFAULT 0;
+ALTER TABLE base_urls ADD last_parse_attempt TIMESTAMPTZ;
diff --git a/models/base_url.go b/models/base_url.go
index b8a96ef..1442e05 100644
--- a/models/base_url.go
+++ b/models/base_url.go
@@ -25,9 +25,18 @@ type HTMLMeta struct {
 	SiteName    string `json:"site_name"`
 }
 
+// BaseURLParseError is an error record for failed parsing of BaseURL instances
+type BaseURLParseError struct {
+	Attempt      int    `json:"attempt"`
+	Timestamp    int64  `json:"timestamp"`
+	StatusCode   int    `json:"status_code"`
+	ErrorMessage string `json:"error_msg"`
+}
+
 // BaseURLData holds metadata for the BaseURL model
 type BaseURLData struct {
-	Meta HTMLMeta `json:"meta"`
+	Meta        HTMLMeta            `json:"meta"`
+	ParseErrors []BaseURLParseError `json:"parse_errors,omitempty"`
 }
 
 // Value ...
@@ -55,7 +64,7 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
 		q := opts.GetBuilder(nil)
 		rows, err := q.
 			Columns("b.id", "b.url", "b.title", "b.counter", "b.data", "b.public_ready", "b.hash",
-				"b.created_on", "json_agg(t)::jsonb").
+				"b.parse_attempts", "b.last_parse_attempt", "b.created_on", "json_agg(t)::jsonb").
 			From("base_urls b").
 			LeftJoin("org_links ol ON ol.base_url_id = b.id").
 			LeftJoin("tag_links tl ON tl.org_link_id = ol.id").
@@ -77,7 +86,8 @@ func GetBaseURLs(ctx context.Context, opts *database.FilterOptions) ([]*BaseURL,
 			var url BaseURL
 			var tags string
 			if err = rows.Scan(&url.ID, &url.URL, &url.Title, &url.Counter,
-				&url.Data, &url.PublicReady, &url.Hash, &url.CreatedOn, &tags); err != nil {
+				&url.Data, &url.PublicReady, &url.Hash, &url.ParseAttempts,
+				&url.LastParseAttempt, &url.CreatedOn, &tags); err != nil {
 				return err
 			}
 			re := regexp.MustCompile(`(,\s)?null,?`)
@@ -116,13 +126,14 @@ func (b *BaseURL) Load(ctx context.Context) error {
 	tz := timezone.ForContext(ctx)
 	err := database.WithTx(ctx, database.TxOptionsRO, func(tx *sql.Tx) error {
 		err := sq.
-			Select("id", "title", "url", "counter", "data", "public_ready", "hash", "created_on").
+			Select("id", "title", "url", "counter", "data", "public_ready", "hash",
+				"parse_attempts", "last_parse_attempt", "created_on").
 			From("base_urls").
 			Where("id = ?", b.ID).
 			PlaceholderFormat(sq.Dollar).
 			RunWith(tx).
 			ScanContext(ctx, &b.ID, &b.Title, &b.URL, &b.Counter, &b.Data,
-				&b.PublicReady, &b.Hash, &b.CreatedOn)
+				&b.PublicReady, &b.Hash, &b.ParseAttempts, &b.LastParseAttempt, &b.CreatedOn)
 		if err != nil {
 			if err == sql.ErrNoRows {
 				return nil
@@ -168,6 +179,8 @@ func (b *BaseURL) Store(ctx context.Context) error {
 				Set("data", b.Data).
 				Set("public_ready", b.PublicReady).
 				Set("hash", b.Hash).
+				Set("parse_attempts", b.ParseAttempts).
+				Set("last_parse_attempt", b.LastParseAttempt).
 				Where("id = ?", b.ID).
 				Suffix(`RETURNING (updated_on)`).
 				PlaceholderFormat(sq.Dollar).
diff --git a/models/models.go b/models/models.go
index 6728306..0a9e61a 100644
--- a/models/models.go
+++ b/models/models.go
@@ -54,15 +54,17 @@
 
 // BaseURL ...
 type BaseURL struct {
-	ID          int         `db:"id"`
-	Title       string      `db:"title"`
-	URL         string      `db:"url"`
-	Counter     int         `db:"counter"`
-	Data        BaseURLData `db:"data"`
-	PublicReady bool        `db:"public_ready"`
-	Hash        string      `db:"hash" json:"hash"`
-	CreatedOn   time.Time   `db:"created_on"`
-	UpdatedOn   time.Time   `db:"updated_on"`
+	ID               int          `db:"id"`
+	Title            string       `db:"title"`
+	URL              string       `db:"url"`
+	Counter          int          `db:"counter"`
+	Data             BaseURLData  `db:"data"`
+	PublicReady      bool         `db:"public_ready"`
+	Hash             string       `db:"hash" json:"hash"`
+	ParseAttempts    int          `db:"parse_attempts"`
+	LastParseAttempt sql.NullTime `db:"last_parse_attempt"`
+	CreatedOn        time.Time    `db:"created_on"`
+	UpdatedOn        time.Time    `db:"updated_on"`
 
 	Tags []Tag `db:"-" json:"tags"`
 }
diff --git a/models/schema.sql b/models/schema.sql
index 16028d5..44d226c 100644
--- a/models/schema.sql
+++ b/models/schema.sql
@@ -151,6 +151,8 @@ CREATE TABLE base_urls (
   hash VARCHAR(128) UNIQUE NOT NULL,
   data JSONB DEFAULT '{}',
   counter INT DEFAULT 0,
+  parse_attempts INT DEFAULT 0,
+  last_parse_attempt TIMESTAMPTZ,
   created_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
   updated_on TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
 );
-- 
2.47.2
Applied.

To git@git.code.netlandish.com:~netlandish/links
   fd727f9..02c85f0  master -> master