Documentation
¶
Overview ¶
memorystore/memorystore.go Package memorystore provides a simple in-memory cache implementation with automatic cleanup of expired items. It supports both raw byte storage and JSON serialization/deserialization of structured data.
memorystore/pubsub_memory.go
Index ¶
- Variables
- type Config
- type GCPPubSub
- type InMemoryPubSub
- type MemoryStore
- func (m *MemoryStore) Delete(key string)
- func (m *MemoryStore) Get(key string) ([]byte, bool)
- func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error)
- func (m *MemoryStore) GetMetrics() StoreMetrics
- func (m *MemoryStore) GetMulti(keys []string) map[string][]byte
- func (m *MemoryStore) IsStopped() bool
- func (m *MemoryStore) Publish(topic string, message []byte) error
- func (m *MemoryStore) Set(key string, value []byte, duration time.Duration) error
- func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Duration) error
- func (m *MemoryStore) SetMulti(items map[string][]byte, duration time.Duration) error
- func (m *MemoryStore) Stop() error
- func (m *MemoryStore) Subscribe(topic string) (<-chan []byte, error)
- func (m *MemoryStore) SubscriberCount(pattern string) int
- func (m *MemoryStore) Unsubscribe(topic string) error
- type PubSubClient
- type StoreMetrics
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidTopic = errors.New("invalid topic") ErrStoreStopped = errors.New("store has been stopped") ErrNotImplemented = errors.New("feature not implemented by provider") )
Common errors for PubSub operations
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// GCPProjectID is the Google Cloud Project ID.
// If set, MemoryStore will attempt to use GCP PubSub.
GCPProjectID string
// PubSubTimeout is the timeout for PubSub operations.
PubSubTimeout time.Duration
}
Config holds configuration for the MemoryStore and its components.
type GCPPubSub ¶
type GCPPubSub struct {
// contains filtered or unexported fields
}
GCPPubSub implements PubSubClient using Google Cloud PubSub
func NewGCPPubSub ¶
func NewGCPPubSub(ctx context.Context, projectID string, opts ...option.ClientOption) (*GCPPubSub, error)
NewGCPPubSub creates a new GCP PubSub client
func (*GCPPubSub) Subscribe ¶
Subscribe subscribes to a topic. Note: In GCP PubSub, we must create a Subscription to the Topic. Since this is a temporary/session-based subscription, we create a unique subscription ID and delete it when Unsubscribe is called or the client is closed.
func (*GCPPubSub) Unsubscribe ¶
Unsubscribe stops the subscription and deletes it
type InMemoryPubSub ¶
type InMemoryPubSub struct {
// contains filtered or unexported fields
}
InMemoryPubSub handles all publish/subscribe operations in memory
func (*InMemoryPubSub) Close ¶
func (ps *InMemoryPubSub) Close() error
Close shuts down the PubSub manager
func (*InMemoryPubSub) Publish ¶
func (ps *InMemoryPubSub) Publish(topic string, message []byte) error
Publish sends a message to all subscribers matching the given topic
func (*InMemoryPubSub) Subscribe ¶
func (ps *InMemoryPubSub) Subscribe(topic string) (<-chan []byte, error)
Subscribe creates a new subscription for the given topic
func (*InMemoryPubSub) Unsubscribe ¶
func (ps *InMemoryPubSub) Unsubscribe(topic string) error
Unsubscribe cancels all subscriptions for the given topic
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore implements an in-memory cache with automatic cleanup of expired items. It is safe for concurrent use by multiple goroutines.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore creates and initializes a new MemoryStore instance. It checks for GOOGLE_CLOUD_PROJECT environment variable to decide whether to use GCP PubSub. Use NewMemoryStoreWithConfig for more control.
func NewMemoryStoreWithConfig ¶
func NewMemoryStoreWithConfig(config Config) *MemoryStore
NewMemoryStoreWithConfig creates a new MemoryStore with the provided configuration.
func (*MemoryStore) Delete ¶
func (m *MemoryStore) Delete(key string)
Delete removes an item from the cache. If the key doesn't exist, the operation is a no-op.
func (*MemoryStore) Get ¶
func (m *MemoryStore) Get(key string) ([]byte, bool)
Get retrieves a value from the cache. Returns the value and a boolean indicating whether the key was found. If the item has expired, returns (nil, false).
func (*MemoryStore) GetJSON ¶
func (m *MemoryStore) GetJSON(key string, dest interface{}) (bool, error)
GetJSON retrieves and deserializes a JSON value from the cache into the provided interface. Returns a boolean indicating if the key was found and any error that occurred during deserialization.
Example:
var user User
exists, err := cache.GetJSON("user:123", &user)
if err != nil {
// Handle error
} else if exists {
fmt.Printf("Found user: %+v\n", user)
}
func (*MemoryStore) GetMetrics ¶
func (m *MemoryStore) GetMetrics() StoreMetrics
GetMetrics returns the current statistics of the MemoryStore. It returns a copy of the metrics to ensure thread safety.
func (*MemoryStore) GetMulti ¶
func (m *MemoryStore) GetMulti(keys []string) map[string][]byte
GetMulti retrieves multiple values from the cache. It returns a map of found items. Keys that don't exist or are expired are omitted.
func (*MemoryStore) IsStopped ¶
func (m *MemoryStore) IsStopped() bool
IsStopped returns true if the MemoryStore has been stopped and can no longer be used. This method is safe for concurrent use.
Example:
if store.IsStopped() {
log.Println("Store is no longer available")
return
}
func (*MemoryStore) Publish ¶
func (m *MemoryStore) Publish(topic string, message []byte) error
Publish publishes a message to a topic.
func (*MemoryStore) Set ¶
Set stores a raw byte slice in the cache with the specified key and duration. The item will automatically expire after the specified duration. If an error occurs, it will be returned to the caller.
func (*MemoryStore) SetJSON ¶
func (m *MemoryStore) SetJSON(key string, value interface{}, duration time.Duration) error
SetJSON stores a JSON-serializable value in the cache. The value is serialized to JSON before storage. Returns an error if JSON marshaling fails.
Example:
type User struct {
Name string
Age int
}
user := User{Name: "John", Age: 30}
err := cache.SetJSON("user:123", user, 1*time.Hour)
func (*MemoryStore) SetMulti ¶
SetMulti stores multiple key-value pairs in the cache. This is more efficient than calling Set multiple times as it groups keys by shard. All items will have the same expiration duration.
func (*MemoryStore) Stop ¶
func (m *MemoryStore) Stop() error
Stop gracefully shuts down the MemoryStore by stopping the cleanup goroutine and releasing associated resources. After calling Stop, the store cannot be used. Multiple calls to Stop will not cause a panic and return nil.
Example:
store := NewMemoryStore() defer store.Stop()
func (*MemoryStore) Subscribe ¶
func (m *MemoryStore) Subscribe(topic string) (<-chan []byte, error)
Subscribe subscribes to a topic.
func (*MemoryStore) SubscriberCount ¶
func (m *MemoryStore) SubscriberCount(pattern string) int
SubscriberCount returns the number of subscribers for a pattern. Note: This is not supported by the common interface and will return 0 or error in future. For now, it only works if the underlying implementation is In-Memory.
func (*MemoryStore) Unsubscribe ¶
func (m *MemoryStore) Unsubscribe(topic string) error
Unsubscribe unsubscribes from a topic.
type PubSubClient ¶
type PubSubClient interface {
// Subscribe subscribes to a topic and returns a channel for messages.
// For GCP PubSub, 'topic' maps to a Topic, and a temporary subscription is created.
// For In-Memory, 'topic' is the pattern/channel name.
Subscribe(topic string) (<-chan []byte, error)
// Publish sends a message to a topic.
Publish(topic string, message []byte) error
// Unsubscribe stops receiving messages for a topic and cleans up resources.
Unsubscribe(topic string) error
// Close shuts down the client and cleans up resources.
Close() error
}
PubSubClient defines the interface for Publish/Subscribe operations to make the underlying implementation agnostic (In-Memory, GCP, etc).