API Reference¶
InfluxDBClient¶
-
class
influxdb_client.
InfluxDBClient
(url, token: str = None, debug=None, timeout=10000, enable_gzip=False, org: str = None, default_tags: dict = None, **kwargs)[source]¶ InfluxDBClient is client for InfluxDB v2.
Initialize defaults.
Parameters: - url – InfluxDB server API url (ex. http://localhost:8086).
- token –
token
to authenticate to the InfluxDB API - debug – enable verbose logging of http requests
- timeout – HTTP client timeout setting for a request specified in milliseconds. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts.
- enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- org – organization name (used as a default in Query, Write and Delete API)
Key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
Key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
Key str cert_file: Path to the certificate that will be used for mTLS authentication.
Key str cert_key_file: Path to the file contains private key for mTLS certificate.
Key str cert_key_password: String or function which returns password for decrypting the mTLS private key.
Key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
Key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128)
Key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
Key int connection_pool_maxsize: Number of connections to save that can be reused by urllib3. Defaults to “multiprocessing.cpu_count() * 5”.
Key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
Key bool auth_basic: Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)
Key str username: username
to authenticate via username and password credentials to the InfluxDB 2.xKey str password: password
to authenticate via username and password credentials to the InfluxDB 2.xKey list[str] profilers: list of enabled Flux profilers
Create the Authorizations API instance.
Returns: authorizations api
-
buckets_api
() → influxdb_client.client.bucket_api.BucketsApi[source]¶ Create the Bucket API instance.
Returns: buckets api
-
build
() → str[source]¶ Return the build type of the connected InfluxDB Server.
Returns: The type of InfluxDB build.
-
delete_api
() → influxdb_client.client.delete_api.DeleteApi[source]¶ Get the delete metrics API instance.
Returns: delete api
-
classmethod
from_config_file
(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]¶ Configure client via configuration file. The configuration has to be under ‘influx’ section.
Parameters: - config_file – Path to configuration file
- debug – Enable verbose logging of http requests
- enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
Key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
Key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
Key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- The supported formats:
- Configuration options:
- url
- org
- token
- timeout,
- verify_ssl
- ssl_ca_cert
- cert_file
- cert_key_file
- cert_key_password
- connection_pool_maxsize
- auth_basic
- profilers
- proxy
config.ini example:
[influx2] url=http://localhost:8086 org=my-org token=my-token timeout=6000 connection_pool_maxsize=25 auth_basic=false profilers=query,operator proxy=http:proxy.domain.org:8080 [tags] id = 132-987-655 customer = California Miner data_center = ${env.data_center}
config.toml example:
[influx2] url = "http://localhost:8086" token = "my-token" org = "my-org" timeout = 6000 connection_pool_maxsize = 25 auth_basic = false profilers="query, operator" proxy = "http://proxy.domain.org:8080" [tags] id = "132-987-655" customer = "California Miner" data_center = "${env.data_center}"
config.json example:
{ "url": "http://localhost:8086", "token": "my-token", "org": "my-org", "active": true, "timeout": 6000, "connection_pool_maxsize": 55, "auth_basic": false, "profilers": "query, operator", "tags": { "id": "132-987-655", "customer": "California Miner", "data_center": "${env.data_center}" } }
-
classmethod
from_env_properties
(debug=None, enable_gzip=False, **kwargs)[source]¶ Configure client via environment properties.
Parameters: - debug – Enable verbose logging of http requests
- enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
Key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128)
Key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
Key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
Key ssl.SSLContext ssl_context: Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Supported environment properties:
- INFLUXDB_V2_URL
- INFLUXDB_V2_ORG
- INFLUXDB_V2_TOKEN
- INFLUXDB_V2_TIMEOUT
- INFLUXDB_V2_VERIFY_SSL
- INFLUXDB_V2_SSL_CA_CERT
- INFLUXDB_V2_CERT_FILE
- INFLUXDB_V2_CERT_KEY_FILE
- INFLUXDB_V2_CERT_KEY_PASSWORD
- INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
- INFLUXDB_V2_AUTH_BASIC
- INFLUXDB_V2_PROFILERS
- INFLUXDB_V2_TAG
-
health
() → influxdb_client.domain.health_check.HealthCheck[source]¶ Get the health of an instance.
Returns: HealthCheck
-
invokable_scripts_api
() → influxdb_client.client.invokable_scripts_api.InvokableScriptsApi[source]¶ Create an InvokableScripts API instance.
Returns: InvokableScripts API instance
-
labels_api
() → influxdb_client.client.labels_api.LabelsApi[source]¶ Create the Labels API instance.
Returns: labels api
-
organizations_api
() → influxdb_client.client.organizations_api.OrganizationsApi[source]¶ Create the Organizations API instance.
Returns: organizations api
-
query_api
(query_options: influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) → influxdb_client.client.query_api.QueryApi[source]¶ Create an Query API instance.
Parameters: query_options – optional query api configuration Returns: Query api instance
-
ready
() → influxdb_client.domain.ready.Ready[source]¶ Get The readiness of the InfluxDB 2.0.
Returns: Ready
-
tasks_api
() → influxdb_client.client.tasks_api.TasksApi[source]¶ Create the Tasks API instance.
Returns: tasks api
-
users_api
() → influxdb_client.client.users_api.UsersApi[source]¶ Create the Users API instance.
Returns: users api
-
version
() → str[source]¶ Return the version of the connected InfluxDB Server.
Returns: The version of InfluxDB.
-
write_api
(write_options=<influxdb_client.client.write_api.WriteOptions object>, point_settings=<influxdb_client.client.write_api.PointSettings object>, **kwargs) → influxdb_client.client.write_api.WriteApi[source]¶ Create Write API instance.
- Example:
from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS # Initialize SYNCHRONOUS instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS)
If you would like to use a background batching, you have to configure client like this:
from influxdb_client import InfluxDBClient # Initialize background batching instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: with client.write_api() as write_api: pass
There is also possibility to use callbacks to notify about state of background batches:
from influxdb_client import InfluxDBClient from influxdb_client.client.exceptions import InfluxDBError class BatchingCallback(object): def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: callback = BatchingCallback() with client.write_api(success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry) as write_api: pass
Parameters: - write_options – Write API configuration
- point_settings – settings to store default tags
Key success_callback: The callable
callback
to run after successfully writen a batch.- The callable must accept two arguments:
- Tuple:
(bucket, organization, precision)
- str: written data
- Tuple:
[batching mode]
Key error_callback: The callable
callback
to run after unsuccessfully writen a batch.- The callable must accept three arguments:
- Tuple:
(bucket, organization, precision)
- str: written data
- Exception: an occurred error
- Tuple:
[batching mode]
Key retry_callback: The callable
callback
to run after retryable error occurred.- The callable must accept three arguments:
- Tuple:
(bucket, organization, precision)
- str: written data
- Exception: an retryable error
- Tuple:
[batching mode]
Returns: write api instance
QueryApi¶
-
class
influxdb_client.
QueryApi
(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]¶ Implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
Parameters: influxdb_client – influxdb client -
query
(query: str, org=None, params: dict = None) → influxdb_client.client.flux_table.TableList[source]¶ Execute synchronous Flux query and return result as a
FluxTable
list.Parameters: - query – the Flux query
- Organization org (str,) – specifies the organization for executing the query;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used. - params – bind parameters
Returns: Return type: Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output)
[ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ]
Serialization the query results to JSON via
to_json()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to JSON output = tables.to_json(indent=5) print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
-
query_csv
(query: str, org=None, dialect: influxdb_client.domain.dialect.Dialect = {'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: dict = None) → influxdb_client.client.flux_table.CSVIterator[source]¶ Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.
Parameters: - query – a Flux query
- Organization org (str,) – specifies the organization for executing the query;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used. - dialect – csv dialect format
- params – bind parameters
Returns: Iterator[List[str]]
wrapped intoCSVIterator
Return type: Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using CSV iterator csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = csv_iterator.to_values() print(output)
[ ['#datatype', 'string', 'long', 'dateTime:RFC3339', 'dateTime:RFC3339', 'dateTime:RFC3339', 'double', 'string', 'string', 'string'] ['#group', 'false', 'false', 'true', 'true', 'false', 'false', 'true', 'true', 'true'] ['#default', '_result', '', '', '', '', '', '', '', ''] ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location'] ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'] ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'] ... ]
If you would like to turn off Annotated CSV header’s you can use following code:
from influxdb_client import InfluxDBClient, Dialect with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using CSV iterator csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)', dialect=Dialect(header=False, annotations=[])) for csv_line in csv_iterator: print(csv_line)
[ ['', '_result', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'] ['', '_result', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'] ... ]
-
query_data_frame
(query: str, org=None, data_frame_index: List[str] = None, params: dict = None)[source]¶ Execute synchronous Flux query and return Pandas DataFrame.
Note
If the
query
returns tables with differing schemas than the client generates aDataFrame
for each of them.Parameters: - query – the Flux query
- Organization org (str,) – specifies the organization for executing the query;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used. - data_frame_index – the list of columns that are used as DataFrame index
- params – bind parameters
Returns: DataFrame
orList[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-
query_data_frame_stream
(query: str, org=None, data_frame_index: List[str] = None, params: dict = None)[source]¶ Execute synchronous Flux query and return stream of Pandas DataFrame as a
Generator[DataFrame]
.Note
If the
query
returns tables with differing schemas than the client generates aDataFrame
for each of them.Parameters: - query – the Flux query
- Organization org (str,) – specifies the organization for executing the query;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used. - data_frame_index – the list of columns that are used as DataFrame index
- params – bind parameters
Returns: Generator[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-
query_raw
(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: dict = None)[source]¶ Execute synchronous Flux query and return result as raw unprocessed result as a str.
Parameters: - query – a Flux query
- Organization org (str,) – specifies the organization for executing the query;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used. - dialect – csv dialect format
- params – bind parameters
Returns: str
-
query_stream
(query: str, org=None, params: dict = None) → Generator[influxdb_client.client.flux_table.FluxRecord, Any, None][source]¶ Execute synchronous Flux query and return stream of FluxRecord as a Generator[‘FluxRecord’].
Parameters: - query – the Flux query
- Organization org (str,) – specifies the organization for executing the query;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used. - params – bind parameters
Returns: Generator[‘FluxRecord’]
-
-
class
influxdb_client.client.flux_table.
FluxTable
[source]¶ A table is set of records with a common set of columns and a group key.
The table can be serialized into JSON by:
import json from influxdb_client.client.flux_table import FluxStructureEncoder output = json.dumps(tables, cls=FluxStructureEncoder, indent=2) print(output)
Initialize defaults.
-
class
influxdb_client.client.flux_table.
FluxRecord
(table, values=None)[source]¶ A record is a tuple of named values and is represented using an object type.
Initialize defaults.
-
class
influxdb_client.client.flux_table.
TableList
[source]¶ FluxTable
list with additionally functional to better handle of query result.-
to_json
(columns: List[str] = None, **kwargs) → str[source]¶ Serialize query results to a JSON formatted
str
.Parameters: columns – if not None
then only specified columns are presented in resultsReturns: str
The query results is flattened to array:
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
The JSON format could be configured via
**kwargs
arguments:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to JSON output = tables.to_json(indent=5) print(output)
For all available options see - json.dump.
-
to_values
(columns: List[str] = None) → List[List[object]][source]¶ Serialize query results to a flattened list of values.
Parameters: columns – if not None
then only specified columns are presented in resultsReturns: list
of valuesOutput example:
[ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ]
Configure required columns:
from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output)
-
WriteApi¶
-
class
influxdb_client.
WriteApi
(influxdb_client, write_options: influxdb_client.client.write_api.WriteOptions = <influxdb_client.client.write_api.WriteOptions object>, point_settings: influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>, **kwargs)[source]¶ Implementation for ‘/api/v2/write’ endpoint.
- Example:
from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS # Initialize SYNCHRONOUS instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS)
Initialize defaults.
Parameters: - influxdb_client – with default settings (organization)
- write_options – write api configuration
- point_settings – settings to store default tags.
Key success_callback: The callable
callback
to run after successfully writen a batch.- The callable must accept two arguments:
- Tuple:
(bucket, organization, precision)
- str: written data
- Tuple:
[batching mode]
Key error_callback: The callable
callback
to run after unsuccessfully writen a batch.- The callable must accept three arguments:
- Tuple:
(bucket, organization, precision)
- str: written data
- Exception: an occurred error
- Tuple:
[batching mode]
Key retry_callback: The callable
callback
to run after retryable error occurred.- The callable must accept three arguments:
- Tuple:
(bucket, organization, precision)
- str: written data
- Exception: an retryable error
- Tuple:
[batching mode]
-
write
(bucket: str, org: str = None, record: Union[str, Iterable[str], influxdb_client.client.write.point.Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], reactivex.observable.observable.Observable, NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]] = None, write_precision: influxdb_client.domain.write_precision.WritePrecision = 'ns', **kwargs) → Any[source]¶ Write time-series data into InfluxDB.
Parameters: - bucket (str) – specifies the destination bucket for writes (required)
- Organization org (str,) – specifies the destination organization for writes;
take the ID, Name or Organization.
If not specified the default value from
InfluxDBClient.org
is used. - write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write.
- record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or RxPY Observable to write
Key data_frame_measurement_name: name of measurement for writing Pandas DataFrame -
DataFrame
Key data_frame_tag_columns: list of DataFrame columns which are tags, rest columns will be fields -
DataFrame
Key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a
str
value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime -DataFrame
Key data_frame_timestamp_timezone: name of the timezone which is used for timestamp column -
DataFrame
Key record_measurement_key: key of record with specified measurement -
dictionary
,NamedTuple
,dataclass
Key record_measurement_name: static measurement name -
dictionary
,NamedTuple
,dataclass
Key record_time_key: key of record with specified timestamp -
dictionary
,NamedTuple
,dataclass
Key record_tag_keys: list of record keys to use as a tag -
dictionary
,NamedTuple
,dataclass
Key record_field_keys: list of record keys to use as a field -
dictionary
,NamedTuple
,dataclass
- Example:
# Record as Line Protocol write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1") # Record as Dictionary dictionary = { "measurement": "h2o_feet", "tags": {"location": "us-west"}, "fields": {"level": 125}, "time": 1 } write_api.write("my-bucket", "my-org", dictionary) # Record as Point from influxdb_client import Point point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1) write_api.write("my-bucket", "my-org", point)
- DataFrame:
If the
data_frame_timestamp_column
is not specified the index of Pandas DataFrame is used as atimestamp
for written data. The index can be PeriodIndex or its must be transformable todatetime
by pandas.to_datetime.If you would like to transform a column to
PeriodIndex
, you can use something like:import pandas as pd # DataFrame data_frame = ... # Set column as Index data_frame.set_index('column_name', inplace=True) # Transform index to PeriodIndex data_frame.index = pd.to_datetime(data_frame.index, unit='s')
-
class
influxdb_client.client.write.point.
Point
(measurement_name)[source]¶ Point defines the values that will be written to the database.
Ref: https://docs.influxdata.com/influxdb/latest/reference/key-concepts/data-elements/#point
Initialize defaults.
-
static
from_dict
(dictionary: dict, write_precision: influxdb_client.domain.write_precision.WritePrecision = 'ns', **kwargs)[source]¶ Initialize point from ‘dict’ structure.
- The expected dict structure is:
- measurement
- tags
- fields
- time
- Example:
# Use default dictionary structure dict_structure = { "measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, "fields": {"water_level": 1.0}, "time": 1 } point = Point.from_dict(dict_structure, WritePrecision.NS)
- Example:
# Use custom dictionary structure dictionary = { "name": "sensor_pt859", "location": "warehouse_125", "version": "2021.06.05.5874", "pressure": 125, "temperature": 10, "created": 1632208639, } point = Point.from_dict(dictionary, write_precision=WritePrecision.S, record_measurement_key="name", record_time_key="created", record_tag_keys=["location", "version"], record_field_keys=["pressure", "temperature"])
Parameters: - dictionary – dictionary for serialize into data Point
- write_precision – sets the precision for the supplied time values
Key record_measurement_key: key of dictionary with specified measurement
Key record_measurement_name: static measurement name for data Point
Key record_time_key: key of dictionary with specified timestamp
Key record_tag_keys: list of dictionary keys to use as a tag
Key record_field_keys: list of dictionary keys to use as a field
Returns: new data point
-
time
(time, write_precision='ns')[source]¶ Specify timestamp for DataPoint with declared precision.
If time doesn’t have specified timezone we assume that timezone is UTC.
- Examples::
- Point.measurement(“h2o”).field(“val”, 1).time(“2009-11-10T23:00:00.123456Z”) Point.measurement(“h2o”).field(“val”, 1).time(1257894000123456000) Point.measurement(“h2o”).field(“val”, 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456)) Point.measurement(“h2o”).field(“val”, 1).time(1257894000123456000, write_precision=WritePrecision.NS)
Parameters: - time – the timestamp for your data
- write_precision – sets the precision for the supplied time values
Returns: this point
-
to_line_protocol
(precision=None)[source]¶ Create LineProtocol.
param precision: required precision of LineProtocol. If it’s not set then use the precision from Point
.
-
write_precision
¶ Get precision.
-
static
-
class
influxdb_client.domain.write_precision.
WritePrecision
[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
WritePrecision - a model defined in OpenAPI.
-
NS
= 'ns'¶ - Attributes:
- openapi_types (dict): The key is attribute name
- and the value is attribute type.
- attribute_map (dict): The key is attribute name
- and the value is json key in definition.
-
BucketsApi¶
-
class
influxdb_client.
BucketsApi
(influxdb_client)[source]¶ Implementation for ‘/api/v2/buckets’ endpoint.
Initialize defaults.
-
create_bucket
(bucket=None, bucket_name=None, org_id=None, retention_rules=None, description=None, org=None) → influxdb_client.domain.bucket.Bucket[source]¶ Create a bucket.
Parameters: - bucket (Bucket|PostBucketRequest) – bucket to create
- bucket_name – bucket name
- description – bucket description
- org_id – org_id
- bucket_name – bucket name
- retention_rules – retention rules array or single BucketRetentionRules
- Organization org (str,) – specifies the organization for create the bucket;
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.
Returns: Bucket If the method is called asynchronously, returns the request thread.
-
delete_bucket
(bucket)[source]¶ Delete a bucket.
Parameters: bucket – bucket id or Bucket Returns: Bucket
-
find_bucket_by_name
(bucket_name)[source]¶ Find bucket by name.
Parameters: bucket_name – bucket name Returns: Bucket
-
find_buckets
(**kwargs)[source]¶ List buckets.
Key int offset: Offset for pagination Key int limit: Limit for pagination Key str after: The last resource ID from which to seek from (but not including). This is to be used instead of offset. Key str org: The organization name. Key str org_id: The organization ID. Key str name: Only returns buckets with a specific name. Returns: Buckets
-
-
class
influxdb_client.domain.
Bucket
(links=None, id=None, type='user', name=None, description=None, org_id=None, rp=None, schema_type=None, created_at=None, updated_at=None, retention_rules=None, labels=None)[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Bucket - a model defined in OpenAPI.
-
created_at
¶ Get the created_at of this Bucket.
Returns: The created_at of this Bucket. Return type: datetime
-
description
¶ Get the description of this Bucket.
Returns: The description of this Bucket. Return type: str
-
links
¶ Get the links of this Bucket.
Returns: The links of this Bucket. Return type: BucketLinks
-
retention_rules
¶ Get the retention_rules of this Bucket.
Rules to expire or retain data. No rules means data never expires.
Returns: The retention_rules of this Bucket. Return type: list[BucketRetentionRules]
-
schema_type
¶ Get the schema_type of this Bucket.
Returns: The schema_type of this Bucket. Return type: SchemaType
-
updated_at
¶ Get the updated_at of this Bucket.
Returns: The updated_at of this Bucket. Return type: datetime
-
LabelsApi¶
-
class
influxdb_client.
LabelsApi
(influxdb_client)[source]¶ Implementation for ‘/api/v2/labels’ endpoint.
Initialize defaults.
-
clone_label
(cloned_name: str, label: influxdb_client.domain.label.Label) → influxdb_client.domain.label.Label[source]¶ Create the new instance of the label as a copy existing label.
Parameters: - cloned_name – new label name
- label – existing label
Returns: clonned Label
-
create_label
(name: str, org_id: str, properties: Dict[str, str] = None) → influxdb_client.domain.label.Label[source]¶ Create a new label.
Parameters: - name – label name
- org_id – organization id
- properties – optional label properties
Returns: created label
-
delete_label
(label: Union[str, influxdb_client.domain.label.Label])[source]¶ Delete the label.
Parameters: label – label id or Label
-
find_label_by_id
(label_id: str)[source]¶ Retrieve the label by id.
Parameters: label_id – Returns: Label
-
find_label_by_org
(org_id) → List[influxdb_client.domain.label.Label][source]¶ Get the list of all labels for given organization.
Parameters: org_id – organization id Returns: list of labels
-
OrganizationsApi¶
-
class
influxdb_client.
OrganizationsApi
(influxdb_client)[source]¶ Implementation for ‘/api/v2/orgs’ endpoint.
Initialize defaults.
-
create_organization
(name: str = None, organization: influxdb_client.domain.organization.Organization = None) → influxdb_client.domain.organization.Organization[source]¶ Create an organization.
-
find_organizations
(**kwargs)[source]¶ List all organizations.
Key int offset: Offset for pagination Key int limit: Limit for pagination Key bool descending: Key str org: Filter organizations to a specific organization name. Key str org_id: Filter organizations to a specific organization ID. Key str user_id: Filter organizations to a specific user ID.
-
-
class
influxdb_client.domain.
Organization
(links=None, id=None, name=None, description=None, created_at=None, updated_at=None, status='active')[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Organization - a model defined in OpenAPI.
-
created_at
¶ Get the created_at of this Organization.
Returns: The created_at of this Organization. Return type: datetime
-
description
¶ Get the description of this Organization.
Returns: The description of this Organization. Return type: str
-
links
¶ Get the links of this Organization.
Returns: The links of this Organization. Return type: OrganizationLinks
-
status
¶ Get the status of this Organization.
If inactive the organization is inactive.
Returns: The status of this Organization. Return type: str
-
updated_at
¶ Get the updated_at of this Organization.
Returns: The updated_at of this Organization. Return type: datetime
-
UsersApi¶
-
class
influxdb_client.
UsersApi
(influxdb_client)[source]¶ Implementation for ‘/api/v2/users’ endpoint.
Initialize defaults.
-
delete_user
(user: Union[str, influxdb_client.domain.user.User, influxdb_client.domain.user_response.UserResponse]) → None[source]¶ Delete a user.
Parameters: user – user id or User Returns: None
-
find_users
(**kwargs) → influxdb_client.domain.users.Users[source]¶ List all users.
Key int offset: The offset for pagination. The number of records to skip. Key int limit: Limits the number of records returned. Default is 20. Key str after: The last resource ID from which to seek from (but not including). This is to be used instead of offset. Key str name: The user name. Key str id: The user ID. Returns: Buckets
-
-
class
influxdb_client.domain.
User
(id=None, oauth_id=None, name=None, status='active')[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
User - a model defined in OpenAPI.
TasksApi¶
-
class
influxdb_client.
TasksApi
(influxdb_client)[source]¶ Implementation for ‘/api/v2/tasks’ endpoint.
Initialize defaults.
-
add_label
(label_id: str, task_id: str) → influxdb_client.domain.label_response.LabelResponse[source]¶ Add a label to a task.
-
cancel_run
(task_id: str, run_id: str)[source]¶ Cancel a currently running run.
Parameters: - task_id –
- run_id –
-
clone_task
(task: influxdb_client.domain.task.Task) → influxdb_client.domain.task.Task[source]¶ Clone a task.
-
create_task
(task: influxdb_client.domain.task.Task = None, task_create_request: influxdb_client.domain.task_create_request.TaskCreateRequest = None) → influxdb_client.domain.task.Task[source]¶ Create a new task.
-
create_task_cron
(name: str, flux: str, cron: str, org_id: str) → influxdb_client.domain.task.Task[source]¶ Create a new task with cron repetition schedule.
-
create_task_every
(name, flux, every, organization) → influxdb_client.domain.task.Task[source]¶ Create a new task with every repetition schedule.
-
find_tasks
(**kwargs)[source]¶ List all tasks.
Key str name: only returns tasks with the specified name Key str after: returns tasks after specified ID Key str user: filter tasks to a specific user ID Key str org: filter tasks to a specific organization name Key str org_id: filter tasks to a specific organization ID Key int limit: the number of tasks to return Returns: Tasks
-
get_logs
(task_id: str) → List[influxdb_client.domain.log_event.LogEvent][source]¶ Retrieve all logs for a task.
Parameters: task_id – task id
-
get_run
(task_id: str, run_id: str) → influxdb_client.domain.run.Run[source]¶ Get run record for specific task and run id.
Parameters: - task_id – task id
- run_id – run id
Returns: Run for specified task and run id
-
get_run_logs
(task_id: str, run_id: str) → List[influxdb_client.domain.log_event.LogEvent][source]¶ Retrieve all logs for a run.
-
get_runs
(task_id, **kwargs) → List[influxdb_client.domain.run.Run][source]¶ Retrieve list of run records for a task.
Parameters: task_id – task id Key str after: returns runs after specified ID Key int limit: the number of runs to return Key datetime after_time: filter runs to those scheduled after this time, RFC3339 Key datetime before_time: filter runs to those scheduled before this time, RFC3339
-
retry_run
(task_id: str, run_id: str)[source]¶ Retry a task run.
Parameters: - task_id – task id
- run_id – run id
-
run_manually
(task_id: str, scheduled_for: <module 'datetime' from '/home/docs/.pyenv/versions/3.7.9/lib/python3.7/datetime.py'> = None)[source]¶ Manually start a run of the task now overriding the current schedule.
Parameters: - task_id –
- scheduled_for – planned execution
-
-
class
influxdb_client.domain.
Task
(id=None, type=None, org_id=None, org=None, name=None, owner_id=None, description=None, status=None, labels=None, authorization_id=None, flux=None, every=None, cron=None, offset=None, latest_completed=None, last_run_status=None, last_run_error=None, created_at=None, updated_at=None, links=None)[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Task - a model defined in OpenAPI.
Get the authorization_id of this Task.
The ID of the authorization used when the task communicates with the query engine.
Returns: The authorization_id of this Task. Return type: str
-
created_at
¶ Get the created_at of this Task.
Returns: The created_at of this Task. Return type: datetime
-
cron
¶ Get the cron of this Task.
[Cron expression](https://en.wikipedia.org/wiki/Cron#Overview) that defines the schedule on which the task runs. InfluxDB bases cron runs on the system time.
Returns: The cron of this Task. Return type: str
-
description
¶ Get the description of this Task.
The description of the task.
Returns: The description of this Task. Return type: str
-
every
¶ Get the every of this Task.
An interval ([duration literal](https://docs.influxdata.com/flux/v0.x/spec/lexical-elements/#duration-literals))) at which the task runs. every also determines when the task first runs, depending on the specified time.
Returns: The every of this Task. Return type: str
-
flux
¶ Get the flux of this Task.
The Flux script to run for this task.
Returns: The flux of this Task. Return type: str
-
last_run_error
¶ Get the last_run_error of this Task.
Returns: The last_run_error of this Task. Return type: str
-
last_run_status
¶ Get the last_run_status of this Task.
Returns: The last_run_status of this Task. Return type: str
-
latest_completed
¶ Get the latest_completed of this Task.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/flux/v0.x/data-types/basic/time/#time-syntax)) of the latest scheduled and completed run.
Returns: The latest_completed of this Task. Return type: datetime
-
links
¶ Get the links of this Task.
Returns: The links of this Task. Return type: TaskLinks
-
name
¶ Get the name of this Task.
The name of the task.
Returns: The name of this Task. Return type: str
-
offset
¶ Get the offset of this Task.
A [duration](https://docs.influxdata.com/flux/v0.x/spec/lexical-elements/#duration-literals) to delay execution of the task after the scheduled time has elapsed. 0 removes the offset.
Returns: The offset of this Task. Return type: str
-
org
¶ Get the org of this Task.
The name of the organization that owns the task.
Returns: The org of this Task. Return type: str
-
org_id
¶ Get the org_id of this Task.
The ID of the organization that owns the task.
Returns: The org_id of this Task. Return type: str
-
owner_id
¶ Get the owner_id of this Task.
The ID of the user who owns this Task.
Returns: The owner_id of this Task. Return type: str
-
status
¶ Get the status of this Task.
Returns: The status of this Task. Return type: TaskStatusType
-
type
¶ Get the type of this Task.
The type of the task, useful for filtering a task list.
Returns: The type of this Task. Return type: str
-
updated_at
¶ Get the updated_at of this Task.
Returns: The updated_at of this Task. Return type: datetime
InvokableScriptsApi¶
-
class
influxdb_client.
InvokableScriptsApi
(influxdb_client)[source]¶ Use API invokable scripts to create custom InfluxDB API endpoints that query, process, and shape data.
Initialize defaults.
-
create_script
(create_request: influxdb_client.domain.script_create_request.ScriptCreateRequest) → influxdb_client.domain.script.Script[source]¶ Create a script.
Parameters: create_request (ScriptCreateRequest) – The script to create. (required) Returns: The created script.
-
delete_script
(script_id: str) → None[source]¶ Delete a script.
Parameters: script_id (str) – The ID of the script to delete. (required) Returns: None
-
find_scripts
(**kwargs)[source]¶ List scripts.
Key int limit: The number of scripts to return. Key int offset: The offset for pagination. Returns: List of scripts. Return type: list[Script]
-
invoke_script
(script_id: str, params: dict = None) → influxdb_client.client.flux_table.TableList[source]¶ Invoke synchronously a script and return result as a TableList.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Parameters: - script_id (str) – The ID of the script to invoke. (required)
- params – bind parameters
Returns: Return type: Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.invokable_scripts_api().invoke_script(script_id="script-id") # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output)
[ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ]
Serialization the query results to JSON via
to_json()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.invokable_scripts_api().invoke_script(script_id="script-id") # Serialize to JSON output = tables.to_json(indent=5) print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
-
invoke_script_csv
(script_id: str, params: dict = None) → influxdb_client.client.flux_table.CSVIterator[source]¶ Invoke synchronously a script and return result as a CSV iterator. Each iteration returns a row of the CSV file.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Parameters: - script_id (str) – The ID of the script to invoke. (required)
- params – bind parameters
Returns: Iterator[List[str]]
wrapped intoCSVIterator
Return type: Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using CSV iterator csv_iterator = client.invokable_scripts_api().invoke_script_csv(script_id="script-id") # Serialize to values output = csv_iterator.to_values() print(output)
[ ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location'] ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'] ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'] ... ]
-
invoke_script_data_frame
(script_id: str, params: dict = None, data_frame_index: List[str] = None)[source]¶ Invoke synchronously a script and return Pandas DataFrame.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Note
If the
script
returns tables with differing schemas than the client generates aDataFrame
for each of them.Parameters: Returns: DataFrame
orList[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-
invoke_script_data_frame_stream
(script_id: str, params: dict = None, data_frame_index: List[str] = None)[source]¶ Invoke synchronously a script and return stream of Pandas DataFrame as a Generator[‘pd.DataFrame’].
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Note
If the
script
returns tables with differing schemas than the client generates aDataFrame
for each of them.Parameters: Returns: Generator[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
-
invoke_script_raw
(script_id: str, params: dict = None) → Iterator[List[str]][source]¶ Invoke synchronously a script and return result as raw unprocessed result as a str.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Parameters: - script_id (str) – The ID of the script to invoke. (required)
- params – bind parameters
Returns: Result as a str.
-
invoke_script_stream
(script_id: str, params: dict = None) → Generator[influxdb_client.client.flux_table.FluxRecord, Any, None][source]¶ Invoke synchronously a script and return result as a Generator[‘FluxRecord’].
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Parameters: - script_id (str) – The ID of the script to invoke. (required)
- params – bind parameters
Returns: Stream of FluxRecord.
Return type: Generator[‘FluxRecord’]
-
update_script
(script_id: str, update_request: influxdb_client.domain.script_update_request.ScriptUpdateRequest) → influxdb_client.domain.script.Script[source]¶ Update a script.
Parameters: - script_id (str) – The ID of the script to update. (required)
- update_request (ScriptUpdateRequest) – Script updates to apply (required)
Returns: The updated.
-
-
class
influxdb_client.domain.
Script
(id=None, name=None, description=None, org_id=None, script=None, language=None, url=None, created_at=None, updated_at=None)[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Script - a model defined in OpenAPI.
-
created_at
¶ Get the created_at of this Script.
Returns: The created_at of this Script. Return type: datetime
-
description
¶ Get the description of this Script.
Returns: The description of this Script. Return type: str
-
language
¶ Get the language of this Script.
Returns: The language of this Script. Return type: ScriptLanguage
-
script
¶ Get the script of this Script.
script to be executed
Returns: The script of this Script. Return type: str
-
updated_at
¶ Get the updated_at of this Script.
Returns: The updated_at of this Script. Return type: datetime
-
-
class
influxdb_client.domain.
ScriptCreateRequest
(name=None, description=None, script=None, language=None)[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
ScriptCreateRequest - a model defined in OpenAPI.
-
description
¶ Get the description of this ScriptCreateRequest.
Returns: The description of this ScriptCreateRequest. Return type: str
-
language
¶ Get the language of this ScriptCreateRequest.
Returns: The language of this ScriptCreateRequest. Return type: ScriptLanguage
-
name
¶ Get the name of this ScriptCreateRequest.
The name of the script. The name must be unique within the organization.
Returns: The name of this ScriptCreateRequest. Return type: str
-
DeleteApi¶
-
class
influxdb_client.
DeleteApi
(influxdb_client)[source]¶ Implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
-
delete
(start: Union[str, datetime.datetime], stop: Union[str, datetime.datetime], predicate: str, bucket: str, org: Union[str, influxdb_client.domain.organization.Organization, None] = None) → None[source]¶ Delete Time series data from InfluxDB.
Parameters: - datetime.datetime start (str,) – start time
- datetime.datetime stop (str,) – stop time
- predicate (str) – predicate
- bucket (str) – bucket id or name from which data will be deleted
- Organization org (str,) – specifies the organization to delete data from.
Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.
Returns:
-
-
class
influxdb_client.domain.
DeletePredicateRequest
(start=None, stop=None, predicate=None)[source]¶ NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
DeletePredicateRequest - a model defined in OpenAPI.
-
predicate
¶ Get the predicate of this DeletePredicateRequest.
An expression in [delete predicate syntax](https://docs.influxdata.com/influxdb/v2.2/reference/syntax/delete-predicate/).
Returns: The predicate of this DeletePredicateRequest. Return type: str
-
start
¶ Get the start of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/flux/v0.x/data-types/basic/time/#time-syntax)).
Returns: The start of this DeletePredicateRequest. Return type: datetime
-
stop
¶ Get the stop of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/flux/v0.x/data-types/basic/time/#time-syntax)).
Returns: The stop of this DeletePredicateRequest. Return type: datetime
-
Helpers¶
-
class
influxdb_client.client.util.date_utils.
DateHelper
(timezone: datetime.tzinfo = datetime.timezone.utc)[source]¶ DateHelper to groups different implementations of date operations.
If you would like to serialize the query results to custom timezone, you can use following code:
from influxdb_client.client.util import date_utils from influxdb_client.client.util.date_utils import DateHelper import dateutil.parser from dateutil import tz def parse_date(date_string: str): return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT+2')) date_utils.date_helper = DateHelper() date_utils.date_helper.parse_date = parse_date
Initialize defaults.
Parameters: timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”. -
parse_date
(date_string: str)[source]¶ Parse string into Date or Timestamp.
Returns: Returns a datetime.datetime
object or compliant implementation likeclass 'pandas._libs.tslibs.timestamps.Timestamp
-
to_nanoseconds
(delta)[source]¶ Get number of nanoseconds in timedelta.
Solution comes from v1 client. Thx. https://github.com/influxdata/influxdb-python/pull/811
-
-
class
influxdb_client.client.util.multiprocessing_helper.
MultiprocessingWriter
(**kwargs)[source]¶ The Helper class to write data into InfluxDB in independent OS process.
- Example:
from influxdb_client import WriteOptions from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter def main(): writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org", write_options=WriteOptions(batch_size=100)) writer.start() for x in range(1, 1000): writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}") writer.__del__() if __name__ == '__main__': main()
- How to use with context_manager:
from influxdb_client import WriteOptions from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter def main(): with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org", write_options=WriteOptions(batch_size=100)) as writer: for x in range(1, 1000): writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}") if __name__ == '__main__': main()
- How to handle batch events:
from influxdb_client import WriteOptions from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter class BatchingCallback(object): def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") def main(): callback = BatchingCallback() with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org", success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry) as writer: for x in range(1, 1000): writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}") if __name__ == '__main__': main()
Initialize defaults.
For more information how to initialize the writer see the examples above.
Parameters: kwargs – arguments are passed into __init__
function ofInfluxDBClient
andwrite_api
.