Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions engine/access/state_stream/filter/address.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package filter

import (
"fmt"

"github.com/onflow/flow-go/model/events"
"github.com/onflow/flow-go/model/flow"
)

type AddressFilter struct {
Addresses map[string]struct{}
}

var _ Matcher = (*AddressFilter)(nil)

func NewAddressFilter(addresses []string, chain flow.Chain) (*AddressFilter, error) {
addressesMap := make(map[string]struct{}, len(addresses))
for _, address := range addresses {
addr := flow.HexToAddress(address)
if err := validateAddress(addr, chain); err != nil {
return nil, err
}
// use the parsed address to make sure it will match the event address string exactly
addressesMap[addr.String()] = struct{}{}
}
return &AddressFilter{Addresses: addressesMap}, nil
}

func (f *AddressFilter) Match(event *flow.Event) (bool, error) {
parsed, err := events.ParseEvent(event.Type)
if err != nil {
return false, fmt.Errorf("error parsing event type: %w", err)
}

if parsed.Type != events.AccountEventType {
return false, nil
}

_, ok := f.Addresses[parsed.Address]
return ok, nil
}

// validateAddress ensures that the address is valid for the given chain
func validateAddress(address flow.Address, chain flow.Chain) error {
if !chain.IsValid(address) {
return fmt.Errorf("invalid address for chain: %s", address)
}
return nil
}
52 changes: 52 additions & 0 deletions engine/access/state_stream/filter/contract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package filter

import (
"fmt"
"strings"

"github.com/onflow/flow-go/model/events"
"github.com/onflow/flow-go/model/flow"
)

type ContractFilter struct {
Contracts map[string]struct{}
}

var _ Matcher = (*ContractFilter)(nil)

func NewContractFilter(contracts []string) (*ContractFilter, error) {
contractsMap := make(map[string]struct{}, len(contracts))
for _, contract := range contracts {
if err := validateContract(contract); err != nil {
return nil, err
}
contractsMap[contract] = struct{}{}
}
return &ContractFilter{Contracts: contractsMap}, nil
}

func (f *ContractFilter) Match(event *flow.Event) (bool, error) {
parsed, err := events.ParseEvent(event.Type)
if err != nil {
return false, fmt.Errorf("error parsing event type: %w", err)
}

if _, ok := f.Contracts[parsed.Contract]; ok {
return true, nil
}

return false, nil
}

// validateContract ensures that the contract is in the correct format
func validateContract(contract string) error {
if contract == "flow" {
return nil
}

parts := strings.Split(contract, ".")
if len(parts) != 3 || parts[0] != "A" {
return fmt.Errorf("invalid contract: %s", contract)
}
return nil
}
174 changes: 174 additions & 0 deletions engine/access/state_stream/filter/fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package filter

import (
"fmt"

"github.com/onflow/cadence"

Check failure on line 6 in engine/access/state_stream/filter/fields.go

View workflow job for this annotation

GitHub Actions / Lint (./)

File is not properly formatted (goimports)
"github.com/onflow/flow-go/model/flow"
)

type MatchOperation string

const (
MatchOperationEqual MatchOperation = "EQUAL" // any type
MatchOperationNotEqual MatchOperation = "NOT_EQUAL" // any type
MatchOperationGreaterThan MatchOperation = "GREATER_THAN" // any numeric type
MatchOperationLessThan MatchOperation = "LESS_THAN" // any numeric type
MatchOperationGreaterThanOrEqual MatchOperation = "GREATER_THAN_OR_EQUAL" // any numeric type
MatchOperationLessThanOrEqual MatchOperation = "LESS_THAN_OR_EQUAL" // any numeric type

// TODO: complete implementation and testing
MatchOperationContains MatchOperation = "CONTAINS" // string, array, dictionary. allow string/value/array of values
MatchOperationDoesNotContain MatchOperation = "DOES_NOT_CONTAIN" // string, array, dictionary. allow string/value/array of values
MatchOperationIn MatchOperation = "IN" // any type. allow array of values
MatchOperationNotIn MatchOperation = "NOT_IN" // any type. allow array of values
)

var (
allowedMatchOperators = map[MatchOperation]struct{}{
MatchOperationEqual: {},
MatchOperationNotEqual: {},
MatchOperationGreaterThan: {},
MatchOperationLessThan: {},
MatchOperationGreaterThanOrEqual: {},
MatchOperationLessThanOrEqual: {},
}
)

// Only allow filtering fields numeric, string, array, optional

type TypedFieldFilter struct {
FieldName string
Operation MatchOperation
Value cadence.Value

// TODO: is this the best way to handle this?
AllowedValues []cadence.Value
}

var _ Matcher = (*TypedFieldFilter)(nil)

func NewTypedFieldFilter(fieldName string, operation MatchOperation, value cadence.Value, allowedValues []cadence.Value) (*TypedFieldFilter, error) {
if fieldName == "" {
return nil, fmt.Errorf("field name must not be empty")
}
if _, ok := allowedMatchOperators[operation]; !ok {
return nil, fmt.Errorf("field name must not be empty")
}
if !isAllowedFilterType(value) {
return nil, fmt.Errorf("unsupported filter by type %s", value.Type().ID())
}

// make sure all values have the same type
var valueType cadence.Type
for _, allowedValue := range allowedValues {
if !isAllowedFilterType(allowedValue) {
return nil, fmt.Errorf("unsupported filter by type %s", allowedValue.Type().ID())
}

if valueType == nil { // TODO: not sure this will work since valueType is an interface
valueType = allowedValue.Type()
}
ok, err := typesEqual(allowedValue, valueType)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("type of values in allowedValues must match filter type %s", value.Type().ID())
}
}

return &TypedFieldFilter{
FieldName: fieldName,
Operation: operation,
Value: value,
AllowedValues: allowedValues,
}, nil
}

func (f *TypedFieldFilter) Match(event *flow.Event) (bool, error) {
fields, err := getEventFields(event)
if err != nil {
return false, fmt.Errorf("error getting event fields: %w", err)
}

fieldValue, ok := fields[f.FieldName]
if !ok {
return false, nil
}

ok, err = typesEqual(fieldValue, f.Value.Type())
if err != nil {
return false, err
}
if !ok {
return false, nil
}

switch f.Operation {
case MatchOperationEqual:
return equal(fieldValue, f.Value)

case MatchOperationNotEqual:
equal, err := equal(fieldValue, f.Value)
if err != nil {
return false, err
}
return !equal, nil

case MatchOperationGreaterThan:
cmp, err := cmp(fieldValue, f.Value)
if err != nil {
return false, err
}
return cmp > 0, nil

case MatchOperationLessThan:
cmp, err := cmp(fieldValue, f.Value)
if err != nil {
return false, err
}
return cmp < 0, nil

case MatchOperationGreaterThanOrEqual:
cmp, err := cmp(fieldValue, f.Value)
if err != nil {
return false, err
}
return cmp >= 0, nil

case MatchOperationLessThanOrEqual:
cmp, err := cmp(fieldValue, f.Value)
if err != nil {
return false, err
}
return cmp <= 0, nil

case MatchOperationContains:
return contains(fieldValue, f.Value)

case MatchOperationDoesNotContain:
contains, err := contains(fieldValue, f.Value)
if err != nil {
return false, err
}
return !contains, nil

case MatchOperationIn:
if len(f.AllowedValues) == 0 {
return false, nil
}

return in(fieldValue, f.AllowedValues)

case MatchOperationNotIn:
in, err := in(fieldValue, f.AllowedValues)
if err != nil {
return false, err
}
return !in, nil

default:
return false, fmt.Errorf("unsupported filter operation: %s", f.Operation)
}
}
7 changes: 7 additions & 0 deletions engine/access/state_stream/filter/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package filter

import "github.com/onflow/flow-go/model/flow"

type Matcher interface {
Match(event *flow.Event) (bool, error)
}
68 changes: 68 additions & 0 deletions engine/access/state_stream/filter/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package filter

import (
"fmt"

"github.com/onflow/flow-go/model/flow"
)

type GroupingOperation string

const (
GroupingOperationAnd GroupingOperation = "AND"
GroupingOperationOr GroupingOperation = "OR"
GroupingOperationNot GroupingOperation = "NOT"
)

type FieldFilterSet struct {
GroupingOperation GroupingOperation
Filters []Matcher
}

var _ Matcher = (*FieldFilterSet)(nil)

func NewFieldFilterSet(groupingOperation GroupingOperation, filters []Matcher) (*FieldFilterSet, error) {
if len(filters) == 0 {
return nil, fmt.Errorf("filter set cannot be empty")
}

return &FieldFilterSet{
GroupingOperation: groupingOperation,
Filters: filters,
}, nil
}

func (f *FieldFilterSet) Match(event *flow.Event) (bool, error) {
matchCount := 0
for i, filter := range f.Filters {
matched, err := filter.Match(event)
if err != nil {
return false, fmt.Errorf("error matching filter %d: %w", i, err)
}

switch f.GroupingOperation {
case GroupingOperationOr:
if matched {
return true, nil
}
case GroupingOperationAnd:
if !matched {
return false, nil
}
case GroupingOperationNot:
if matched {
return false, nil
}
}
}

switch f.GroupingOperation {
case GroupingOperationAnd:
return matchCount == len(f.Filters), nil
case GroupingOperationOr:
return matchCount > 0, nil
case GroupingOperationNot:
return matchCount == 0, nil
}
return false, nil
}
Loading
Loading