backend

package
v0.0.0-...-f5b6858 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2019 License: MIT Imports: 19 Imported by: 8

Documentation

Overview

Package backend is a generated protocol buffer package.

It is generated from these files:

meta.proto

It has these top-level messages:

StoreTaskMeta
StoreTaskMetaRun
StoreTaskMetaManualRun

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthMeta = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowMeta   = fmt.Errorf("proto: integer overflow")
)
View Source
var ErrManualQueueFull = errors.New("manual queue at capacity")

ErrManualQueueFull is returned when a manual run request cannot be completed.

View Source
var ErrOrgNotFound = errors.New("org not found")

ErrOrgNotFound is an error for when we can't find an org

View Source
var ErrRunCanceled = errors.New("run canceled")
View Source
var ErrRunNotFound error = errors.New("run not found")
View Source
var ErrTaskNameTaken = errors.New("task name already in use by current user or target organization")

ErrTaskNameTaken is an error for when a task name is already taken

View Source
var ErrTaskNotClaimed = errors.New("task not claimed")
View Source
var ErrUserNotFound = errors.New("user not found")

ErrUserNotFound is an error for when we can't find a user

Functions

func NewInMemRunReaderWriter

func NewInMemRunReaderWriter() *runReaderWriter

Types

type DesiredState

type DesiredState interface {
	// CreateNextRun requests the next run from the desired state, delegating to (*StoreTaskMeta).CreateNextRun.
	// This allows the scheduler to be "dumb" and just tell DesiredState what time the scheduler thinks it is,
	// and the DesiredState will create the appropriate run according to the task's cron schedule,
	// and according to what's in progress and what's been finished.
	//
	// If a Run is requested and the cron schedule says the schedule isn't ready, a RunNotYetDueError is returned.
	CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)

	// FinishRun indicates that the given run is no longer intended to be executed.
	// This may be called after a successful or failed execution, or upon cancellation.
	FinishRun(ctx context.Context, taskID, runID platform.ID) error
}

DesiredState persists the desired state of a run.

type Executor

type Executor interface {
	// Execute attempts to begin execution of a run.
	// If there is an error invoking execution, that error is returned and RunPromise is nil.
	// TODO(mr): this assumes you can execute a run just from a taskID and a now time.
	// We may need to include the script content in this method signature.
	Execute(ctx context.Context, run QueuedRun) (RunPromise, error)
}

Executor handles execution of a run.

type LogReader

type LogReader interface {
	// ListRuns returns a list of runs belonging to a task.
	ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)

	// FindRunByID finds a run given an orgID, taskID, and runID.
	FindRunByID(ctx context.Context, orgID, taskID, runID platform.ID) (*platform.Run, error)

	// ListLogs lists logs for a task or a specified run of a task.
	ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)
}

LogReader reads log information and log data from a store.

type LogWriter

type LogWriter interface {
	// UpdateRunState sets the run state and the respective time.
	UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, state RunStatus) error

	// AddRunLog adds a log line to the run.
	AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error
}

LogWriter writes task logs and task state changes to a store.

type NopLogReader

type NopLogReader struct{}

NopLogReader is a LogReader that doesn't do anything when its methods are called. This is useful for tests, but not much else.

func (NopLogReader) FindRunByID

func (NopLogReader) FindRunByID(ctx context.Context, orgID, taskID, runID platform.ID) (*platform.Run, error)

func (NopLogReader) ListLogs

func (NopLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)

func (NopLogReader) ListRuns

func (NopLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)

type NopLogWriter

type NopLogWriter struct{}

NopLogWriter is a LogWriter that doesn't do anything when its methods are called. This is useful for tests, but not much else.

func (NopLogWriter) AddRunLog

func (NopLogWriter) AddRunLog(context.Context, *StoreTask, platform.ID, time.Time, string) error

func (NopLogWriter) UpdateRunState

func (NopLogWriter) UpdateRunState(context.Context, *StoreTask, platform.ID, time.Time, RunStatus) error

type QueuedRun

type QueuedRun struct {
	// TaskID identifies the task being run; RunID is the ID assigned to this run.
	TaskID, RunID platform.ID

	// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set
	// as the "now" option when executing the task.
	Now int64
}

QueuedRun is a task run that has been assigned an ID, but whose execution has not necessarily started.

type RunCreation

type RunCreation struct {
	Created QueuedRun

	// Unix timestamp for when the next run is due.
	NextDue int64

	// Whether there are any manual runs queued for this task.
	// If so, the scheduler should begin executing them after handling real-time tasks.
	HasQueue bool
}

RunCreation is returned by CreateNextRun.

type RunNotYetDueError

type RunNotYetDueError struct {
	// DueAt is the unix timestamp of when the next run is due.
	DueAt int64
}

RunNotYetDueError is returned from CreateNextRun if a run is not yet due.

func (RunNotYetDueError) Error

func (e RunNotYetDueError) Error() string

type RunPromise

type RunPromise interface {
	// Run returns the details about the queued run.
	Run() QueuedRun

	// Wait blocks until the run completes.
	// Wait may be called concurrently.
	// Subsequent calls to Wait will return identical values.
	Wait() (RunResult, error)

	// Cancel interrupts the RunPromise.
	// Calls to Wait() will immediately unblock and return nil, ErrRunCanceled.
	// Cancel is safe to call concurrently.
	// If Wait() has already returned, Cancel is a no-op.
	Cancel()
}

RunPromise represents an in-progress run whose result is not yet known.

type RunResult

// RunResult describes the outcome of a completed run, as returned by RunPromise.Wait.
type RunResult interface {
	// If the run did not succeed, Err returns the error associated with the run.
	Err() error

	// IsRetryable returns true if the error was non-terminal and the run is eligible for retry.
	IsRetryable() bool
}

type RunStatus

// RunStatus is the state of a run, as passed to LogWriter.UpdateRunState.
type RunStatus int

// Run states, numbered from zero in declaration order via iota.
const (
	RunScheduled RunStatus = iota
	RunStarted
	RunSuccess
	RunFail
	RunCanceled
)

func (RunStatus) String

func (r RunStatus) String() string

type Scheduler

type Scheduler interface {
	// Tick updates the time of the scheduler.
	// Any owned tasks that are due to execute and that have a free concurrency slot
	// will begin a new execution.
	Tick(now int64)

	// ClaimTask begins control of task execution in this scheduler.
	ClaimTask(task *StoreTask, meta *StoreTaskMeta) error

	// ReleaseTask immediately cancels any in-progress runs for the given task ID,
	// and releases any resources related to management of that task.
	ReleaseTask(taskID platform.ID) error
}

Scheduler accepts tasks and handles their scheduling.

TODO(mr): right now the methods on Scheduler are synchronous. We'll probably want to make them asynchronous in the near future, which likely means we will change the method signatures to something where we can wait for the result to complete and possibly inspect any relevant output.

func NewScheduler

func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, now int64, opts ...SchedulerOption) Scheduler

NewScheduler returns a new scheduler with the given desired state and the given now UTC timestamp.

type SchedulerOption

type SchedulerOption func(Scheduler)

func WithLogger

func WithLogger(logger *zap.Logger) SchedulerOption

WithLogger sets the logger for the scheduler. If not set, the scheduler will use a no-op logger.

func WithTicker

func WithTicker(ctx context.Context, d time.Duration) SchedulerOption

type Store

type Store interface {
	// CreateTask creates a task with the given script, belonging to the given org and user.
	// The scheduleAfter parameter is a Unix timestamp (seconds elapsed since January 1, 1970 UTC),
	// and the first run of the task will be run according to the earliest time after scheduleAfter,
	// matching the task's schedule via its cron or every option.
	CreateTask(ctx context.Context, org, user platform.ID, script string, scheduleAfter int64) (platform.ID, error)

	// ModifyTask updates the script of an existing task.
	// It returns an error if there was no task matching the given ID.
	ModifyTask(ctx context.Context, id platform.ID, newScript string) error

	// ListTasks lists the tasks in the store that match the search params.
	ListTasks(ctx context.Context, params TaskSearchParams) ([]StoreTask, error)

	// FindTaskByID returns the task with the given ID.
	// If no task matches the ID, the returned task is nil.
	FindTaskByID(ctx context.Context, id platform.ID) (*StoreTask, error)

	// EnableTask updates task status to enabled.
	EnableTask(ctx context.Context, id platform.ID) error

	// DisableTask updates task status to disabled.
	DisableTask(ctx context.Context, id platform.ID) error

	// FindTaskMetaByID returns the metadata about a task.
	FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error)

	// DeleteTask returns whether an entry matching the given ID was deleted.
	// If err is non-nil, deleted is false.
	// If err is nil, deleted is false if no entry matched the ID,
	// or deleted is true if there was a matching entry and it was deleted.
	DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error)

	// CreateNextRun creates the earliest needed run scheduled no later than the given Unix timestamp now.
	// Internally, the Store should rely on the underlying task's StoreTaskMeta to create the next run.
	CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)

	// FinishRun removes runID from the list of currently running runs and,
	// if the run's `now` is later than the latest completed timestamp, updates that timestamp.
	FinishRun(ctx context.Context, taskID, runID platform.ID) error

	// ManuallyRunTimeRange enqueues a request to run the task with the given ID for all schedules no earlier than start and no later than end (Unix timestamps).
	// requestedAt is the Unix timestamp when the request was initiated.
	// ManuallyRunTimeRange must delegate to an underlying StoreTaskMeta's ManuallyRunTimeRange method.
	ManuallyRunTimeRange(ctx context.Context, taskID platform.ID, start, end, requestedAt int64) error

	// DeleteOrg deletes the org.
	DeleteOrg(ctx context.Context, orgID platform.ID) error

	// DeleteUser deletes a user with userID.
	DeleteUser(ctx context.Context, userID platform.ID) error

	// Close closes the store for usage and cleans up running processes.
	Close() error
}

Store is the interface around persisted tasks.

func NewInMemStore

func NewInMemStore() Store

NewInMemStore returns a new in-memory store. This store is not designed to be efficient; it is here for testing purposes.

type StoreTask

type StoreTask struct {
	ID platform.ID

	// IDs for the owning organization and user.
	Org, User platform.ID

	// The user-supplied name of the Task.
	Name string

	// The script content of the task.
	Script string
}

StoreTask is a stored representation of a Task.

type StoreTaskMeta

type StoreTaskMeta struct {
	MaxConcurrency int32 `protobuf:"varint,1,opt,name=max_concurrency,json=maxConcurrency,proto3" json:"max_concurrency,omitempty"`
	// latest_completed is the unix timestamp of the latest "naturally" completed run.
	// If a run for time t finishes before a run for time t - u, latest_completed will reflect time t.
	LatestCompleted int64 `protobuf:"varint,2,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"`
	// status indicates if the task is enabled or disabled.
	Status string `protobuf:"bytes,3,opt,name=status,proto3" json:"status,omitempty"`
	// currently_running is the collection of runs in-progress.
	// If a runner crashes or otherwise disappears, this indicates to the new runner what needs to be picked up.
	CurrentlyRunning []*StoreTaskMetaRun `protobuf:"bytes,4,rep,name=currently_running,json=currentlyRunning" json:"currently_running,omitempty"`
	// effective_cron is the effective cron string as reported by the task's options.
	EffectiveCron string `protobuf:"bytes,5,opt,name=effective_cron,json=effectiveCron,proto3" json:"effective_cron,omitempty"`
	// Task's configured delay, in seconds.
	Delay      int32                     `protobuf:"varint,6,opt,name=delay,proto3" json:"delay,omitempty"`
	ManualRuns []*StoreTaskMetaManualRun `protobuf:"bytes,16,rep,name=manual_runs,json=manualRuns" json:"manual_runs,omitempty"`
}

StoreTaskMeta is the internal state of a task.

func (*StoreTaskMeta) CreateNextRun

func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, error)) (RunCreation, error)

CreateNextRun attempts to update stm's CurrentlyRunning slice with a new run. The new run's now is assigned the earliest possible time according to stm.EffectiveCron, that is later than any in-progress run and stm's LatestCompleted timestamp. If the run's now would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError.

makeID is a function provided by the caller to create an ID, in case we can create a run. Because a StoreTaskMeta doesn't know the ID of the task it belongs to, it never sets RunCreation.Created.TaskID.

func (*StoreTaskMeta) Descriptor

func (*StoreTaskMeta) Descriptor() ([]byte, []int)

func (*StoreTaskMeta) FinishRun

func (stm *StoreTaskMeta) FinishRun(runID platform.ID) bool

FinishRun removes the run matching runID from stm's CurrentlyRunning slice, and if that run's Now value is greater than stm's LatestCompleted value, updates the value of LatestCompleted to the run's Now value.

If runID matched a run, FinishRun returns true. Otherwise it returns false.

func (*StoreTaskMeta) GetCurrentlyRunning

func (m *StoreTaskMeta) GetCurrentlyRunning() []*StoreTaskMetaRun

func (*StoreTaskMeta) GetDelay

func (m *StoreTaskMeta) GetDelay() int32

func (*StoreTaskMeta) GetEffectiveCron

func (m *StoreTaskMeta) GetEffectiveCron() string

func (*StoreTaskMeta) GetLatestCompleted

func (m *StoreTaskMeta) GetLatestCompleted() int64

func (*StoreTaskMeta) GetManualRuns

func (m *StoreTaskMeta) GetManualRuns() []*StoreTaskMetaManualRun

func (*StoreTaskMeta) GetMaxConcurrency

func (m *StoreTaskMeta) GetMaxConcurrency() int32

func (*StoreTaskMeta) GetStatus

func (m *StoreTaskMeta) GetStatus() string

func (*StoreTaskMeta) ManuallyRunTimeRange

func (stm *StoreTaskMeta) ManuallyRunTimeRange(start, end, requestedAt int64) error

ManuallyRunTimeRange requests a manual run covering the approximate range specified by the Unix timestamps start and end. More specifically, it requests runs scheduled no earlier than start, but possibly later than start, if start does not land on the task's schedule; and as late as, but not necessarily equal to, end. requestedAt is the Unix timestamp indicating when this run range was requested.

If adding the range would exceed the queue size, ManuallyRunTimeRange returns ErrManualQueueFull.

func (*StoreTaskMeta) Marshal

func (m *StoreTaskMeta) Marshal() (dAtA []byte, err error)

func (*StoreTaskMeta) MarshalTo

func (m *StoreTaskMeta) MarshalTo(dAtA []byte) (int, error)

func (*StoreTaskMeta) NextDueRun

func (stm *StoreTaskMeta) NextDueRun() (int64, error)

NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready. The returned timestamp reflects the task's delay, so it does not necessarily exactly match the schedule time.

func (*StoreTaskMeta) ProtoMessage

func (*StoreTaskMeta) ProtoMessage()

func (*StoreTaskMeta) Reset

func (m *StoreTaskMeta) Reset()

func (*StoreTaskMeta) Size

func (m *StoreTaskMeta) Size() (n int)

func (*StoreTaskMeta) String

func (m *StoreTaskMeta) String() string

func (*StoreTaskMeta) Unmarshal

func (m *StoreTaskMeta) Unmarshal(dAtA []byte) error

type StoreTaskMetaManualRun

type StoreTaskMetaManualRun struct {
	// start is the earliest allowable unix time stamp for this queue of runs.
	Start int64 `protobuf:"varint,1,opt,name=start,proto3" json:"start,omitempty"`
	// end is the latest allowable unix time stamp for this queue of runs.
	End int64 `protobuf:"varint,2,opt,name=end,proto3" json:"end,omitempty"`
	// latest_completed is the timestamp of the latest completed run from this queue.
	LatestCompleted int64 `protobuf:"varint,3,opt,name=latest_completed,json=latestCompleted,proto3" json:"latest_completed,omitempty"`
	// requested_at is the unix timestamp indicating when this run was requested.
	RequestedAt int64 `protobuf:"varint,4,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"`
}

StoreTaskMetaManualRun indicates a manually requested run for a time range. It has a start and end pair of unix timestamps indicating the time range covered by the request.

func (*StoreTaskMetaManualRun) Descriptor

func (*StoreTaskMetaManualRun) Descriptor() ([]byte, []int)

func (*StoreTaskMetaManualRun) GetEnd

func (m *StoreTaskMetaManualRun) GetEnd() int64

func (*StoreTaskMetaManualRun) GetLatestCompleted

func (m *StoreTaskMetaManualRun) GetLatestCompleted() int64

func (*StoreTaskMetaManualRun) GetRequestedAt

func (m *StoreTaskMetaManualRun) GetRequestedAt() int64

func (*StoreTaskMetaManualRun) GetStart

func (m *StoreTaskMetaManualRun) GetStart() int64

func (*StoreTaskMetaManualRun) Marshal

func (m *StoreTaskMetaManualRun) Marshal() (dAtA []byte, err error)

func (*StoreTaskMetaManualRun) MarshalTo

func (m *StoreTaskMetaManualRun) MarshalTo(dAtA []byte) (int, error)

func (*StoreTaskMetaManualRun) ProtoMessage

func (*StoreTaskMetaManualRun) ProtoMessage()

func (*StoreTaskMetaManualRun) Reset

func (m *StoreTaskMetaManualRun) Reset()

func (*StoreTaskMetaManualRun) Size

func (m *StoreTaskMetaManualRun) Size() (n int)

func (*StoreTaskMetaManualRun) String

func (m *StoreTaskMetaManualRun) String() string

func (*StoreTaskMetaManualRun) Unmarshal

func (m *StoreTaskMetaManualRun) Unmarshal(dAtA []byte) error

type StoreTaskMetaRun

type StoreTaskMetaRun struct {
	// now is the unix timestamp of the "now" value for the run.
	Now   int64  `protobuf:"varint,1,opt,name=now,proto3" json:"now,omitempty"`
	Try   uint32 `protobuf:"varint,2,opt,name=try,proto3" json:"try,omitempty"`
	RunID []byte `protobuf:"bytes,3,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	// range_start is the start of the manual run's time range.
	RangeStart int64 `protobuf:"varint,4,opt,name=range_start,json=rangeStart,proto3" json:"range_start,omitempty"`
	// range_end is the end of the manual run's time range.
	RangeEnd int64 `protobuf:"varint,5,opt,name=range_end,json=rangeEnd,proto3" json:"range_end,omitempty"`
	// requested_at is the unix timestamp indicating when this run was requested.
	// It is the same value as the "parent" StoreTaskMetaManualRun, if this run was the result of a manual request.
	RequestedAt int64 `protobuf:"varint,6,opt,name=requested_at,json=requestedAt,proto3" json:"requested_at,omitempty"`
}

func (*StoreTaskMetaRun) Descriptor

func (*StoreTaskMetaRun) Descriptor() ([]byte, []int)

func (*StoreTaskMetaRun) GetNow

func (m *StoreTaskMetaRun) GetNow() int64

func (*StoreTaskMetaRun) GetRangeEnd

func (m *StoreTaskMetaRun) GetRangeEnd() int64

func (*StoreTaskMetaRun) GetRangeStart

func (m *StoreTaskMetaRun) GetRangeStart() int64

func (*StoreTaskMetaRun) GetRequestedAt

func (m *StoreTaskMetaRun) GetRequestedAt() int64

func (*StoreTaskMetaRun) GetRunID

func (m *StoreTaskMetaRun) GetRunID() []byte

func (*StoreTaskMetaRun) GetTry

func (m *StoreTaskMetaRun) GetTry() uint32

func (*StoreTaskMetaRun) Marshal

func (m *StoreTaskMetaRun) Marshal() (dAtA []byte, err error)

func (*StoreTaskMetaRun) MarshalTo

func (m *StoreTaskMetaRun) MarshalTo(dAtA []byte) (int, error)

func (*StoreTaskMetaRun) ProtoMessage

func (*StoreTaskMetaRun) ProtoMessage()

func (*StoreTaskMetaRun) Reset

func (m *StoreTaskMetaRun) Reset()

func (*StoreTaskMetaRun) Size

func (m *StoreTaskMetaRun) Size() (n int)

func (*StoreTaskMetaRun) String

func (m *StoreTaskMetaRun) String() string

func (*StoreTaskMetaRun) Unmarshal

func (m *StoreTaskMetaRun) Unmarshal(dAtA []byte) error

type StoreValidation

type StoreValidation struct{}

StoreValidation is used for namespacing the store validation methods.

var StoreValidator StoreValidation

StoreValidator is a package-level StoreValidation, so that you can write

backend.StoreValidator.CreateArgs(...)

func (StoreValidation) CreateArgs

func (StoreValidation) CreateArgs(org, user platform.ID, script string) (options.Options, error)

CreateArgs returns the script's parsed options, and an error if any of the provided fields are invalid for creating a task.

func (StoreValidation) ModifyArgs

func (StoreValidation) ModifyArgs(taskID platform.ID, script string) (options.Options, error)

ModifyArgs returns the script's parsed options, and an error if any of the provided fields are invalid for modifying a task.

type TaskSearchParams

type TaskSearchParams struct {
	// Return tasks belonging to this exact organization ID. May be nil.
	Org platform.ID

	// Return tasks belonging to this exact user ID. May be nil.
	User platform.ID

	// Return tasks starting after this ID.
	After platform.ID

	// Size of each page. Must be non-negative.
	// If zero, the implementation picks an appropriate default page size.
	// Valid page sizes are implementation-dependent.
	PageSize int
}

TaskSearchParams is used when searching or listing tasks.

type TaskStatus

// TaskStatus is the enabled/disabled state of a task.
// NOTE(review): presumably these are the strings stored in StoreTaskMeta.Status — confirm.
type TaskStatus string
const (
	TaskEnabled  TaskStatus = "enabled"
	TaskDisabled TaskStatus = "disabled"
)

Directories

Path Synopsis
Package bolt provides a bolt-backed store implementation.
Package bolt provides a bolt-backed store implementation.
Package executor contains implementations of backend.Executor that depend on the query service.
Package executor contains implementations of backend.Executor that depend on the query service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL