diff --git a/Makefile b/Makefile index 19e18d231..700849bed 100644 --- a/Makefile +++ b/Makefile @@ -113,7 +113,7 @@ spellcheck: # Spellcheck docs .PHONY: cover cover: get-envoy ## Runs go test with coverage @echo "==> $@" - $(GO) test -race -coverprofile=coverage.txt -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration) + $(GO) test -race -coverprofile=coverage.txt -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration | grep -v github.com/pomerium/pomerium/pkg/storage/postgres) @sed -i.bak '/\.pb\.go\:/d' coverage.txt @sed -i.bak '/\/mock\.go\:/d' coverage.txt @sort -o coverage.txt coverage.txt diff --git a/config/helpers.go b/config/helpers.go index 2e9d5aa40..39d9e60fb 100644 --- a/config/helpers.go +++ b/config/helpers.go @@ -15,6 +15,8 @@ const ( ServiceDataBroker = "databroker" // StorageRedisName is the name of the redis storage backend StorageRedisName = "redis" + // StoragePostgresName is the name of the Postgres storage backend + StoragePostgresName = "postgres" // StorageInMemoryName is the name of the in-memory storage backend StorageInMemoryName = "memory" ) diff --git a/config/options.go b/config/options.go index 74c5eedb7..e930c6297 100644 --- a/config/options.go +++ b/config/options.go @@ -540,7 +540,7 @@ func (o *Options) Validate() error { switch o.DataBrokerStorageType { case StorageInMemoryName: - case StorageRedisName: + case StorageRedisName, StoragePostgresName: if o.DataBrokerStorageConnectionString == "" { return errors.New("config: missing databroker storage backend dsn") } diff --git a/go.mod b/go.mod index 4ae8fe3e8..bd6b27ad4 100644 --- a/go.mod +++ b/go.mod @@ -77,6 +77,9 @@ require ( require ( github.com/CAFxX/httpcompression v0.0.8 + github.com/jackc/pgconn v1.12.1 + github.com/jackc/pgtype v1.11.0 + github.com/jackc/pgx/v4 v4.16.1 github.com/kentik/patricia v1.0.0 ) @@ -169,6 +172,12 @@ require ( github.com/iancoleman/strcase v0.2.0 // indirect github.com/imdario/mergo v0.3.12 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect + github.com/jackc/puddle v1.2.1 // indirect github.com/jgautheron/goconst v1.5.1 // indirect github.com/jingyugao/rowserrcheck v1.1.1 // indirect github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect diff --git a/go.sum b/go.sum index 15d4472b0..50e6bd72f 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,7 @@ github.com/Masterminds/goutils v1.1.0/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy86 github.com/Masterminds/semver v1.4.2/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y= +github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/Masterminds/sprig v2.15.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/Masterminds/sprig v2.22.0+incompatible/go.mod h1:y6hNFY5UBTIWBxnzTeuNhlNS5hqE0NB0E6fgfo2Br3o= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -282,6 +283,8 @@ github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 h1:KwaoQzs/WeUxxJqiJsZ4euOly1Az/IgZXXSxlD/UBNk= github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/cockroachdb/datadriven v0.0.0-20200714090401-bf6692d28da5/go.mod h1:h6jFvWxBdQXxjopDMZyH2UVceIRfR84bdzbkoKrsWNo= github.com/cockroachdb/errors v1.2.4/go.mod h1:rQD95gz6FARkaKkQXUksEje/d9a6wBJoCr5oaCLELYA= @@ -402,6 +405,7 @@ github.com/coreos/go-systemd v0.0.0-20161114122254-48702e0da86b/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190620071333-e64a0ec8b42a/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -620,6 +624,7 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU= @@ -896,6 +901,54 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/intel/goresctrl v0.2.0/go.mod h1:+CZdzouYFn5EsxgqAQTEzMfwKwuc0fVdMrT9FCCAVRQ= github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/j-keck/arping v1.0.2/go.mod h1:aJbELhR92bSk7tp79AWM/ftfc90EfEi2bQJrbBFOsPw= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= +github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= +github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgconn v1.12.1 h1:rsDFzIpRk7xT4B8FufgpCCeyjdNpKyghZeSefViE5W8= +github.com/jackc/pgconn v1.12.1/go.mod h1:ZkhRC59Llhrq3oSfrikvwQ5NaxYExr6twkdkMLaKono= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.0 h1:brH0pCGBDkBW07HWlN/oSBXrmo3WB0UvZd1pIuDcL8Y= +github.com/jackc/pgproto3/v2 v2.3.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= +github.com/jackc/pgtype v1.11.0 h1:u4uiGPz/1hryuXzyaBhSk6dnIyyG2683olG2OV+UUgs= +github.com/jackc/pgtype v1.11.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= +github.com/jackc/pgx/v4 v4.16.1 h1:JzTglcal01DrghUqt+PmzWsZx/Yh7SC/CTQmSBMTd0Y= +github.com/jackc/pgx/v4 v4.16.1/go.mod h1:SIhx0D5hoADaiXZVyv+3gSm3LCIIINTVO0PficsvWGQ= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw= +github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= @@ -968,6 +1021,7 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= @@ -987,8 +1041,11 @@ github.com/leonklingele/grouper v1.1.0/go.mod h1:uk3I3uDfi9B6PeUjsCKi6ndcf63Uy7s github.com/letsencrypt/pkcs11key/v4 v4.0.0/go.mod h1:EFUvBDay26dErnNb70Nd0/VW3tJiIbETBPTl9ATXQag= github.com/lib/pq v0.0.0-20180327071824-d34b9ff171c2/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libdns/libdns v0.2.1 h1:Wu59T7wSHRgtA0cfxC+n1c/e+O3upJGWytknkmFEDis= @@ -1021,6 +1078,7 @@ github.com/matoous/godox v0.0.0-20210227103229-6504466cf951/go.mod h1:1BELzlh859 github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -1029,6 +1087,8 @@ github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZb github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= @@ -1346,7 +1406,10 @@ github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4 github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/cors v1.8.2 h1:KCooALfAYGs415Cwu5ABvv9n9509fSiG5SQJn/AQo4U= github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= @@ -1379,6 +1442,9 @@ github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c h1:W65qqJCIOVP4jpqP github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs= github.com/shirou/gopsutil/v3 v3.22.4 h1:srAQaiX6jX/cYL6q29aE0m8lOskT9CurZ9N61YR3yoI= github.com/shirou/gopsutil/v3 v3.22.4/go.mod h1:D01hZJ4pVHPpCTZ3m3T2+wDF2YAGfd+H4ifUguaQzHM= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= @@ -1572,6 +1638,7 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= gitlab.com/bosi/decorder v0.2.1 h1:ehqZe8hI4w7O4b1vgsDZw1YU1PE7iJXrQWFMsocbQ1w= gitlab.com/bosi/decorder v0.2.1/go.mod h1:6C/nhLSbF6qZbYD8bRmISBwc6vcWdNsiIBkRvjJFrH0= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1653,6 +1720,7 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= @@ -1666,6 +1734,7 @@ golang.org/x/crypto v0.0.0-20181009213950-7c1a557ab941/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -1678,10 +1747,13 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210920023735-84f357641f63/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -1857,6 +1929,7 @@ golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1872,6 +1945,7 @@ golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -2017,6 +2091,7 @@ golang.org/x/tools v0.0.0-20190321232350-e250d351ecad/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190322203728-c1a832b0ad89/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -2026,6 +2101,7 @@ golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190706070813-72ffa07ba3db/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190910044552-dd2b5c81c578/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -2044,6 +2120,7 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200117220505-0cba7a3a9ee9/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200122220014-bf1340f18c4a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= @@ -2110,6 +2187,8 @@ golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.11-0.20220316014157-77aa08bb151a h1:ofrrl6c6NG5/IOSx/R1cyiQxxjqlur0h/TvbUhkH0II= golang.org/x/tools v0.1.11-0.20220316014157-77aa08bb151a/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -2333,6 +2412,7 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/internal/databroker/registry.go b/internal/databroker/registry.go index 9d67839de..b858246d7 100644 --- a/internal/databroker/registry.go +++ b/internal/databroker/registry.go @@ -90,7 +90,7 @@ func (srv *Server) newRegistryLocked() (registry.Interface, error) { ctx := context.Background() switch srv.cfg.storageType { - case config.StorageInMemoryName: + case config.StorageInMemoryName, config.StoragePostgresName: log.Info(ctx).Msg("using in-memory registry") return inmemory.New(ctx, srv.cfg.registryTTL), nil case config.StorageRedisName: diff --git a/internal/databroker/server.go b/internal/databroker/server.go index 9608e78b8..d9a463460 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -23,6 +23,7 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" "github.com/pomerium/pomerium/pkg/storage/inmemory" + "github.com/pomerium/pomerium/pkg/storage/postgres" "github.com/pomerium/pomerium/pkg/storage/redis" ) @@ -186,14 +187,21 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr defer span.End() records := req.GetRecords() - var recordType string - for _, record := range records { - recordType = record.GetType() + if len(records) == 1 { + log.Info(ctx). + Str("record-type", records[0].GetType()). + Str("record-id", records[0].GetId()). + Msg("put") + } else { + var recordType string + for _, record := range records { + recordType = record.GetType() + } + log.Info(ctx). + Int("record-count", len(records)). + Str("record-type", recordType). + Msg("put") } - log.Info(ctx). - Int("record-count", len(records)). - Str("record-type", recordType). - Msg("put") db, err := srv.getBackend() if err != nil { @@ -398,6 +406,9 @@ func (srv *Server) newBackendLocked() (backend storage.Backend, err error) { case config.StorageInMemoryName: log.Info(ctx).Msg("using in-memory store") return inmemory.New(), nil + case config.StoragePostgresName: + log.Info(ctx).Msg("using postgres store") + backend = postgres.New(srv.cfg.storageConnectionString) case config.StorageRedisName: log.Info(ctx).Msg("using redis store") backend, err = redis.New( @@ -407,15 +418,15 @@ func (srv *Server) newBackendLocked() (backend storage.Backend, err error) { if err != nil { return nil, fmt.Errorf("failed to create new redis storage: %w", err) } + if srv.cfg.secret != nil { + backend, err = storage.NewEncryptedBackend(srv.cfg.secret, backend) + if err != nil { + return nil, err + } + } default: return nil, fmt.Errorf("unsupported storage type: %s", srv.cfg.storageType) } - if srv.cfg.secret != nil { - backend, err = storage.NewEncryptedBackend(srv.cfg.secret, backend) - if err != nil { - return nil, err - } - } return backend, nil } diff --git a/internal/identity/oauth/github/github.go b/internal/identity/oauth/github/github.go index 36c4c6cc2..8e3715282 100644 --- a/internal/identity/oauth/github/github.go +++ b/internal/identity/oauth/github/github.go @@ -42,7 +42,7 @@ const ( refreshDeadline = time.Minute * 60 ) -var maxTime = time.Unix(1<<63-1, 0) +var maxTime = time.Unix(253370793661, 0) // year 9999 // https://developer.github.com/apps/building-oauth-apps/understanding-scopes-for-oauth-apps/ var defaultScopes = []string{"user:email", "read:org"} diff --git a/internal/testutil/postgres.go b/internal/testutil/postgres.go new file mode 100644 index 000000000..8dbcd6749 --- /dev/null +++ b/internal/testutil/postgres.go @@ -0,0 +1,55 @@ +package testutil + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v4" + "github.com/ory/dockertest/v3" + + "github.com/pomerium/pomerium/internal/log" +) + +// WithTestPostgres starts a test DB and runs the given handler with the connection to it. +func WithTestPostgres(handler func(dsn string) error) error { + ctx, clearTimeout := context.WithTimeout(context.Background(), maxWait) + defer clearTimeout() + + // uses a sensible default on windows (tcp/http) and linux/osx (socket) + pool, err := dockertest.NewPool("") + if err != nil { + return err + } + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "postgres", + Tag: "14", + Env: []string{"POSTGRES_DB=pomeriumtest", "POSTGRES_HOST_AUTH_METHOD=trust"}, + }) + if err != nil { + return err + } + _ = resource.Expire(uint(maxWait.Seconds())) + + dsn := fmt.Sprintf("postgresql://postgres@localhost:%s/pomeriumtest?sslmode=disable", resource.GetPort("5432/tcp")) + if err := pool.Retry(func() error { + conn, err := pgx.Connect(ctx, dsn) + if err != nil { + log.Error(ctx).Err(err).Send() + return err + } + _ = conn.Close(ctx) + return nil + }); err != nil { + _ = pool.Purge(resource) + return err + } + + e := handler(dsn) + + if err := pool.Purge(resource); err != nil { + return err + } + + return e +} diff --git a/pkg/contextutil/contextutil.go b/pkg/contextutil/contextutil.go new file mode 100644 index 000000000..cd12f17e6 --- /dev/null +++ b/pkg/contextutil/contextutil.go @@ -0,0 +1,66 @@ +// Package contextutil contains functions for working with contexts. +package contextutil + +import ( + "context" + "time" +) + +type mergedCtx struct { + ctx1, ctx2 context.Context + + doneCtx context.Context + doneCancel context.CancelFunc +} + +// Merge merges two contexts into a single context. +func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { + mc := &mergedCtx{ + ctx1: ctx1, + ctx2: ctx2, + } + mc.doneCtx, mc.doneCancel = context.WithCancel(context.Background()) + go func() { + select { + case <-ctx1.Done(): + case <-ctx2.Done(): + case <-mc.doneCtx.Done(): + } + mc.doneCancel() + }() + return mc, mc.doneCancel +} + +func (mc *mergedCtx) Deadline() (deadline time.Time, ok bool) { + if deadline, ok = mc.ctx1.Deadline(); ok { + return deadline, ok + } + if deadline, ok = mc.ctx2.Deadline(); ok { + return deadline, ok + } + return mc.doneCtx.Deadline() +} + +func (mc *mergedCtx) Done() <-chan struct{} { + return mc.doneCtx.Done() +} + +func (mc *mergedCtx) Err() error { + if err := mc.ctx1.Err(); err != nil { + return mc.ctx1.Err() + } + if err := mc.ctx2.Err(); err != nil { + return mc.ctx2.Err() + } + return mc.doneCtx.Err() +} + +func (mc *mergedCtx) Value(key interface{}) interface{} { + if value := mc.ctx1.Value(key); value != nil { + return value + } + if value := mc.ctx2.Value(key); value != nil { + return value + } + return mc.doneCtx.Value(key) +} diff --git a/pkg/contextutil/contextutil_test.go b/pkg/contextutil/contextutil_test.go new file mode 100644 index 000000000..316ebc9ad --- /dev/null +++ b/pkg/contextutil/contextutil_test.go @@ -0,0 +1,34 @@ +package contextutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestMerge(t *testing.T) { + t.Run("value", func(t *testing.T) { + ctx1 := context.WithValue(context.Background(), "key1", "value1") //nolint + ctx2 := context.WithValue(context.Background(), "key2", "value2") //nolint + ctx3, _ := Merge(ctx1, ctx2) + assert.Equal(t, "value1", ctx3.Value("key1")) + assert.Equal(t, "value2", ctx3.Value("key2")) + }) + t.Run("cancel", func(t *testing.T) { + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + ctx2, cancel2 := context.WithCancel(context.Background()) + ctx3, _ := Merge(ctx1, ctx2) + cancel2() + assert.Eventually(t, func() bool { + select { + case <-ctx3.Done(): + return true + default: + return false + } + }, time.Second, time.Millisecond*100) + }) +} diff --git a/pkg/cryptutil/helpers.go b/pkg/cryptutil/helpers.go index 34ac7df48..030107814 100644 --- a/pkg/cryptutil/helpers.go +++ b/pkg/cryptutil/helpers.go @@ -30,6 +30,13 @@ func NewRandomStringN(c int) string { return base64.StdEncoding.EncodeToString(randomBytes(c)) } +// NewRandomUInt32 returns a random uint32. +// +// Panics if source of randomness fails. +func NewRandomUInt32() uint32 { + return binary.LittleEndian.Uint32(randomBytes(4)) +} + // NewRandomUInt64 returns a random uint64. // // Panics if source of randomness fails. diff --git a/pkg/storage/postgres/backend.go b/pkg/storage/postgres/backend.go new file mode 100644 index 000000000..e8013efc6 --- /dev/null +++ b/pkg/storage/postgres/backend.go @@ -0,0 +1,358 @@ +package postgres + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/signal" + "github.com/pomerium/pomerium/pkg/contextutil" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/storage" +) + +// Backend is a storage Backend implemented with Postgres. +type Backend struct { + cfg *config + dsn string + onChange *signal.Signal + + closeCtx context.Context + close context.CancelFunc + + mu sync.RWMutex + pool *pgxpool.Pool + serverVersion uint64 +} + +// New creates a new Backend. +func New(dsn string, options ...Option) *Backend { + backend := &Backend{ + cfg: getConfig(options...), + dsn: dsn, + onChange: signal.New(), + } + backend.closeCtx, backend.close = context.WithCancel(context.Background()) + go backend.doPeriodically(func(ctx context.Context) error { + _, pool, err := backend.init(ctx) + if err != nil { + return err + } + + return deleteChangesBefore(ctx, pool, time.Now().Add(-backend.cfg.expiry)) + }, time.Minute) + go backend.doPeriodically(func(ctx context.Context) error { + _, pool, err := backend.init(backend.closeCtx) + if err != nil { + return err + } + + conn, err := pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + _, err = conn.Exec(ctx, `LISTEN `+recordChangeNotifyName) + if err != nil { + return err + } + + _, err = conn.Conn().WaitForNotification(ctx) + if err != nil { + return err + } + + backend.onChange.Broadcast(ctx) + + return nil + }, time.Millisecond*100) + return backend +} + +// Close closes the underlying database connection. +func (backend *Backend) Close() error { + backend.mu.Lock() + defer backend.mu.Unlock() + + backend.close() + + if backend.pool != nil { + backend.pool.Close() + backend.pool = nil + } + return nil +} + +// Get gets a record from the database. +func (backend *Backend) Get( + ctx context.Context, + recordType, recordID string, +) (*databroker.Record, error) { + ctx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + _, conn, err := backend.init(ctx) + if err != nil { + return nil, err + } + + return getRecord(ctx, conn, recordType, recordID) +} + +// GetOptions returns the options for the given record type. +func (backend *Backend) GetOptions( + ctx context.Context, + recordType string, +) (*databroker.Options, error) { + ctx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + _, conn, err := backend.init(ctx) + if err != nil { + return nil, err + } + + return getOptions(ctx, conn, recordType) +} + +// Lease attempts to acquire a lease for the given name. +func (backend *Backend) Lease( + ctx context.Context, + leaseName, leaseID string, + ttl time.Duration, +) (acquired bool, err error) { + ctx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + _, conn, err := backend.init(ctx) + if err != nil { + return false, err + } + + leaseHolderID, err := maybeAcquireLease(ctx, conn, leaseName, leaseID, ttl) + if err != nil { + return false, err + } + + return leaseHolderID == leaseID, nil +} + +// Put puts a record into Postgres. +func (backend *Backend) Put( + ctx context.Context, + records []*databroker.Record, +) (serverVersion uint64, err error) { + ctx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + serverVersion, pool, err := backend.init(ctx) + if err != nil { + return 0, err + } + + err = pool.BeginTxFunc(ctx, pgx.TxOptions{ + IsoLevel: pgx.Serializable, + AccessMode: pgx.ReadWrite, + }, func(tx pgx.Tx) error { + now := timestamppb.Now() + + recordVersion, err := getLatestRecordVersion(ctx, tx) + if err != nil { + return fmt.Errorf("storage/postgres: error getting latest record version: %w", err) + } + + // add all the records + recordTypes := map[string]struct{}{} + for i, record := range records { + recordTypes[record.GetType()] = struct{}{} + + record = dup(record) + record.ModifiedAt = now + record.Version = recordVersion + uint64(i) + 1 + err := putRecordChange(ctx, tx, record) + if err != nil { + return fmt.Errorf("storage/postgres: error saving record change: %w", err) + } + + err = putRecord(ctx, tx, record) + if err != nil { + return fmt.Errorf("storage/postgres: error saving record: %w", err) + } + records[i] = record + } + + // enforce options for each record type + for recordType := range recordTypes { + options, err := getOptions(ctx, tx, recordType) + if err != nil { + return fmt.Errorf("storage/postgres: error getting options: %w", err) + } + err = enforceOptions(ctx, tx, recordType, options) + if err != nil { + return fmt.Errorf("storage/postgres: error enforcing options: %w", err) + } + } + + return nil + }) + if err != nil { + return serverVersion, err + } + + err = signalRecordChange(ctx, pool) + return serverVersion, err +} + +// SetOptions sets the options for the given record type. +func (backend *Backend) SetOptions( + ctx context.Context, + recordType string, + options *databroker.Options, +) error { + ctx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + _, conn, err := backend.init(ctx) + if err != nil { + return err + } + + return setOptions(ctx, conn, recordType, options) +} + +// Sync syncs the records. +func (backend *Backend) Sync( + ctx context.Context, + serverVersion, recordVersion uint64, +) (storage.RecordStream, error) { + // the original ctx will be used for the stream, this ctx used for pre-stream calls + callCtx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + currentServerVersion, _, err := backend.init(callCtx) + if err != nil { + return nil, err + } + if currentServerVersion != serverVersion { + return nil, storage.ErrInvalidServerVersion + } + + return newChangedRecordStream(ctx, backend, recordVersion), nil +} + +// SyncLatest syncs the latest version of each record. +func (backend *Backend) SyncLatest( + ctx context.Context, + recordType string, + expr storage.FilterExpression, +) (serverVersion, recordVersion uint64, stream storage.RecordStream, err error) { + // the original ctx will be used for the stream, this ctx used for pre-stream calls + callCtx, cancel := contextutil.Merge(ctx, backend.closeCtx) + defer cancel() + + serverVersion, pool, err := backend.init(callCtx) + if err != nil { + return 0, 0, nil, err + } + + recordVersion, err = getLatestRecordVersion(callCtx, pool) + if err != nil { + return 0, 0, nil, err + } + + if recordType != "" { + f := storage.EqualsFilterExpression{ + Fields: []string{"type"}, + Value: recordType, + } + if expr != nil { + expr = storage.AndFilterExpression{expr, f} + } else { + expr = f + } + } + + stream = newRecordStream(ctx, backend, expr) + return serverVersion, recordVersion, stream, nil +} + +func (backend *Backend) init(ctx context.Context) (serverVersion uint64, pool *pgxpool.Pool, err error) { + backend.mu.RLock() + serverVersion = backend.serverVersion + pool = backend.pool + backend.mu.RUnlock() + + if pool != nil { + return serverVersion, pool, nil + } + + backend.mu.Lock() + defer backend.mu.Unlock() + + // double-checked locking, might have already initialized, so just return + serverVersion = backend.serverVersion + pool = backend.pool + if pool != nil { + return serverVersion, pool, nil + } + + config, err := pgxpool.ParseConfig(backend.dsn) + if err != nil { + return serverVersion, nil, err + } + + pool, err = pgxpool.ConnectConfig(context.Background(), config) + if err != nil { + return serverVersion, nil, err + } + + err = pool.BeginFunc(ctx, func(tx pgx.Tx) error { + var err error + serverVersion, err = migrate(ctx, tx) + return err + }) + if err != nil { + return serverVersion, nil, err + } + + backend.serverVersion = serverVersion + backend.pool = pool + return serverVersion, pool, nil +} + +func (backend *Backend) doPeriodically(f func(ctx context.Context) error, dur time.Duration) { + ctx := backend.closeCtx + + ticker := time.NewTicker(dur) + defer ticker.Stop() + + bo := backoff.NewExponentialBackOff() + bo.MaxElapsedTime = 0 + + for { + err := f(ctx) + if err == nil { + bo.Reset() + select { + case <-backend.closeCtx.Done(): + return + case <-ticker.C: + } + } else { + log.Error(ctx).Err(err).Msg("storage/postgres") + select { + case <-backend.closeCtx.Done(): + return + case <-time.After(bo.NextBackOff()): + } + } + } +} diff --git a/pkg/storage/postgres/backend_test.go b/pkg/storage/postgres/backend_test.go new file mode 100644 index 000000000..f482afb1c --- /dev/null +++ b/pkg/storage/postgres/backend_test.go @@ -0,0 +1,148 @@ +package postgres + +import ( + "context" + "fmt" + "os" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/pomerium/pomerium/internal/testutil" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/protoutil" + "github.com/pomerium/pomerium/pkg/storage" +) + +func TestBackend(t *testing.T) { + if os.Getenv("GITHUB_ACTION") != "" && runtime.GOOS == "darwin" { + t.Skip("Github action can not run docker on MacOS") + } + + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10) + defer clearTimeout() + + require.NoError(t, testutil.WithTestPostgres(func(dsn string) error { + backend := New(dsn) + defer backend.Close() + + t.Run("put", func(t *testing.T) { + serverVersion, err := backend.Put(ctx, []*databroker.Record{ + {Type: "test-1", Id: "r1", Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{ + "k1": protoutil.NewStructString("v1"), + }))}, + {Type: "test-1", Id: "r2", Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{ + "k2": protoutil.NewStructString("v2"), + }))}, + }) + assert.NotEqual(t, 0, serverVersion) + assert.NoError(t, err) + }) + + t.Run("capacity", func(t *testing.T) { + err := backend.SetOptions(ctx, "capacity-test", &databroker.Options{ + Capacity: proto.Uint64(3), + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + _, err = backend.Put(ctx, []*databroker.Record{{ + Type: "capacity-test", + Id: fmt.Sprint(i), + Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{})), + }}) + require.NoError(t, err) + } + + _, _, stream, err := backend.SyncLatest(ctx, "capacity-test", nil) + require.NoError(t, err) + defer stream.Close() + + records, err := storage.RecordStreamToList(stream) + require.NoError(t, err) + assert.Len(t, records, 3) + + var ids []string + for _, r := range records { + ids = append(ids, r.GetId()) + } + assert.Equal(t, []string{"7", "8", "9"}, ids, "should contain recent records") + }) + + t.Run("lease", func(t *testing.T) { + acquired, err := backend.Lease(ctx, "lease-test", "client-1", time.Second) + assert.NoError(t, err) + assert.True(t, acquired) + + acquired, err = backend.Lease(ctx, "lease-test", "client-2", time.Second) + assert.NoError(t, err) + assert.False(t, acquired) + }) + + t.Run("latest", func(t *testing.T) { + for i := 0; i < 100; i++ { + _, err := backend.Put(ctx, []*databroker.Record{{ + Type: "latest-test", + Id: fmt.Sprint(i), + Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{})), + }}) + require.NoError(t, err) + } + + _, _, stream, err := backend.SyncLatest(ctx, "latest-test", nil) + require.NoError(t, err) + defer stream.Close() + + count := map[string]int{} + + for stream.Next(true) { + count[stream.Record().GetId()]++ + } + assert.NoError(t, err) + + for i := 0; i < 100; i++ { + assert.Equal(t, 1, count[fmt.Sprint(i)]) + } + }) + + t.Run("changed", func(t *testing.T) { + serverVersion, recordVersion, stream, err := backend.SyncLatest(ctx, "", nil) + require.NoError(t, err) + assert.NoError(t, stream.Close()) + + stream, err = backend.Sync(ctx, serverVersion, recordVersion) + require.NoError(t, err) + defer stream.Close() + + go func() { + for i := 0; i < 10; i++ { + _, err := backend.Put(ctx, []*databroker.Record{{ + Type: "sync-test", + Id: fmt.Sprint(i), + Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{})), + }}) + assert.NoError(t, err) + time.Sleep(50 * time.Millisecond) + } + }() + + for i := 0; i < 10; i++ { + if assert.True(t, stream.Next(true)) { + assert.Equal(t, fmt.Sprint(i), stream.Record().GetId()) + assert.Equal(t, "sync-test", stream.Record().GetType()) + } else { + break + } + } + assert.False(t, stream.Next(false)) + assert.NoError(t, stream.Err()) + }) + + return nil + })) +} diff --git a/pkg/storage/postgres/filter.go b/pkg/storage/postgres/filter.go new file mode 100644 index 000000000..6df5d7596 --- /dev/null +++ b/pkg/storage/postgres/filter.go @@ -0,0 +1,51 @@ +package postgres + +import ( + "fmt" + "strings" + + "github.com/pomerium/pomerium/pkg/storage" +) + +func addFilterExpressionToQuery(query *string, args *[]interface{}, expr storage.FilterExpression) error { + compoundExpression := func(subexprs []storage.FilterExpression, op string) error { + *query += "( " + for i, subexpr := range subexprs { + if i > 0 { + *query += " " + op + " " + } + err := addFilterExpressionToQuery(query, args, subexpr) + if err != nil { + return err + } + } + *query += " )" + return nil + } + + switch expr := expr.(type) { + case storage.AndFilterExpression: + return compoundExpression(expr, "AND") + case storage.OrFilterExpression: + return compoundExpression(expr, "OR") + case storage.EqualsFilterExpression: + switch strings.Join(expr.Fields, ".") { + case "type": + *query += schemaName + "." + recordsTableName + ".type = " + fmt.Sprintf("$%d", len(*args)+1) + *args = append(*args, expr.Value) + return nil + case "id": + *query += schemaName + "." + recordsTableName + ".id = " + fmt.Sprintf("$%d", len(*args)+1) + *args = append(*args, expr.Value) + return nil + case "$index": + *query += schemaName + "." + recordsTableName + ".index_cidr >>= " + fmt.Sprintf("$%d", len(*args)+1) + *args = append(*args, expr.Value) + return nil + default: + return fmt.Errorf("unsupported equals filter: %v", expr.Fields) + } + default: + return fmt.Errorf("unsupported filter expression: %T", expr) + } +} diff --git a/pkg/storage/postgres/filter_test.go b/pkg/storage/postgres/filter_test.go new file mode 100644 index 000000000..fce2056a0 --- /dev/null +++ b/pkg/storage/postgres/filter_test.go @@ -0,0 +1,32 @@ +package postgres + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/pomerium/pomerium/pkg/storage" +) + +func TestAddFilterExpressionToQuery(t *testing.T) { + query := "" + args := []any{} + addFilterExpressionToQuery(&query, &args, storage.AndFilterExpression{ + storage.OrFilterExpression{ + storage.EqualsFilterExpression{ + Fields: []string{"id"}, + Value: "v1", + }, + storage.EqualsFilterExpression{ + Fields: []string{"$index"}, + Value: "v2", + }, + }, + storage.EqualsFilterExpression{ + Fields: []string{"type"}, + Value: "v3", + }, + }) + assert.Equal(t, "( ( pomerium.records.id = $1 OR pomerium.records.index_cidr >>= $2 ) AND pomerium.records.type = $3 )", query) + assert.Equal(t, []any{"v1", "v2", "v3"}, args) +} diff --git a/pkg/storage/postgres/migrate.go b/pkg/storage/postgres/migrate.go new file mode 100644 index 000000000..fb22fc768 --- /dev/null +++ b/pkg/storage/postgres/migrate.go @@ -0,0 +1,131 @@ +package postgres + +import ( + "context" + "errors" + + "github.com/jackc/pgx/v4" + + "github.com/pomerium/pomerium/pkg/cryptutil" +) + +var migrations = []func(context.Context, pgx.Tx) error{ + 1: func(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, ` + CREATE TABLE `+schemaName+`.`+recordsTableName+` ( + type TEXT NOT NULL, + id TEXT NOT NULL, + version BIGINT NOT NULL, + data JSONB NOT NULL, + modified_at TIMESTAMPTZ NOT NULL DEFAULT(NOW()), + + index_cidr INET NULL, + + PRIMARY KEY (type, id) + ) + `) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, ` + CREATE INDEX ON `+schemaName+`.`+recordsTableName+` + USING gist (index_cidr inet_ops); + `) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, ` + CREATE TABLE `+schemaName+`.`+recordChangesTableName+` ( + type TEXT NOT NULL, + id TEXT NOT NULL, + version BIGINT NOT NULL, + data JSONB NOT NULL, + modified_at TIMESTAMPTZ NOT NULL, + deleted_at TIMESTAMPTZ NULL, + + PRIMARY KEY (version) + ) + `) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, ` + CREATE TABLE `+schemaName+`.`+recordOptionsTableName+` ( + type TEXT NOT NULL, + capacity BIGINT NULL, + + PRIMARY KEY (type) + ) + `) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, ` + CREATE TABLE `+schemaName+`.`+leasesTableName+` ( + name TEXT NOT NULL, + id TEXT NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + + PRIMARY KEY (name) + ) + `) + if err != nil { + return err + } + + return nil + }, +} + +func migrate(ctx context.Context, tx pgx.Tx) (serverVersion uint64, err error) { + _, err = tx.Exec(ctx, `CREATE SCHEMA IF NOT EXISTS `+schemaName) + if err != nil { + return serverVersion, err + } + + _, err = tx.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS `+schemaName+`.`+migrationInfoTableName+` ( + server_version BIGINT NOT NULL, + migration_version SMALLINT NOT NULL + ) + `) + if err != nil { + return serverVersion, err + } + + var migrationVersion uint64 + err = tx.QueryRow(ctx, ` + SELECT server_version, migration_version + FROM `+schemaName+`.migration_info + `).Scan(&serverVersion, &migrationVersion) + if errors.Is(err, pgx.ErrNoRows) { + serverVersion = uint64(cryptutil.NewRandomUInt32()) // we can't actually store a uint64, just an int64, so just generate a uint32 + _, err = tx.Exec(ctx, ` + INSERT INTO `+schemaName+`.`+migrationInfoTableName+` (server_version, migration_version) + VALUES ($1, $2) + `, serverVersion, 0) + } + if err != nil { + return serverVersion, err + } + + for version := migrationVersion + 1; version < uint64(len(migrations)); version++ { + err = migrations[version](ctx, tx) + if err != nil { + return serverVersion, err + } + _, err = tx.Exec(ctx, ` + UPDATE `+schemaName+`.`+migrationInfoTableName+` + SET migration_version = $1 + `, version) + if err != nil { + return serverVersion, err + } + } + + return serverVersion, nil +} diff --git a/pkg/storage/postgres/option.go b/pkg/storage/postgres/option.go new file mode 100644 index 000000000..6626f3b56 --- /dev/null +++ b/pkg/storage/postgres/option.go @@ -0,0 +1,30 @@ +package postgres + +import ( + "time" +) + +const defaultExpiry = time.Hour * 24 + +type config struct { + expiry time.Duration +} + +// Option customizes a Backend. +type Option func(*config) + +// WithExpiry sets the expiry for changes. +func WithExpiry(expiry time.Duration) Option { + return func(cfg *config) { + cfg.expiry = expiry + } +} + +func getConfig(options ...Option) *config { + cfg := new(config) + WithExpiry(defaultExpiry)(cfg) + for _, o := range options { + o(cfg) + } + return cfg +} diff --git a/pkg/storage/postgres/postgres.go b/pkg/storage/postgres/postgres.go new file mode 100644 index 000000000..da8b55ea0 --- /dev/null +++ b/pkg/storage/postgres/postgres.go @@ -0,0 +1,323 @@ +// Package postgres contains an implementation of the storage.Backend backed by postgres. +package postgres + +import ( + "context" + "errors" + "time" + + "github.com/jackc/pgconn" + "github.com/jackc/pgtype" + "github.com/jackc/pgx/v4" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/storage" +) + +var ( + schemaName = "pomerium" + migrationInfoTableName = "migration_info" + recordsTableName = "records" + recordChangesTableName = "record_changes" + recordChangeNotifyName = "pomerium_record_change" + recordOptionsTableName = "record_options" + leasesTableName = "leases" +) + +type querier interface { + Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error) + Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row +} + +func deleteChangesBefore(ctx context.Context, q querier, cutoff time.Time) error { + _, err := q.Exec(ctx, ` + DELETE FROM `+schemaName+`.`+recordChangesTableName+` + WHERE modified_at < $1 + `, cutoff) + return err +} + +func dup(record *databroker.Record) *databroker.Record { + return proto.Clone(record).(*databroker.Record) +} + +func enforceOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error { + if options == nil || options.Capacity == nil { + return nil + } + + _, err := q.Exec(ctx, ` + DELETE FROM `+schemaName+`.`+recordsTableName+` + WHERE type=$1 + AND id NOT IN ( + SELECT id + FROM `+schemaName+`.`+recordsTableName+` + WHERE type=$1 + ORDER BY version DESC + LIMIT $2 + ) + `, recordType, options.GetCapacity()) + return err +} + +func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint64, err error) { + err = q.QueryRow(ctx, ` + SELECT version + FROM `+schemaName+`.`+recordChangesTableName+` + ORDER BY version DESC + LIMIT 1 + `).Scan(&recordVersion) + if isNotFound(err) { + err = nil + } + return recordVersion, err +} + +func getNextChangedRecord(ctx context.Context, q querier, afterRecordVersion uint64) (*databroker.Record, error) { + var recordType, recordID string + var version uint64 + var data pgtype.JSONB + var modifiedAt pgtype.Timestamptz + var deletedAt pgtype.Timestamptz + err := q.QueryRow(ctx, ` + SELECT type, id, version, data, modified_at, deleted_at + FROM `+schemaName+`.`+recordChangesTableName+` + WHERE version > $1 + `, afterRecordVersion).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) + if isNotFound(err) { + return nil, storage.ErrNotFound + } else if err != nil { + return nil, err + } + + var any anypb.Any + err = protojson.Unmarshal(data.Bytes, &any) + if err != nil { + return nil, err + } + + return &databroker.Record{ + Version: version, + Type: recordType, + Id: recordID, + Data: &any, + ModifiedAt: timestamppbFromTimestamptz(modifiedAt), + DeletedAt: timestamppbFromTimestamptz(deletedAt), + }, nil +} + +func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) { + var capacity pgtype.Int8 + err := q.QueryRow(ctx, ` + SELECT capacity + FROM `+schemaName+`.`+recordOptionsTableName+` + WHERE type=$1 + `, recordType).Scan(&capacity) + if err != nil && !isNotFound(err) { + return nil, err + } + options := new(databroker.Options) + if capacity.Status == pgtype.Present { + options.Capacity = proto.Uint64(uint64(capacity.Int)) + } + return options, nil +} + +func getRecord(ctx context.Context, q querier, recordType, recordID string) (*databroker.Record, error) { + var version uint64 + var data pgtype.JSONB + var modifiedAt pgtype.Timestamptz + err := q.QueryRow(ctx, ` + SELECT version, data, modified_at + FROM `+schemaName+`.`+recordsTableName+` + WHERE type=$1 AND id=$2 + `, recordType, recordID).Scan(&version, &data, &modifiedAt) + if isNotFound(err) { + return nil, storage.ErrNotFound + } else if err != nil { + return nil, err + } + + var any anypb.Any + err = protojson.Unmarshal(data.Bytes, &any) + if err != nil { + return nil, err + } + + return &databroker.Record{ + Version: version, + Type: recordType, + Id: recordID, + Data: &any, + ModifiedAt: timestamppbFromTimestamptz(modifiedAt), + }, nil +} + +func listRecords(ctx context.Context, q querier, expr storage.FilterExpression, offset, limit int) ([]*databroker.Record, error) { + args := []interface{}{offset, limit} + query := ` + SELECT type, id, version, data, modified_at + FROM ` + schemaName + `.` + recordsTableName + ` + ` + if expr != nil { + query += "WHERE " + err := addFilterExpressionToQuery(&query, &args, expr) + if err != nil { + return nil, err + } + } + query += ` + ORDER BY type, id + LIMIT $2 + OFFSET $1 + ` + rows, err := q.Query(ctx, query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []*databroker.Record + for rows.Next() { + var recordType, id string + var version uint64 + var data pgtype.JSONB + var modifiedAt pgtype.Timestamptz + err = rows.Scan(&recordType, &id, &version, &data, &modifiedAt) + if err != nil { + return nil, err + } + + var any anypb.Any + err = protojson.Unmarshal(data.Bytes, &any) + if err != nil { + return nil, err + } + + records = append(records, &databroker.Record{ + Version: version, + Type: recordType, + Id: id, + Data: &any, + ModifiedAt: timestamppbFromTimestamptz(modifiedAt), + }) + } + return records, rows.Err() +} + +func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) { + tbl := schemaName + "." + leasesTableName + expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl))) + now := timestamptzFromTimestamppb(timestamppb.Now()) + err = q.QueryRow(ctx, ` + INSERT INTO `+tbl+` (name, id, expires_at) + VALUES ($1, $2, $3) + ON CONFLICT (name) DO UPDATE + SET id=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $2 ELSE `+tbl+`.id END, + expires_at=CASE WHEN `+tbl+`.expires_at<$4 OR `+tbl+`.id=$2 THEN $3 ELSE `+tbl+`.expires_at END + RETURNING `+tbl+`.id + `, leaseName, leaseID, expiresAt, now).Scan(&leaseHolderID) + return leaseHolderID, err +} + +func putRecordChange(ctx context.Context, q querier, record *databroker.Record) error { + data, err := jsonbFromAny(record.GetData()) + if err != nil { + return err + } + + modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt()) + deletedAt := timestamptzFromTimestamppb(record.GetDeletedAt()) + _, err = q.Exec(ctx, ` + INSERT INTO `+schemaName+`.`+recordChangesTableName+` (type, id, version, data, modified_at, deleted_at) + VALUES ($1, $2, $3, $4, $5, $6) + `, record.GetType(), record.GetId(), record.GetVersion(), data, modifiedAt, deletedAt) + if err != nil { + return err + } + + return nil +} + +func putRecord(ctx context.Context, q querier, record *databroker.Record) error { + data, err := jsonbFromAny(record.GetData()) + if err != nil { + return err + } + + modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt()) + if record.GetDeletedAt() == nil { + _, err = q.Exec(ctx, ` + INSERT INTO `+schemaName+`.`+recordsTableName+` (type, id, version, data, modified_at) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (type, id) DO UPDATE + SET version=$3, data=$4, modified_at=$5 + `, record.GetType(), record.GetId(), record.GetVersion(), data, modifiedAt) + } else { + _, err = q.Exec(ctx, ` + DELETE FROM `+schemaName+`.`+recordsTableName+` + WHERE type=$1 AND id=$2 AND version<$3 + `, record.GetType(), record.GetId(), record.GetVersion()) + } + if err != nil { + return err + } + return nil +} + +func setOptions(ctx context.Context, q querier, recordType string, options *databroker.Options) error { + capacity := pgtype.Int8{Status: pgtype.Null} + if options != nil && options.Capacity != nil { + capacity.Int = int64(options.GetCapacity()) + capacity.Status = pgtype.Present + } + + _, err := q.Exec(ctx, ` + INSERT INTO `+schemaName+`.`+recordOptionsTableName+` (type, capacity) + VALUES ($1, $2) + ON CONFLICT (type) DO UPDATE + SET capacity=$2 + `, recordType, capacity) + return err +} + +func signalRecordChange(ctx context.Context, q querier) error { + _, err := q.Exec(ctx, `NOTIFY `+recordChangeNotifyName) + return err +} + +func jsonbFromAny(any *anypb.Any) (pgtype.JSONB, error) { + if any == nil { + return pgtype.JSONB{Status: pgtype.Null}, nil + } + + bs, err := protojson.Marshal(any) + if err != nil { + return pgtype.JSONB{Status: pgtype.Null}, err + } + + return pgtype.JSONB{Bytes: bs, Status: pgtype.Present}, nil +} + +func timestamppbFromTimestamptz(ts pgtype.Timestamptz) *timestamppb.Timestamp { + if ts.Status != pgtype.Present { + return nil + } + return timestamppb.New(ts.Time) +} + +func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz { + if !ts.IsValid() { + return pgtype.Timestamptz{Status: pgtype.Null} + } + return pgtype.Timestamptz{Time: ts.AsTime(), Status: pgtype.Present} +} + +func isNotFound(err error) bool { + return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound) +} diff --git a/pkg/storage/postgres/stream.go b/pkg/storage/postgres/stream.go new file mode 100644 index 000000000..17945c82a --- /dev/null +++ b/pkg/storage/postgres/stream.go @@ -0,0 +1,165 @@ +package postgres + +import ( + "context" + "time" + + "github.com/jackc/pgx/v4/pgxpool" + + "github.com/pomerium/pomerium/pkg/contextutil" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/storage" +) + +const recordBatchSize = 64 + +type recordStream struct { + backend *Backend + expr storage.FilterExpression + + ctx context.Context + cancel context.CancelFunc + offset int + pending []*databroker.Record + err error +} + +func newRecordStream( + ctx context.Context, + backend *Backend, + expr storage.FilterExpression, +) *recordStream { + stream := &recordStream{ + backend: backend, + expr: expr, + } + stream.ctx, stream.cancel = contextutil.Merge(ctx, backend.closeCtx) + return stream +} + +func (stream *recordStream) Close() error { + stream.cancel() + return nil +} + +func (stream *recordStream) Next(block bool) bool { + if stream.err != nil { + return false + } + + if len(stream.pending) > 1 { + stream.pending = stream.pending[1:] + return true + } + + var pool *pgxpool.Pool + _, pool, stream.err = stream.backend.init(stream.ctx) + if stream.err != nil { + return false + } + + stream.pending, stream.err = listRecords(stream.ctx, pool, stream.expr, stream.offset, recordBatchSize) + if stream.err != nil { + return false + } + stream.offset += recordBatchSize + + return len(stream.pending) > 0 +} + +func (stream *recordStream) Record() *databroker.Record { + if len(stream.pending) == 0 { + return nil + } + return stream.pending[0] +} + +func (stream *recordStream) Err() error { + return stream.err +} + +const watchPollInterval = 30 * time.Second + +type changedRecordStream struct { + backend *Backend + recordVersion uint64 + + ctx context.Context + cancel context.CancelFunc + record *databroker.Record + err error + ticker *time.Ticker + changed chan context.Context +} + +func newChangedRecordStream( + ctx context.Context, + backend *Backend, + recordVersion uint64, +) storage.RecordStream { + stream := &changedRecordStream{ + backend: backend, + recordVersion: recordVersion, + ticker: time.NewTicker(watchPollInterval), + changed: backend.onChange.Bind(), + } + stream.ctx, stream.cancel = contextutil.Merge(ctx, backend.closeCtx) + return stream +} + +func (stream *changedRecordStream) Close() error { + stream.cancel() + stream.ticker.Stop() + stream.backend.onChange.Unbind(stream.changed) + return nil +} + +func (stream *changedRecordStream) Next(block bool) bool { + for { + if stream.err != nil { + return false + } + + var pool *pgxpool.Pool + _, pool, stream.err = stream.backend.init(stream.ctx) + if stream.err != nil { + return false + } + + stream.record, stream.err = getNextChangedRecord( + stream.ctx, + pool, + stream.recordVersion, + ) + if isNotFound(stream.err) { + stream.err = nil + } else if stream.err != nil { + return false + } + + if stream.record != nil { + stream.recordVersion = stream.record.GetVersion() + return true + } + + if !block { + return false + } + + select { + case <-stream.ctx.Done(): + stream.err = stream.ctx.Err() + return false + case <-stream.ticker.C: + case <-stream.changed: + } + } +} + +func (stream *changedRecordStream) Record() *databroker.Record { + return stream.record +} + +func (stream *changedRecordStream) Err() error { + return stream.err +} diff --git a/pkg/storage/stream.go b/pkg/storage/stream.go index 732cf451d..a9531c3e3 100644 --- a/pkg/storage/stream.go +++ b/pkg/storage/stream.go @@ -118,3 +118,57 @@ func RecordListToStream(ctx context.Context, records []*databroker.Record) Recor }, }, nil) } + +type concatenatedRecordStream struct { + streams []RecordStream + index int +} + +// NewConcatenatedRecordStream creates a new record stream that streams all the records from the +// first stream before streaming all the records of the subsequent streams. +func NewConcatenatedRecordStream(streams ...RecordStream) RecordStream { + return &concatenatedRecordStream{ + streams: streams, + } +} + +func (stream *concatenatedRecordStream) Close() error { + var err error + for _, s := range stream.streams { + if e := s.Close(); e != nil { + err = e + } + } + return err +} + +func (stream *concatenatedRecordStream) Next(block bool) bool { + for { + if stream.index >= len(stream.streams) { + return false + } + + if stream.streams[stream.index].Next(block) { + return true + } + + if stream.streams[stream.index].Err() != nil { + return false + } + stream.index++ + } +} + +func (stream *concatenatedRecordStream) Record() *databroker.Record { + if stream.index >= len(stream.streams) { + return nil + } + return stream.streams[stream.index].Record() +} + +func (stream *concatenatedRecordStream) Err() error { + if stream.index >= len(stream.streams) { + return nil + } + return stream.streams[stream.index].Err() +}