pyspark.sql.streaming.DataStreamWriter.toTable
- DataStreamWriter.toTable(tableName, format=None, outputMode=None, partitionBy=None, queryName=None, **options)
Starts the execution of the streaming query, which will continually output results to the given table as new data arrives.
The returned StreamingQuery object can be used to interact with the stream.
New in version 3.1.0.
Changed in version 3.5.0: Supports Spark Connect.
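As a sketch of interacting with the returned query, the hypothetical helper `run_for` below lets a query run for a bounded time instead of a fixed `time.sleep`. It assumes `query` is the `StreamingQuery` returned by `toTable` and uses only its `awaitTermination(timeout)`, `isActive`, and `stop()` members; with a timeout in seconds, `awaitTermination` returns whether the query terminated within that time.

```python
def run_for(query, seconds):
    """Let a streaming query run for up to `seconds`, then stop it if still active.

    Returns True if the query terminated on its own within the timeout.
    If the query failed, awaitTermination raises instead of returning.
    """
    # Blocks for at most `seconds`; True means the query already terminated.
    finished = query.awaitTermination(seconds)
    if not finished and query.isActive:
        # Still running after the timeout: stop it explicitly.
        query.stop()
    return finished
```

For example, `q = spark.readStream.format("rate").load().writeStream.toTable("my_table", checkpointLocation=d)` followed by `run_for(q, 3)` behaves like the `time.sleep(3)` / `q.stop()` pair in the example below, but returns early if the query ends first.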
- Parameters
- tableName : str
the name of the table.
- format : str, optional
the format used to save.
- outputMode : str, optional
specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
append: only the new rows in the streaming DataFrame/Dataset will be written to the sink.
complete: all the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates.
update: only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn’t contain aggregations, it will be equivalent to append mode.
- partitionBy : str or list, optional
names of partitioning columns
- queryName : str, optional
unique name for the query
- **options : dict
All other string options. You may want to provide a checkpointLocation.
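The three output modes can be illustrated with a toy, pure-Python model; no Spark is involved, and the function `sink_writes` and its `aggregate` flag are hypothetical. With `aggregate=True` it keeps a running count per key, with `aggregate=False` rows pass through unchanged, and it returns what each trigger would write to the sink. Its two checks mirror Spark's restrictions that append mode on a streaming aggregation requires a watermark and that complete mode requires a streaming aggregation.

```python
from collections import Counter


def sink_writes(batches, mode, aggregate=True):
    """Toy model: return, per trigger, the rows written to the sink."""
    if mode not in ("append", "complete", "update"):
        raise ValueError(f"unknown output mode: {mode!r}")
    if aggregate and mode == "append":
        # Spark rejects this without a watermark; the toy has no event time.
        raise ValueError("append mode needs a watermark for aggregations")
    if not aggregate and mode == "complete":
        # Spark rejects complete mode when there is no streaming aggregation.
        raise ValueError("complete mode requires an aggregation")

    counts = Counter()
    writes = []
    for batch in batches:
        if not aggregate:
            # append and update are equivalent here: only new rows are written.
            writes.append(list(batch))
            continue
        before = dict(counts)
        counts.update(batch)
        if mode == "complete":
            # The whole result table is rewritten on every trigger.
            writes.append(sorted(counts.items()))
        else:
            # update: only keys whose count changed in this trigger.
            writes.append(sorted((k, v) for k, v in counts.items() if before.get(k) != v))
    return writes
```

With batches `[["a", "b"], ["a"], ["c"]]`, complete mode writes the full table each time, while update mode writes `[("a", 2)]` on the second trigger because only `a` changed.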
Notes
This API is evolving.
For a v1 table, the partitioning columns provided by partitionBy are respected whether or not the table exists. A new table will be created if the table does not exist.
For a v2 table, partitionBy is ignored if the table already exists; it is respected only if the v2 table does not exist. In addition, a v2 table created by this API lacks some functionality (e.g., customized properties, options, and serde info). If you need them, create the v2 table manually before starting the query to avoid creating a table with incomplete information.
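The partitioning rules in these notes can be restated as a small decision function. This is a hypothetical, pure-Python restatement of the documented behavior, not Spark's implementation; the function name and the `table_kind` and `existing` parameters are illustrative only.

```python
def effective_partition_columns(table_kind, table_exists, requested, existing=None):
    """Which partition columns end up in effect, per the notes above (toy model)."""
    # partitionBy accepts a single column name or a list of names.
    if isinstance(requested, str):
        requested = [requested]
    requested = list(requested or [])
    if table_kind == "v1":
        # v1: partitionBy is respected whether or not the table exists.
        return requested
    if table_kind == "v2":
        # v2: an existing table keeps its own partitioning; partitionBy is ignored.
        return list(existing or []) if table_exists else requested
    raise ValueError(f"unknown table kind: {table_kind!r}")
```

For instance, `effective_partition_columns("v2", True, "date", existing=["region"])` returns `["region"]`, which is why pre-creating a v2 table fixes its partitioning regardless of what the streaming query passes.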
Examples
Save a data stream to a table.
>>> import tempfile
>>> import time
>>> _ = spark.sql("DROP TABLE IF EXISTS my_table2")
>>> with tempfile.TemporaryDirectory(prefix="toTable") as d:
...     # Create a table with Rate source.
...     q = spark.readStream.format("rate").option(
...         "rowsPerSecond", 10).load().writeStream.toTable(
...             "my_table2",
...             queryName='that_query',
...             outputMode="append",
...             format='parquet',
...             checkpointLocation=d)
...     time.sleep(3)
...     q.stop()
...     spark.read.table("my_table2").show()
...     _ = spark.sql("DROP TABLE my_table2")
+...---------+-----+
|...timestamp|value|
+...---------+-----+
...