
[Enh]: Add Expr.map_batches to pyspark#3579

Open
pedro-villanueva-bcom wants to merge 11 commits into narwhals-dev:main from
pedro-villanueva-bcom:add_pyspark_map_batches

Conversation

@pedro-villanueva-bcom
Contributor

Description

Expr.map_batches can be used when native expressions aren't enough, for example for statistical functions. PySpark has several types of UDFs, including pandas UDFs, which match map_batches very well. This PR implements map_batches using pandas UDFs. The optional parameter returns_scalar is not supported, as PySpark doesn't allow it. The UDF must return either a pandas Series, something that can be transformed into one, or a scalar that will be broadcast to one.

The only change external to the Spark backend is the kind of the map_batches node, which has been changed from ordered to unordered.
Additionally, the testing fixture that creates the Spark session now sets the PYSPARK_PYTHON env var so that UDFs run with that Python (including whatever packages are installed).
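The fixture change described above amounts to something like the following (a minimal sketch; the actual fixture code in the PR may differ):

```python
import os
import sys

# Point Spark workers at the interpreter running the test suite, so
# pandas UDFs execute with the same Python and see the same installed
# packages as the driver.
os.environ["PYSPARK_PYTHON"] = sys.executable
```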

What type of PR is this? (check all applicable)

  • 💾 Refactor
  • ✨ Feature
  • 🐛 Bug Fix
  • 🔧 Optimization
  • 📝 Documentation
  • ✅ Test
  • 🐳 Other

Related issues

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

@pedro-villanueva-bcom
Contributor Author

pedro-villanueva-bcom commented Apr 28, 2026

I messed up the commits, sorry. Fixed.

@pedro-villanueva-bcom
Contributor Author

I don't understand this test failure: https://github.com/narwhals-dev/narwhals/actions/runs/25050142610/job/73375481418?pr=3579
It runs fine on my local machine. The error says pyspark.errors.exceptions.base.PySparkTypeError: [NOT_COLUMN_OR_STR] Argument col should be a Column or str, got Column., which is really weird.

The coverage check is also strange: https://github.com/narwhals-dev/narwhals/actions/runs/25050142614/job/73375481310?pr=3579
It doesn't run for PySpark, so coverage is below 100% because of the new code. Should I add PySpark to that test?

@pedro-villanueva-bcom pedro-villanueva-bcom force-pushed the add_pyspark_map_batches branch 2 times, most recently from 491bdd7 to 0d622c0 Compare April 28, 2026 14:30
@pedro-villanueva-bcom pedro-villanueva-bcom marked this pull request as draft April 28, 2026 14:31
@FBruzzesi
Member

Hey @pedro-villanueva-bcom - thanks for taking the initiative! I am not sure we should support map_batches for lazy backends, but I am open to seeing how this plays out!

Regarding your questions:

I don't understand this test failure: narwhals-dev/narwhals/actions/runs/25050142610/job/73375481418?pr=3579 It runs fine on my local machine. The error says pyspark.errors.exceptions.base.PySparkTypeError: [NOT_COLUMN_OR_STR] Argument col should be a Column or str, got Column., which is really weird.

I am not sure, but it would not be the first time that something is passing for pyspark but not for pyspark-connect.
It's ok to explicitly fail for spark-connect if you are not able to replicate and fix the issue.

This test coverage test is also strange: narwhals-dev/narwhals/actions/runs/25050142614/job/73375481310?pr=3579 It doesn't run for pyspark, so the coverage is below 100% because of the new code, should I add pyspark to that test?

Coverage is calculated with SQLFrame backend, so you will need to add a # pragma: no cover for the entire method if SQLFrame is not supported.
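If SQLFrame can't support the method, the exclusion could look roughly like this (a hypothetical skeleton, not the actual narwhals class):

```python
class SparkLikeExpr:
    def map_batches(self, function, return_dtype):  # pragma: no cover
        # Coverage is measured against the SQLFrame backend, which has no
        # pandas UDF support, so the whole method is excluded from coverage.
        raise NotImplementedError("map_batches is not supported for SQLFrame")
```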

@pedro-villanueva-bcom
Contributor Author

pedro-villanueva-bcom commented Apr 28, 2026

I am not sure we should support map_batches for lazy backends - I am open to see how this will play out!

Any specific reason for this? In my mind (and in my use case), UDFs are just another type of expression for creating a column. There are performance implications for sure, but in my case there's no other choice (mostly statistical functions, like getting a p-value from a column of z-scores, for example).
Additionally, I wanted to look into how to support aggregation UDFs. I see that Polars has a map_groups function. I also use the PySpark equivalent for, again, statistical summaries that can't be calculated in other ways (though this case is a little less frequent).
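For instance, the p-value case mentioned above could be a plain pandas function passed to map_batches (a sketch using only the standard library's error function; the function name is made up):

```python
import math

import pandas as pd


def p_value_from_z(z: pd.Series) -> pd.Series:
    # Two-sided p-value: p = 2 * (1 - Phi(|z|)), where the standard
    # normal CDF Phi is expressed via the error function erf.
    phi = (1 + (z.abs() / math.sqrt(2)).apply(math.erf)) / 2
    return 2 * (1 - phi)
```

Assuming the API this PR targets, it would then be invoked as something like `nw.col("z").map_batches(p_value_from_z)`.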

My use case is a library of statistical functions for large datasets that works for PySpark, PySpark Connect, and Snowpark. I want to make it work for in-memory backends too, to handle small data and make testing faster. I discovered Narwhals and I'm quite happy with it: the syntax is nice (nicer than Ibis) and migrating is not too hard.
Let me know if you want to talk about this more; happy to do it.

@pedro-villanueva-bcom pedro-villanueva-bcom marked this pull request as ready for review April 29, 2026 10:09
@pedro-villanueva-bcom pedro-villanueva-bcom changed the title [Enh]: Add Expr.map _batches to pyspark [Enh]: Add Expr.map_batches to pyspark Apr 29, 2026


Development

Successfully merging this pull request may close these issues.

[Enh]: Add map_batches to pyspark
