Skip to content
108 changes: 108 additions & 0 deletions docs/concepts/nodes/lambda_node.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ lambda_function_node:
- **`node_state`**:
Optional. Node-specific state key.

- **`node_name`**:
Optional. Node name used in the studio UI.

### Example Lambda Implementation

You can implement a lambda either as a class or a function:
Expand All @@ -38,6 +41,7 @@ You can implement a lambda either as a class or a function:
from sygra.core.graph.functions.lambda_function import LambdaFunction
from sygra.core.graph.sygra_state import SygraState

# Lambda function with sync apply(), to use async flow use AsyncLambdaFunction
class TestLambda(LambdaFunction):
def apply(lambda_node_dict: dict, state: SygraState):
state["return_key1"] = "hello world"
Expand All @@ -56,7 +60,111 @@ def lambda_function(lambda_node_dict: dict, state: SygraState):
- Lambda nodes give you full control over data transformation, allowing you to bridge, preprocess, or postprocess state as needed.
- All keys you want accessible in the next node should be listed in `output_keys`.
- Use lambda nodes for any custom task, especially when built-in nodes do not cover your use case.
- If you have async programming in your lambda function, use `AsyncLambdaFunction` instead of `LambdaFunction`. In this case, the `apply` function is async in nature, and you can call async function with `await` keyword.

----
### Example workflow with sync and async lambda function:

YAML configuration
```yaml
data_config:
source:
type: hf
repo_id: openai/gsm8k
config_name: main
split: train
graph_config:
nodes:
lambda_1:
node_type: lambda
lambda: tasks.examples.lambda_test.task_executor.Lambda1Function
node_name: Sync Node
lambda_2:
node_type: lambda
lambda: tasks.examples.lambda_test.task_executor.Lambda2Function
node_name: Async Node
edges:
- from: START
to: lambda_1
- from: lambda_1
to: lambda_2
- from: lambda_2
to: END
```

Task Executor Code:
```python
"""
Task executor for lambda test workflow having sync and async implementation.
"""
import asyncio
import time
from sygra.core.graph.functions.lambda_function import LambdaFunction, AsyncLambdaFunction
from sygra.core.graph.sygra_state import SygraState
from sygra.logger.logger_config import logger

async def count_async():
print("One")
logger.info("One...")
await asyncio.sleep(1)
print("Two")
logger.info("Two...")
await asyncio.sleep(1)

def count_sync(count:int):
print("One")
logger.info("One...")
time.sleep(1)
print("Two")
logger.info("Two...")
time.sleep(1)
logger.info("Count..." + str(count))

async def wrapper_count_sync(count:int):
return count_sync(count)

# sync lambda function
class Lambda1Function(LambdaFunction):
"""Execute custom logic on workflow state."""

@staticmethod
def apply(lambda_node_dict: dict, state: SygraState) -> SygraState:
"""Implement this method to apply lambda function.

Args:
lambda_node_dict: configuration dictionary
state: current state of the graph

Returns:
SygraState: the updated state object
"""
logger.info("sync function calling.......class1...")

count_sync(2)

logger.info("task done")
return state

#async lambda function
class Lambda2Function(AsyncLambdaFunction):
"""Execute custom logic on workflow state."""

@staticmethod
async def apply(lambda_node_dict: dict, state: SygraState) -> SygraState:
"""Implement this method to apply lambda function.

Args:
lambda_node_dict: configuration dictionary
state: current state of the graph

Returns:
SygraState: the updated state object
"""
logger.info("async function calling.......class2...")
await count_async()
return state

```
---

**Tip:** Keep your lambda logic modular and reusable across tasks for maximum flexibility.
Expand Down
3 changes: 2 additions & 1 deletion sygra/core/graph/backend_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ class BackendFactory(ABC):
"""

@abstractmethod
def create_lambda_runnable(self, exec_wrapper):
def create_lambda_runnable(self, exec_wrapper, async_func=True):
"""
Abstract method to create a Lambda runnable.

Args:
exec_wrapper: Async function to execute
async_func: True if the function is async

Returns:
Any: backend specific runnable object like Runnable for backend=Langgraph
Expand Down
20 changes: 20 additions & 0 deletions sygra/core/graph/functions/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,23 @@ def apply(lambda_node_dict: dict, state: SygraState):
SygraState: the updated state object
"""
pass


class AsyncLambdaFunction(ABC):
"""
This is a function class represent Async Lambda Function class.
Implement async apply() method for lambda function to be called by graph node.
"""

@staticmethod
@abstractmethod
async def apply(lambda_node_dict: dict, state: SygraState):
"""
Implement this method to apply lambda function
Args:
lambda_node_dict: configuration dictionary
state: current state of the graph
Returns:
SygraState: the updated state object
"""
pass
10 changes: 7 additions & 3 deletions sygra/core/graph/langgraph/langgraph_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@ class LangGraphFactory(BackendFactory):
A factory class to convert Nodes into Runnable objects which LangGraph framework can execute.
"""

def create_lambda_runnable(self, exec_wrapper):
def create_lambda_runnable(self, exec_wrapper, async_func=True):
"""
Abstract method to create a Lambda runnable.

Args:
exec_wrapper: Async function to execute
exec_wrapper: Async/sync function to execute
async_func: True if the function is async

Returns:
Any: backend specific runnable object like Runnable for backend=Langgraph
"""
return RunnableLambda(lambda x: x, afunc=exec_wrapper)
if async_func:
return RunnableLambda(lambda x: x, afunc=exec_wrapper)
else:
return RunnableLambda(func=exec_wrapper)

def create_llm_runnable(self, exec_wrapper):
"""
Expand Down
40 changes: 38 additions & 2 deletions sygra/core/graph/nodes/lambda_node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import time
from inspect import isclass
from typing import Any
Expand All @@ -23,8 +24,35 @@ def __init__(self, node_name: str, config: dict):
self.func_to_execute = utils.get_func_from_str(self.node_config["lambda"])
if isclass(self.func_to_execute):
self.func_to_execute = self.func_to_execute.apply
# deduce if the function type is sync or async
if asyncio.iscoroutinefunction(self.func_to_execute):
self.func_type = "async"
else:
self.func_type = "sync"

async def _exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]:
async def _async_exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]:
"""
Wrapper to track lambda node execution.

Args:
state: State of the node.

Returns:
Updated state
"""
start_time = time.time()
success = True

try:
result: dict[str, Any] = await self.func_to_execute(self.node_config, state)
return result
except Exception:
success = False
raise
finally:
self._record_execution_metadata(start_time, success)

def _sync_exec_wrapper(self, state: dict[str, Any]) -> dict[str, Any]:
"""
Wrapper to track lambda node execution.

Expand Down Expand Up @@ -53,7 +81,15 @@ def to_backend(self) -> Any:
Returns:
Any: platform specific runnable object like Runnable in LangGraph.
"""
return utils.backend_factory.create_lambda_runnable(self._exec_wrapper)
if self.func_type == "sync":
return utils.backend_factory.create_lambda_runnable(
self._sync_exec_wrapper, async_func=False
)
elif self.func_type == "async":
# default to async function as old behavior(default async_func is True)
return utils.backend_factory.create_lambda_runnable(self._async_exec_wrapper)
else:
raise Exception("Invalid function type")

def validate_node(self):
"""
Expand Down