Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions iotdb-core/ainode/iotdb/ainode/core/ai_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
#
import os
import shutil
import signal
import threading
from datetime import datetime
Expand Down Expand Up @@ -83,6 +84,42 @@ def _generate_system_properties(ainode_id: int):
}


def _backup_system_properties(system_properties_file: str):
if not os.path.exists(system_properties_file):
return None
backup_file = "{}.{}.bak".format(
system_properties_file, datetime.now().strftime("%Y%m%d%H%M%S%f")
)
shutil.copy2(system_properties_file, backup_file)
logger.warning("Backed up AINode system properties to", backup_file)
return backup_file


def _write_system_properties(system_properties_file: str, system_properties):
tmp_file = system_properties_file + ".tmp"
try:
with open(tmp_file, "w") as f:
f.write("#" + str(datetime.now()) + "\n")
for key, value in system_properties.items():
f.write(key + "=" + str(value) + "\n")
os.replace(tmp_file, system_properties_file)
except Exception:
if os.path.exists(tmp_file):
os.remove(tmp_file)
raise


def _verify_registered_ainode_id(system_properties_file: str):
ainode_id = AINodeDescriptor().get_config().get_ainode_id()
if ainode_id < 0:
_backup_system_properties(system_properties_file)
raise RuntimeError(
"AINode system.properties exists but does not contain a valid ainode_id. "
"Please restore the local system.properties or explicitly remove it before "
"registering a new AINode."
)


class AINode:
def __init__(self):
self._rpc_service = None
Expand Down Expand Up @@ -110,10 +147,7 @@ def start(self):
)
AINodeDescriptor().get_config().set_ainode_id(ainode_id)
system_properties = _generate_system_properties(ainode_id)
with open(system_properties_file, "w") as f:
f.write("#" + str(datetime.now()) + "\n")
for key, value in system_properties.items():
f.write(key + "=" + str(value) + "\n")
_write_system_properties(system_properties_file, system_properties)
except Exception as e:
logger.error(
"IoTDB-AINode failed to register to IoTDB cluster: {}".format(e)
Expand All @@ -122,13 +156,21 @@ def start(self):
else:
# If the system.properties file does exist, the AINode will just restart.
try:
_verify_registered_ainode_id(system_properties_file)
logger.info("IoTDB-AINode is restarting...")
ClientManager().borrow_config_node_client().node_restart(
AINodeDescriptor().get_config().get_cluster_name(),
_generate_configuration(),
_generate_version_info(),
)
except Exception as e:
if AINodeDescriptor().get_config().get_ainode_id() >= 0:
try:
_backup_system_properties(system_properties_file)
except Exception as backup_error:
logger.warning(
"Failed to back up AINode system properties:", backup_error
)
logger.error("IoTDB-AINode failed to restart: {}".format(e))
raise e

Expand Down
233 changes: 122 additions & 111 deletions iotdb-core/ainode/iotdb/ainode/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
import ast
import os
import re

Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(self):
self._build_info = AINODE_BUILD_INFO

# Cluster configuration
self._ainode_id = 0
self._ainode_id = -1
self._cluster_name = AINODE_CLUSTER_NAME
self._ain_target_config_node_list: TEndPoint = AINODE_TARGET_CONFIG_NODE_LIST
self._ain_rpc_address: str = AINODE_RPC_ADDRESS
Expand Down Expand Up @@ -294,14 +295,6 @@ def __init__(self):
logger.info("AINodeDescriptor is init successfully.")

def _load_config_from_file(self) -> None:
system_properties_file = os.path.join(
self._config.get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME
)
if os.path.exists(system_properties_file):
system_configs = load_properties(system_properties_file)
if "ainode_id" in system_configs:
self._config.set_ainode_id(int(system_configs["ainode_id"]))

git_file = os.path.join(AINODE_CONF_DIRECTORY_NAME, AINODE_CONF_GIT_FILE_NAME)
if os.path.exists(git_file):
git_configs = load_properties(git_file)
Expand All @@ -325,112 +318,130 @@ def _load_config_from_file(self) -> None:
conf_file
)
)
return

# noinspection PyBroadException
try:
file_configs = load_properties(conf_file)

config_keys = file_configs.keys()

if "ain_rpc_address" in config_keys:
self._config.set_ain_rpc_address(file_configs["ain_rpc_address"])

if "ain_rpc_port" in config_keys:
self._config.set_ain_rpc_port(int(file_configs["ain_rpc_port"]))

if "ain_inference_batch_interval_in_ms" in config_keys:
self._config.set_ain_inference_batch_interval_in_ms(
int(file_configs["ain_inference_batch_interval_in_ms"])
else:
# noinspection PyBroadException
try:
file_configs = load_properties(conf_file)

config_keys = file_configs.keys()

if "ain_rpc_address" in config_keys:
self._config.set_ain_rpc_address(file_configs["ain_rpc_address"])

if "ain_rpc_port" in config_keys:
self._config.set_ain_rpc_port(int(file_configs["ain_rpc_port"]))

if "ain_inference_batch_interval_in_ms" in config_keys:
self._config.set_ain_inference_batch_interval_in_ms(
int(file_configs["ain_inference_batch_interval_in_ms"])
)

if "ain_inference_model_mem_usage_map" in config_keys:
self._config.set_ain_inference_model_mem_usage_map(
ast.literal_eval(
file_configs["ain_inference_model_mem_usage_map"]
)
)

if "ain_inference_memory_usage_ratio" in config_keys:
self._config.set_ain_inference_memory_usage_ratio(
float(file_configs["ain_inference_memory_usage_ratio"])
)

if "ain_inference_extra_memory_ratio" in config_keys:
self._config.set_ain_inference_extra_memory_ratio(
float(file_configs["ain_inference_extra_memory_ratio"])
)

if "ain_models_dir" in config_keys:
self._config.set_ain_models_dir(file_configs["ain_models_dir"])

if "ain_models_builtin_dir" in config_keys:
self._config.set_ain_models_builtin_dir(
file_configs["ain_models_builtin_dir"]
)

if "ain_system_dir" in config_keys:
self._config.set_ain_system_dir(file_configs["ain_system_dir"])

if "ain_seed_config_node" in config_keys:
self._config.set_ain_target_config_node_list(
file_configs["ain_seed_config_node"]
)

if "cluster_name" in config_keys:
self._config.set_cluster_name(file_configs["cluster_name"])

if "ain_thrift_compression_enabled" in config_keys:
self._config.set_ain_thrift_compression_enabled(
int(file_configs["ain_thrift_compression_enabled"])
)

if "ain_cluster_ingress_ssl_enabled" in config_keys:
self._config.set_ain_cluster_ingress_ssl_enabled(
int(file_configs["ain_cluster_ingress_ssl_enabled"])
)

if "ain_thrift_ssl_cert_file" in config_keys:
self._config.set_ain_thrift_ssl_cert_file(
file_configs["ain_thrift_ssl_cert_file"]
)

if "ain_thrift_ssl_key_file" in config_keys:
self._config.set_ain_thrift_ssl_key_file(
file_configs["ain_thrift_ssl_key_file"]
)

if "ain_logs_dir" in config_keys:
log_dir = file_configs["ain_logs_dir"]
self._config.set_ain_logs_dir(log_dir)

if "ain_cluster_ingress_address" in config_keys:
self._config.set_ain_cluster_ingress_address(
file_configs["ain_cluster_ingress_address"]
)

if "ain_cluster_ingress_port" in config_keys:
self._config.set_ain_cluster_ingress_port(
int(file_configs["ain_cluster_ingress_port"])
)

if "ain_cluster_ingress_username" in config_keys:
self._config.set_ain_cluster_ingress_username(
file_configs["ain_cluster_ingress_username"]
)

if "ain_cluster_ingress_password" in config_keys:
self._config.set_ain_cluster_ingress_password(
file_configs["ain_cluster_ingress_password"]
)

except BadNodeUrlException:
logger.warning(
"Cannot load AINode conf file, use default configuration."
)

if "ain_inference_model_mem_usage_map" in config_keys:
self._config.set_ain_inference_model_mem_usage_map(
eval(file_configs["ain_inference_model_mem_usage_map"])
)

if "ain_inference_memory_usage_ratio" in config_keys:
self._config.set_ain_inference_memory_usage_ratio(
float(file_configs["ain_inference_memory_usage_ratio"])
)

if "ain_inference_extra_memory_ratio" in config_keys:
self._config.set_ain_inference_extra_memory_ratio(
float(file_configs["ain_inference_extra_memory_ratio"])
)

if "ain_models_dir" in config_keys:
self._config.set_ain_models_dir(file_configs["ain_models_dir"])

if "ain_models_builtin_dir" in config_keys:
self._config.set_ain_models_builtin_dir(
file_configs["ain_models_builtin_dir"]
)

if "ain_system_dir" in config_keys:
self._config.set_ain_system_dir(file_configs["ain_system_dir"])

if "ain_seed_config_node" in config_keys:
self._config.set_ain_target_config_node_list(
file_configs["ain_seed_config_node"]
)

if "cluster_name" in config_keys:
self._config.set_cluster_name(file_configs["cluster_name"])

if "ain_thrift_compression_enabled" in config_keys:
self._config.set_ain_thrift_compression_enabled(
int(file_configs["ain_thrift_compression_enabled"])
except Exception as e:
logger.warning(
"Cannot load AINode conf file caused by: {}, use default configuration. ".format(
e
)
)

if "ain_cluster_ingress_ssl_enabled" in config_keys:
self._config.set_ain_cluster_ingress_ssl_enabled(
int(file_configs["ain_cluster_ingress_ssl_enabled"])
)

if "ain_thrift_ssl_cert_file" in config_keys:
self._config.set_ain_thrift_ssl_cert_file(
file_configs["ain_thrift_ssl_cert_file"]
)

if "ain_thrift_ssl_key_file" in config_keys:
self._config.set_ain_thrift_ssl_key_file(
file_configs["ain_thrift_ssl_key_file"]
)

if "ain_logs_dir" in config_keys:
log_dir = file_configs["ain_logs_dir"]
self._config.set_ain_logs_dir(log_dir)

if "ain_cluster_ingress_address" in config_keys:
self._config.set_ain_cluster_ingress_address(
file_configs["ain_cluster_ingress_address"]
)

if "ain_cluster_ingress_port" in config_keys:
self._config.set_ain_cluster_ingress_port(
int(file_configs["ain_cluster_ingress_port"])
)

if "ain_cluster_ingress_username" in config_keys:
self._config.set_ain_cluster_ingress_username(
file_configs["ain_cluster_ingress_username"]
)

if "ain_cluster_ingress_password" in config_keys:
self._config.set_ain_cluster_ingress_password(
file_configs["ain_cluster_ingress_password"]
)

except BadNodeUrlException:
logger.warning("Cannot load AINode conf file, use default configuration.")

except Exception as e:
logger.warning(
"Cannot load AINode conf file caused by: {}, use default configuration. ".format(
e
)
)
system_properties_file = os.path.join(
self._config.get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME
)
if os.path.exists(system_properties_file):
system_configs = load_properties(system_properties_file)
if "ainode_id" in system_configs:
try:
self._config.set_ainode_id(int(system_configs["ainode_id"]))
except ValueError:
logger.warning(
"Cannot load ainode_id from '{}', keep AINode unregistered.".format(
system_properties_file
)
)

def get_config(self) -> AINodeConfig:
return self._config
Expand Down
11 changes: 10 additions & 1 deletion iotdb-core/ainode/iotdb/ainode/core/rpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def node_restart(
cluster_name: str,
configuration: TAINodeConfiguration,
version_info: TNodeVersionInfo,
) -> None:
) -> TSStatus:
req = TAINodeRestartReq(
clusterName=cluster_name,
aiNodeConfiguration=configuration,
Expand All @@ -212,6 +212,15 @@ def node_restart(
try:
resp = self._client.restartAINode(req)
if not self._update_config_node_leader(resp.status):
if (
resp.status.code
!= TSStatusCode.SUCCESS_STATUS.get_status_code()
):
logger.warning(
"AINode restart is rejected by ConfigNode. "
"The local system.properties will be kept and AINode will not "
"register a new id automatically."
)
verify_success(
resp.status, "An error occurs when calling node_restart()"
)
Expand Down
Loading