Skip to content
Snippets Groups Projects
Commit 5d8b3372 authored by Alexander Kovalev's avatar Alexander Kovalev
Browse files

Merge branch 'scheduled-check-0.28' into 'release/v0.28'

check unread messages on schedule

See merge request mashroom/backend/gmailparser!21
parents 5720b190 d8467b3b
Branches
Tags
No related merge requests found
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
"net" "net"
"os" "os"
"regexp" "regexp"
"time"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
...@@ -110,6 +111,13 @@ var ( ...@@ -110,6 +111,13 @@ var (
} }
) )
var (
envScheduleInterval = env.ResolveDuration(
"SCHEDULE_INTERVAL",
time.Minute*5,
)
)
func main() { func main() {
defer tracer.Close() defer tracer.Close()
...@@ -211,6 +219,7 @@ func main() { ...@@ -211,6 +219,7 @@ func main() {
Mixpanel: envMixpanelService, Mixpanel: envMixpanelService,
Account: envAccountService, Account: envAccountService,
}, },
ScheduleInterval: envScheduleInterval,
}) })
if err != nil { if err != nil {
panic(err) panic(err)
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"errors" "errors"
"regexp" "regexp"
"strings" "strings"
"time"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
account "gitlab.mashroom.com/mashroom/backend/account/api" account "gitlab.mashroom.com/mashroom/backend/account/api"
...@@ -95,6 +96,7 @@ type Config struct { ...@@ -95,6 +96,7 @@ type Config struct {
GmailService *gmail.Service GmailService *gmail.Service
PubsubClient *pubsub.Client PubsubClient *pubsub.Client
Connections Connections Connections Connections
ScheduleInterval time.Duration
} }
// Parser implementation // Parser implementation
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"time"
"cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub"
"github.com/nyaruka/phonenumbers" "github.com/nyaruka/phonenumbers"
...@@ -123,6 +124,10 @@ func (p *Parser) processUnread( ...@@ -123,6 +124,10 @@ func (p *Parser) processUnread(
return return
} }
if len(h.Messages) == 0 {
return
}
log.Infof("%d unread message(s) for '%s' label", len(h.Messages), v.labelID) log.Infof("%d unread message(s) for '%s' label", len(h.Messages), v.labelID)
for _, m := range h.Messages { for _, m := range h.Messages {
...@@ -353,6 +358,12 @@ func (p *Parser) handleSubscription(ctx context.Context, v *Subscription) error ...@@ -353,6 +358,12 @@ func (p *Parser) handleSubscription(ctx context.Context, v *Subscription) error
} }
}() }()
go func() {
for range time.Tick(p.ScheduleInterval) {
cond.Broadcast()
}
}()
go func() { go func() {
err = p.PubsubClient.Subscription(v.subscription).Receive(ctx, func(ctx context.Context, m *pubsub.Message) { err = p.PubsubClient.Subscription(v.subscription).Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
var payload notificationPayload var payload notificationPayload
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment