Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion internal/impl/oracledb/input_oracledb_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
ociFieldMiningStrategy = "strategy"
ociFieldMaxTransactionEvents = "max_transaction_events"
ociFieldLOBEnabled = "lob_enabled"
ociFieldTransactionCache = "transaction_cache"
)

func init() {
Expand Down Expand Up @@ -134,6 +135,9 @@ When using the default Oracle based cache, the Connect user requires permission
service.NewBoolField(ociFieldLOBEnabled).
Description("When enabled, large object (CLOB, BLOB) columns are included in both snapshot and streaming change events. When disabled, these columns are still present but contain no values. Enabling this option introduces additional performance overhead and increases memory requirements.").
Default(logminer.DefaultLOBEnabled),
service.NewStringField(ociFieldTransactionCache).
Description("A https://www.docs.redpanda.com/redpanda-connect/components/caches/about[cache resource^] to use for buffering in-flight transactions. When set, DML events are serialized and stored in the named cache rather than held in memory, reducing connector memory usage for workloads with large or long-running transactions. If not set, an in-memory buffer is used.").
Optional(),
).Description("LogMiner configuration settings."),
).
Field(service.NewStringListField(ociFieldTablesInclude).
Expand Down Expand Up @@ -187,6 +191,7 @@ type Config struct {
SCNCacheKey string
CpCacheTableName string
PDBName string
TransactionCacheName string
}

type oracleDBCDCInput struct {
Expand Down Expand Up @@ -217,6 +222,7 @@ func newOracleDBCDCInput(conf *service.ParsedConfig, resources *service.Resource
cpCache service.Cache
cpCacheTableName string
lmCfg *logminer.Config
transactionCacheName string

logger = resources.Logger()
)
Expand All @@ -239,6 +245,14 @@ func newOracleDBCDCInput(conf *service.ParsedConfig, resources *service.Resource
if lmCfg, err = parseLogMinerConfig(conf); err != nil {
return nil, err
}
if conf.Contains(ociFieldLogMiner) {
lmConf := conf.Namespace(ociFieldLogMiner)
if lmConf.Contains(ociFieldTransactionCache) {
if transactionCacheName, err = lmConf.FieldString(ociFieldTransactionCache); err != nil {
return nil, err
}
}
}

// tables
if includes, err := conf.FieldStringList(ociFieldTablesInclude); err != nil {
Expand Down Expand Up @@ -321,6 +335,7 @@ func newOracleDBCDCInput(conf *service.ParsedConfig, resources *service.Resource
SCNCacheKey: scnCacheKey,
CpCacheTableName: cpCacheTableName,
PDBName: pdbName,
TransactionCacheName: transactionCacheName,
TablesFilter: &confx.RegexpFilter{
Include: tableIncludes,
Exclude: tableExcludes,
Expand Down Expand Up @@ -494,7 +509,15 @@ func (o *oracleDBCDCInput) Connect(ctx context.Context) (resErr error) {
}

if o.lmCfg != nil {
streaming = logminer.NewMiner(o.db, userTables, o.publisher, o.lmCfg, o.metrics, o.log)
var txnCache logminer.TransactionCache
if o.cfg.TransactionCacheName != "" {
if err := o.res.AccessCache(ctx, o.cfg.TransactionCacheName, func(c service.Cache) {
txnCache = logminer.NewConnectCacheResource(c, o.lmCfg.MaxTransactionEvents, o.metrics, o.log)
}); err != nil {
return fmt.Errorf("accessing transaction cache resource: %w", err)
}
}
streaming = logminer.NewMiner(o.db, userTables, o.publisher, o.lmCfg, txnCache, o.metrics, o.log)
} else {
return errors.New("logminer configuration required for streaming")
}
Expand Down
Loading
Loading