# -*- coding: utf-8 -*-
"""
This module implements all "Operation" mentioned in :ref:`operation-and-workflow`.
Each Operation represents a discrete task on the server. Typically,
these operation methods accept the following parameters:
1. ``bsm``: An AWS Boto3 Session Manager object.
2. ``check``: A boolean flag that determines whether to verify
if the current state meets the operation's prerequisites.
3. ``wait``: A boolean flag that specifies whether to await
completion of asynchronous operations.
4. ``auto_resolve``: A boolean flag that controls automatic state resolution.
When set to True, the system will:
- Attempt to wait for the state to meet the operation's prerequisites
if they're not initially satisfied.
- Throw an exception if it's impossible to reach the required state
within a reasonable timeframe.
It is only used when ``check`` is set to True.
"""
import typing as T
from pathlib import Path
from datetime import datetime, timezone
import acore_server_metadata.exc
from boto_session_manager import BotoSesManager
from aws_ssm_run_command.api import better_boto as ssm_better_boto
import simple_aws_ec2.api as simple_aws_ec2
import simple_aws_rds.api as simple_aws_rds
from acore_paths.api import path_acore_server_bootstrap_cli
from acore_constants.api import TagKey
from acore_server_config.api import EnvEnum
from acore_db_ssh_tunnel import api as acore_db_ssh_tunnel
from .vendor.hashes import hashes
from .constants import EC2_USERNAME, DB_PORT
from .wserver_infra_exports import StackExports
from .logger import logger
from .utils import get_utc_now
if T.TYPE_CHECKING: # pragma: no cover
from .server import Server
from mypy_boto3_ec2.type_defs import (
CreateImageResultTypeDef,
ReservationResponseTypeDef,
StartInstancesResultTypeDef,
StopInstancesResultTypeDef,
TerminateInstancesResultTypeDef,
)
from mypy_boto3_rds.type_defs import (
CreateDBSnapshotResultTypeDef,
CreateDBInstanceResultTypeDef,
RestoreDBInstanceFromDBSnapshotResultTypeDef,
StartDBInstanceResultTypeDef,
StopDBInstanceResultTypeDef,
DeleteDBInstanceResultTypeDef,
)
[docs]class ServerOperationMixin: # pragma: no cover
"""
Server Operation Mixin class that contains all the server operation methods.
"""
def _get_ec2_ami_name(
self: "Server",
utc_now: T.Optional[datetime] = None,
) -> str:
"""
Get the EC2 AMI id for this server, the snapshot id
naming convention is "${server_id}-%Y-%m-%d-%H-%M-%S".
"""
if utc_now is None:
utc_now = get_utc_now()
snapshot_id = "{}-{}".format(
self.id,
utc_now.strftime("%Y-%m-%d-%H-%M-%S"),
)
return snapshot_id
def _get_db_snapshot_id(
self: "Server",
utc_now: T.Optional[datetime] = None,
) -> str:
"""
Get the db snapshot id for this server, the snapshot id
naming convention is "${server_id}-%Y-%m-%d-%H-%M-%S".
"""
if utc_now is None:
utc_now = get_utc_now()
snapshot_id = "{}-{}".format(
self.id,
utc_now.strftime("%Y-%m-%d-%H-%M-%S"),
)
return snapshot_id
[docs] def build_bootstrap_command(
self: "Server",
python_version: str = "3",
acore_soap_app_version: T.Optional[str] = None,
acore_db_app_version: T.Optional[str] = None,
acore_server_bootstrap_version: T.Optional[str] = None,
) -> str:
"""
构建需要在 EC2 服务器上运行的 bootstrap 命令. 如果没有指定版本号, 则使用 config 中的版本号.
如果 config 中也没有版本号, 那么就使用 main branch 上的最新版本.
:param python_version: pyenv 中的 Python 版本号. 可以是 3, 3.8, 3.9 等.
:param acore_soap_app_version: `acore_soap_app <https://github.com/MacHu-GWU/acore_soap_app-project/releases>`_ 库的版本.
:param acore_db_app_version: `acore_db_app <https://github.com/MacHu-GWU/acore_db_app-project/releases>`_ 库的版本.
:param acore_server_bootstrap_version: `acore_server_bootstrap <https://github.com/MacHu-GWU/acore_server_bootstrap-project/releases>`_ 库的版本.
"""
script_url = "https://raw.githubusercontent.com/MacHu-GWU/acore_server_bootstrap-project/main/install.py"
main_part = (
f"sudo /home/ubuntu/.pyenv/shims/python{python_version} -c "
f'"$(curl -fsSL {script_url})"'
)
parts = [main_part]
if acore_soap_app_version:
parts.append(f"--acore_soap_app_version {acore_soap_app_version}")
else:
if self.config.acore_soap_app_version:
parts.append(
f"--acore_soap_app_version {self.config.acore_soap_app_version}"
)
if acore_db_app_version:
parts.append(f"--acore_db_app_version {acore_db_app_version}")
else:
if self.config.acore_db_app_version:
parts.append(
f"--acore_db_app_version {self.config.acore_db_app_version}"
)
if acore_server_bootstrap_version:
parts.append(
f"--acore_server_bootstrap_version {acore_server_bootstrap_version}"
)
else:
if self.config.acore_server_bootstrap_version:
parts.append(
f"--acore_server_bootstrap_version {self.config.acore_server_bootstrap_version}"
)
cmd = " ".join(parts)
return cmd
[docs] @logger.emoji_block(
msg="🆕🖥Create EC2 Instance",
emoji="🖥",
)
def create_ec2(
self: "Server",
bsm: "BotoSesManager",
stack_exports: "StackExports",
ami_id: T.Optional[str] = None,
instance_type: T.Optional[str] = None,
python_version: str = "3",
acore_soap_app_version: T.Optional[str] = None,
acore_db_app_version: T.Optional[str] = None,
acore_server_bootstrap_version: T.Optional[str] = None,
tags: T.Optional[T.Dict[str, str]] = None,
check: bool = True,
wait: bool = True,
**kwargs,
) -> "ReservationResponseTypeDef":
"""
为服务器创建一台新的 EC2. 注意, 一般先创建 RDS, 等 RDS 已经在运行了, 再创建 EC2.
因为 EC2 有一个 bootstrap 的过程, 这个过程中需要跟数据库通信. 数据库没有准备好
bootstrap 是不可能成功的.
:param bsm: Boto3 Session Manager.
:param stack_exports: cloudformation stack exports object that contains
AWS infrastructure information.
:param ami_id: 你要从哪个 AMI 来创建 EC2? 如果不指定, 则使用 config 中的 AMI ID.
这个参数之所以是可选是因为在有的 workflow 中, 我们已经知道 AMI ID 了;
而有的 workflow 中, 我们要临时创建一个新的 AMI ID, 此时还不知道这个 ID.
:param instance_type: EC2 的 instance type 是什么, 如果不指定, 则使用 config
中的值.
:param python_version: see :meth:`build_bootstrap_command`.
:param acore_soap_app_version: see :meth:`build_bootstrap_command`.
:param acore_db_app_version: see :meth:`build_bootstrap_command`.
:param acore_server_bootstrap_version: see :meth:`build_bootstrap_command`.
:param tags: additional AWS tags to add to the EC2 instance.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
bootstrap_command = self.build_bootstrap_command(
python_version=python_version,
acore_soap_app_version=acore_soap_app_version,
acore_db_app_version=acore_db_app_version,
acore_server_bootstrap_version=acore_server_bootstrap_version,
)
user_data_lines = [
"#!/bin/bash",
bootstrap_command,
]
if check:
self.metadata.ensure_ec2_not_exists()
if tags is None:
tags = dict()
tags["Name"] = self.id
tags[TagKey.SERVER_ID] = self.id # the realm tag indicator has to match
tags["tech:machine_creator"] = "acore_server_metadata"
if ami_id is None:
ami_id = self.config.ec2_ami_id
if instance_type is None:
instance_type = self.config.ec2_instance_type
run_instances_kwargs = dict(
ImageId=ami_id,
InstanceType=instance_type,
# only launch one instance for each realm
MinCount=1,
MaxCount=1,
KeyName=self.config.ec2_key_name,
SecurityGroupIds=[
stack_exports.get_default_sg_id(server_id=self.id),
stack_exports.get_ec2_sg_id(server_id=self.id),
stack_exports.get_ssh_sg_id(),
],
SubnetId=self.config.ec2_subnet_id,
IamInstanceProfile=dict(Arn=stack_exports.get_ec2_instance_profile_arn()),
TagSpecifications=[
dict(
ResourceType="instance",
Tags=[dict(Key=k, Value=v) for k, v in tags.items()],
),
],
**kwargs,
)
if any(
[
acore_soap_app_version,
acore_db_app_version,
acore_server_bootstrap_version,
]
):
kwargs["UserData"] = "\n".join(user_data_lines)
res = bsm.ec2_client.run_instances(**run_instances_kwargs)
if wait:
inst_id = res["Instances"][0]["InstanceId"]
ec2_inst = simple_aws_ec2.Ec2Instance(id=inst_id, status="na")
_ec2_inst = ec2_inst.wait_for_running(
ec2_client=bsm.ec2_client, timeout=300
)
self.metadata.ec2_inst = _ec2_inst
return res
[docs] @logger.emoji_block(
msg="🆕🛢Create RDS from scratch",
emoji="🛢",
)
def create_rds_from_scratch(
self: "Server",
bsm: "BotoSesManager",
stack_exports: "StackExports",
db_instance_class: T.Optional[str] = None,
engine_version: T.Optional[str] = None,
multi_az: T.Optional[bool] = None,
tags: T.Optional[T.Dict[str, str]] = None,
check: bool = True,
wait: bool = True,
**kwargs,
) -> "CreateDBInstanceResultTypeDef":
"""
不使用 DB Snapshot, 创建一台全新的数据库.
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/create_db_instance.html
:param bsm: Boto3 Session Manager.
:param stack_exports: cloudformation stack exports object that contains
AWS infrastructure information.
:param db_instance_class: Database 的 instance type 是什么, 如果不指定, 则使用 config
中的值.
:param engine_version: database engine version, in this project we use 8.X.Y (mysql).
如果不指定, 则使用 config 中的值.
:param multi_az: 是否启用 Multi-AZ 高可用性. 如果不指定, 则使用 config 中的值.
:param tags: additional AWS tags to add to the EC2 instance.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_rds_not_exists()
if tags is None:
tags = dict()
tags[TagKey.SERVER_ID] = self.id
tags["tech:machine_creator"] = "acore_server"
hashes.use_sha256()
digest = hashes.of_str(self.config.db_admin_password, hexdigest=True)
tags["tech:master_password_digest"] = digest
if db_instance_class is None:
db_instance_class = self.config.db_instance_class
if engine_version is None:
engine_version = self.config.db_engine_version
if multi_az is None:
multi_az = False
res = bsm.rds_client.create_db_instance(
DBInstanceIdentifier=self.id,
DBInstanceClass=db_instance_class,
Engine="mysql",
EngineVersion=engine_version,
MasterUsername=self.config.db_admin_username,
MasterUserPassword=self.config.db_admin_password,
MultiAZ=multi_az,
DBSubnetGroupName=stack_exports.get_db_subnet_group_name(),
PubliclyAccessible=False, # you should never expose your database to the public
AutoMinorVersionUpgrade=False, # don't update MySQL minor version, PLEASE!
VpcSecurityGroupIds=[
stack_exports.get_default_sg_id(server_id=self.id),
],
CopyTagsToSnapshot=True,
Tags=[dict(Key=k, Value=v) for k, v in tags.items()],
**kwargs,
)
if wait:
rds_inst = simple_aws_rds.RDSDBInstance(id=self.id, status="na")
_rds_inst = rds_inst.wait_for_available(
rds_client=bsm.rds_client, timeout=900
)
self.metadata.rds_inst = _rds_inst
return res
[docs] @logger.emoji_block(
msg="🆕🛢Create RDS from snapshot",
emoji="🛢",
)
def create_rds_from_snapshot(
self: "Server",
bsm: "BotoSesManager",
stack_exports: "StackExports",
db_snapshot_id: T.Optional[str] = None,
db_instance_class: T.Optional[str] = None,
multi_az: T.Optional[bool] = None,
tags: T.Optional[T.Dict[str, str]] = None,
check: bool = True,
wait: bool = True,
**kwargs,
) -> "RestoreDBInstanceFromDBSnapshotResultTypeDef":
"""
为服务器创建一台新的数据库.
:param bsm: Boto3 Session Manager.
:param stack_exports: cloudformation stack exports object that contains
AWS infrastructure information.
:param db_snapshot_id: 要从哪个 DB Snapshot 来创建 RDS? 如果不指定,
则使用 config 中的 DB Snapshot ID. 这个参数之所以是可选是因为在有的 workflow 中,
我们已经知道 DB Snapshot ID 了; 而有的 workflow 中, 我们要临时创建一个新的
DB Snapshot ID, 此时还不知道这个 ID.
:param db_instance_class: Database 的 instance type 是什么, 如果不指定, 则使用 config
中的值.
:param multi_az: 是否启用 Multi-AZ 高可用性. 如果不指定, 则使用 config 中的值.
:param tags: additional AWS tags to add to the EC2 instance.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_rds_not_exists()
if tags is None:
tags = dict()
tags[TagKey.SERVER_ID] = self.id
tags["tech:machine_creator"] = "acore_server"
if db_snapshot_id is None:
db_snapshot_id = self.config.db_snapshot_id
if db_instance_class is None:
db_instance_class = self.config.db_instance_class
if multi_az is None:
multi_az = False
# locate master password digest from snapshot tags
res = bsm.rds_client.describe_db_snapshots(
DBSnapshotIdentifier=db_snapshot_id,
)
db_snapshot_list = res.get("DBSnapshots", [])
if len(db_snapshot_list):
db_snapshot_tags = {
dct["Key"]: dct["Value"]
for dct in db_snapshot_list[0].get("TagList", [])
}
master_password_digest = db_snapshot_tags.get("tech:master_password_digest")
if master_password_digest:
tags["tech:master_password_digest"] = master_password_digest
res = bsm.rds_client.restore_db_instance_from_db_snapshot(
DBInstanceIdentifier=self.id,
DBSnapshotIdentifier=db_snapshot_id,
DBInstanceClass=db_instance_class,
MultiAZ=multi_az,
DBSubnetGroupName=stack_exports.get_db_subnet_group_name(),
PubliclyAccessible=False, # you should never expose your database to the public
AutoMinorVersionUpgrade=False, # don't update MySQL minor version, PLEASE!
VpcSecurityGroupIds=[
stack_exports.get_default_sg_id(server_id=self.id),
],
CopyTagsToSnapshot=True,
Tags=[dict(Key=k, Value=v) for k, v in tags.items()],
**kwargs,
)
if wait:
rds_inst = simple_aws_rds.RDSDBInstance(id=self.id, status="na")
_rds_inst = rds_inst.wait_for_available(
rds_client=bsm.rds_client, timeout=900
)
self.metadata.rds_inst = _rds_inst
return res
[docs] @logger.emoji_block(
msg="🟢🖥Start EC2 instance",
emoji="🖥",
)
def start_ec2(
self: "Server",
bsm: "BotoSesManager",
check: bool = True,
wait: bool = True,
auto_resolve: bool = True,
) -> T.Optional["StartInstancesResultTypeDef"]:
"""
启动关闭着的 EC2.
:param bsm: Boto3 Session Manager.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
:param auto_resolve: if True, wait RDS to be ready to start, when EC2 is still stopping.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_ec2_exists()
# 如果已经在运行了, 那么已经达到目的, 直接返回既可
if self.metadata.ec2_inst.is_running():
return
# 看看是不是正在启动中, 如果是, 那么等待它启动完成后直接返回既可
if wait:
try:
_ec2_inst = self.metadata.ec2_inst.wait_for_running(
ec2_client=bsm.ec2_client,
timeout=300,
)
self.metadata.ec2_inst = _ec2_inst
return
except simple_aws_ec2.StatusError:
pass
# 如果还没有启动, 那么看看是不是已经准备好了, 如果没有准备好, 并且 auto_resolve = True, 那么等待它准备好
if self.metadata.ec2_inst.is_ready_to_start() is False:
if auto_resolve:
_ec2_inst = self.metadata.ec2_inst.wait_for_stopped(
ec2_client=bsm.ec2_client,
timeout=300,
)
self.metadata.ec2_inst = _ec2_inst
else:
self.metadata.ensure_ec2_is_ready_to_start()
ec2_inst = self.metadata.ec2_inst
res = ec2_inst.start_instance(ec2_client=bsm.ec2_client)
if wait:
_ec2_inst = ec2_inst.wait_for_running(
ec2_client=bsm.ec2_client,
timeout=300,
)
self.metadata.ec2_inst = _ec2_inst
return res
[docs] @logger.emoji_block(
msg="🟢🛢Start RDS instance",
emoji="🛢",
)
def start_rds(
self: "Server",
bsm: "BotoSesManager",
check: bool = True,
wait: bool = True,
auto_resolve: bool = True,
) -> T.Optional["StartDBInstanceResultTypeDef"]:
"""
启动关闭着的 RDS.
:param bsm: Boto3 Session Manager.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
:param auto_resolve: if True, wait RDS to be ready to start, when RDS is still stopping.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_rds_exists()
if self.metadata.rds_inst.is_available():
return None
if wait:
try:
_rds_inst = self.metadata.rds_inst.wait_for_available(
rds_client=bsm.rds_client,
timeout=900,
)
self.metadata.rds_inst = _rds_inst
return
except simple_aws_rds.StatusError:
pass
if self.metadata.rds_inst.is_ready_to_start() is False:
if auto_resolve:
_rds_inst = self.metadata.rds_inst.wait_for_stopped(
rds_client=bsm.rds_client,
timeout=900,
)
self.metadata.rds_inst = _rds_inst
else:
self.metadata.ensure_rds_is_ready_to_start()
rds_inst = self.metadata.rds_inst
res = rds_inst.start_db_instance(rds_client=bsm.rds_client)
if wait:
_rds_inst = rds_inst.wait_for_available(
rds_client=bsm.rds_client,
timeout=900,
)
self._rds_inst = _rds_inst
return res
[docs] @logger.emoji_block(
msg="Associate EIP Address",
emoji="🖥",
)
def associate_eip_address(
self: "Server",
bsm: "BotoSesManager",
allow_reassociation: bool = False,
check: bool = True,
) -> T.Optional[dict]:
"""
给 EC2 关联 EIP.
"""
if self.config.ec2_eip_allocation_id is not None:
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_ec2_exists()
ec2_inst = self.metadata.ec2_inst
# check if this allocation id is already associated with an instance
res = bsm.ec2_client.describe_addresses(
AllocationIds=[self.config.ec2_eip_allocation_id]
)
address_data = res["Addresses"][0]
instance_id = address_data.get("InstanceId", "invalid-instance-id")
if instance_id == ec2_inst.id: # already associated
return None
# associate eip address
return bsm.ec2_client.associate_address(
AllocationId=self.config.ec2_eip_allocation_id,
InstanceId=ec2_inst.id,
AllowReassociation=allow_reassociation,
)
return None
[docs] @logger.emoji_block(
msg="Update db master password",
emoji="🛢",
)
def update_db_master_password(
self: "Server",
bsm: "BotoSesManager",
check: bool = True,
) -> T.Optional[dict]:
"""
更新数据库的 master password.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
rds_inst = self.metadata.ensure_rds_exists()
else:
rds_inst = self.metadata.rds_inst
master_password = self.config.db_admin_password
hashes.use_sha256()
master_password_digest = hashes.of_str(master_password, hexdigest=True)
if (
rds_inst.tags.get("tech:master_password_digest", "invalid")
== master_password_digest
):
# do nothing
return None
response = bsm.rds_client.modify_db_instance(
DBInstanceIdentifier=rds_inst.id,
MasterUserPassword=master_password,
ApplyImmediately=True,
)
bsm.rds_client.add_tags_to_resource(
ResourceName=rds_inst.db_instance_arn,
Tags=[
dict(Key="tech:master_password_digest", Value=master_password_digest)
],
)
return response
[docs] @logger.emoji_block(
msg="🔴🖥Stop EC2 instance",
emoji="🖥",
)
def stop_ec2(
self: "Server",
bsm: "BotoSesManager",
check: bool = True,
wait: bool = True,
auto_resolve: bool = True,
) -> T.Optional["StopInstancesResultTypeDef"]:
"""
关闭 EC2.
:param bsm: Boto3 Session Manager.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
:param auto_resolve: if True, wait EC2 to be ready to stop, when EC2 is still pending.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_ec2_exists()
if self.metadata.ec2_inst.is_stopped():
return None
if wait:
try:
_ec2_inst = self.metadata.ec2_inst.wait_for_stopped(
ec2_client=bsm.ec2_client,
timeout=300,
)
self.metadata.ec2_inst = _ec2_inst
return
except simple_aws_ec2.StatusError:
pass
if self.metadata.ec2_inst.is_ready_to_stop() is False:
if auto_resolve:
_ec2_inst = self.metadata.ec2_inst.wait_for_running(
ec2_client=bsm.ec2_client,
timeout=300,
)
self.metadata.ec2_inst = _ec2_inst
else:
self.metadata.ensure_ec2_is_ready_to_stop()
ec2_inst = self.metadata.ec2_inst
res = ec2_inst.stop_instance(ec2_client=bsm.ec2_client)
if wait:
_ec2_inst = ec2_inst.wait_for_stopped(
ec2_client=bsm.ec2_client,
timeout=300,
)
self.metadata.ec2_inst = _ec2_inst
return res
[docs] @logger.emoji_block(
msg="🔴🛢Stop RDS instance",
emoji="🛢",
)
def stop_rds(
self: "Server",
bsm: "BotoSesManager",
check: bool = True,
wait: bool = True,
auto_resolve: bool = True,
) -> T.Optional["StopDBInstanceResultTypeDef"]:
"""
关闭 RDS.
:param bsm: Boto3 Session Manager.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
:param auto_resolve: if True, wait RDS to be ready to stop, when RDS is still starting.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_rds_exists()
if self.metadata.rds_inst.is_stopped():
return None
if wait:
try:
_rds_inst = self.metadata.rds_inst.wait_for_stopped(
rds_client=bsm.rds_client,
timeout=900,
)
self.metadata.rds_inst = _rds_inst
return
except simple_aws_rds.StatusError:
pass
if self.metadata.rds_inst.is_ready_to_stop() is False:
if auto_resolve:
_rds_inst = self.metadata.rds_inst.wait_for_available(
rds_client=bsm.rds_client,
timeout=900,
)
self.metadata.rds_inst = _rds_inst
else:
self.metadata.ensure_rds_is_ready_to_stop()
rds_inst = self.metadata.rds_inst
res = rds_inst.stop_db_instance(rds_client=bsm.rds_client)
if wait:
_rds_inst = rds_inst.wait_for_stopped(
rds_client=bsm.rds_client,
timeout=300,
)
self.metadata.rds_inst = _rds_inst
return res
[docs] @logger.emoji_block(
msg="🗑🖥Stop EC2 instance",
emoji="🖥",
)
def delete_ec2(
self: "Server",
bsm: "BotoSesManager",
check: bool = True,
wait: bool = True,
) -> "TerminateInstancesResultTypeDef":
"""
删除 EC2.
:param bsm: Boto3 Session Manager.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if check:
self.metadata.ensure_ec2_exists()
ec2_inst = self.metadata.ec2_inst
res = ec2_inst.terminate_instance(ec2_client=bsm.ec2_client)
if wait:
ec2_inst.wait_for_terminated(ec2_client=bsm.ec2_client, timeout=300)
self.metadata.ec2_inst = None
return res
[docs] @logger.emoji_block(
msg="🗑🛢Delete EC2 instance",
emoji="🛢",
)
def delete_rds(
self: "Server",
bsm: "BotoSesManager",
create_final_snapshot: T.Optional[bool] = None,
check: bool = True,
) -> "DeleteDBInstanceResultTypeDef":
"""
删除 RDS.
:param bsm: Boto3 Session Manager.
:param check: if True, check if it meets the prerequisites for this operation.
todo: add waiter
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if create_final_snapshot is None:
create_final_snapshot = self.env_name == EnvEnum.prd.value
if check:
self.metadata.ensure_rds_exists()
rds_inst = self.metadata.rds_inst
if create_final_snapshot:
snapshot_id = self._get_db_snapshot_id()
res = rds_inst.delete_db_instance(
rds_client=bsm.rds_client,
skip_final_snapshot=False,
final_db_snapshot_identifier=snapshot_id,
delete_automated_backups=False,
)
else:
res = rds_inst.delete_db_instance(
rds_client=bsm.rds_client,
skip_final_snapshot=True,
delete_automated_backups=True,
)
return res
[docs] @logger.emoji_block(
msg="🖥Stop worldserver",
emoji="🖥",
)
def stop_worldserver(
self: "Server",
bsm: "BotoSesManager",
):
"""
停止魔兽世界游戏服务器. 这个命令不会失败. 它只是一个 async API call.
这个命令比较特殊, 它建立在服务器已经成功配置好了
`acore_server_bootstrap@1.0.1+ <https://github.com/MacHu-GWU/acore_server_bootstrap-project>`_
的前提上 (用到了 `stop_server <https://acore-server-bootstrap.readthedocs.io/en/latest/acore_server_bootstrap/remoter.html#acore_server_bootstrap.remoter.Remoter.stop_server>`_ 这个命令).
按理说我们这个库的 requirements 里没有依赖于 ``acore_server_bootstrap``,
但是实际上依赖了. 因为我们在运行 Stop Server workflow 的过程中需要有这一步.
所以这个函数算是例外了.
:param bsm: Boto3 Session Manager.
"""
return ssm_better_boto.run_shell_script_async(
ssm_client=bsm.ssm_client,
commands=(
f"sudo -H -u ubuntu " f"{path_acore_server_bootstrap_cli} stop_server"
),
instance_ids=self.metadata.ec2_inst.id,
)
@property
def wow_status(self: "Server") -> str:
"""
从 EC2 的 Tag 中获取魔兽世界服务器的状态.
1. 如果 EC2 或 RDS 任意一个不存在或是已被删除 则返回 "deleted".
2. 如果 EC2 或 RDS 不在线, 则返回 ``stopped``.
3. 如果 EC2 或 RDS 都在线, tag 中不存在测量数据, 则返回 "stopped"
4. 如果 EC2 或 RDS 都在线, tag 中有测量数据且没有过期, 则返回 tag 中的数据, 值可能是
"123 players" (数字是在线人数), 或是 "stopped"
5. 如果 EC2 或 RDS 都在线, tag 中有测量数据且过期了, 则返回 "stopped"
"""
if self.metadata.is_exists() is False:
return "deleted"
elif self.metadata.is_running() is False:
return "stopped"
elif TagKey.WOW_STATUS not in self.metadata.ec2_inst.tags:
return "stopped"
else:
wow_status = self.metadata.ec2_inst.tags[TagKey.WOW_STATUS]
measure_time = self.metadata.ec2_inst.tags[TagKey.WOW_STATUS_MEASURE_TIME]
measure_time = datetime.fromisoformat(measure_time)
utc_now = datetime.utcnow().replace(tzinfo=timezone.utc)
elapsed = (utc_now - measure_time).total_seconds()
if elapsed > 300:
return "stopped"
else:
return wow_status
def _get_path_pem_file(self: "Server", bsm: "BotoSesManager") -> Path:
path = (
Path.home()
.joinpath("ec2-pem")
.joinpath(
bsm.aws_account_alias,
bsm.aws_region,
f"{self.metadata.ec2_inst.key_name}.pem",
)
)
if path.exists() is False:
raise FileNotFoundError(
f"Failed to locate the pem file: {path}, "
"make sure you have the pem file at ${HOME}/ec2-pem/${AWS_ACCOUNT_ALIAS}/${AWS_REGION}/${KEY_NAME}.pem"
)
return path
[docs] def create_ssh_tunnel(
self: "Server",
bsm: "BotoSesManager",
path_pem_file: T.Optional[Path] = None,
):
"""
创建一个本地的 SSH Tunnel, 用于本地数据库开发.
:param bsm: Boto3 Session Manager.
:param path_pem_file: EC2 的 pem 文件路径. 如果不指定, 则使用 :meth:`_get_path_pem_file`
的逻辑来自动获取.
"""
if self.metadata.is_running() is False:
raise ConnectionError(
"cannot create ssh tunnel, EC2 or RDS is not running."
)
if path_pem_file is None:
path_pem_file = self._get_path_pem_file(bsm=bsm)
acore_db_ssh_tunnel.create_ssh_tunnel(
path_pem_file=path_pem_file,
db_host=self.metadata.rds_inst.endpoint,
db_port=DB_PORT,
jump_host_username=EC2_USERNAME,
jump_host_public_ip=self.metadata.ec2_inst.public_ip,
)
[docs] def list_ssh_tunnel(
self: "Server",
bsm: "BotoSesManager",
path_pem_file: "Path" = None,
):
"""
列出所有正在运行中的 SSH Tunnel.
:param bsm: Boto3 Session Manager.
:param path_pem_file: EC2 的 pem 文件路径. 如果不指定, 则使用 :meth:`_get_path_pem_file`
的逻辑来自动获取.
"""
if path_pem_file is None:
path_pem_file = self._get_path_pem_file(bsm=bsm)
acore_db_ssh_tunnel.list_ssh_tunnel(path_pem_file)
[docs] def test_ssh_tunnel(self: "Server"):
"""
通过运行一个简单的 SQL 语句来测试 SSH Tunnel 是否正常工作.
"""
if self.metadata.is_running() is False:
raise ConnectionError("cannot test ssh tunnel, EC2 or RDS is not running.")
acore_db_ssh_tunnel.test_ssh_tunnel(
db_port=DB_PORT,
db_username=self.config.db_username,
db_password=self.config.db_password,
db_name="acore_auth",
)
[docs] def kill_ssh_tunnel(
self: "Server",
bsm: "BotoSesManager",
path_pem_file: "Path" = None,
):
"""
关闭所有正在运行中的 SSH Tunnel.
:param bsm: Boto3 Session Manager.
:param path_pem_file: EC2 的 pem 文件路径. 如果不指定, 则使用 :meth:`_get_path_pem_file`
的逻辑来自动获取.
"""
if path_pem_file is None:
path_pem_file = self._get_path_pem_file(bsm=bsm)
acore_db_ssh_tunnel.kill_ssh_tunnel(path_pem_file)
[docs] @logger.emoji_block(
msg="🆕🖥📸Create new EC2 AMI",
emoji="📸",
)
def create_ec2_ami(
self: "Server",
bsm: "BotoSesManager",
ami_name: T.Optional[str] = None,
utc_now: T.Optional[datetime] = None,
skip_reboot: bool = True,
check: bool = True,
wait: bool = True,
) -> "CreateImageResultTypeDef":
"""
Create a new AMI from existing EC2.
:param bsm: Boto3 Session Manager.
:param ami_name: 是否要指定 ami_name, 如不指定则自动生成.
:param utc_now: 是否要指定 utc_now, 如不指定则自动生成.
:param skip_reboot: 是否要不关机直接创建 AMI.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if ami_name is None:
ami_name = self._get_ec2_ami_name(utc_now)
if check:
self.metadata.ensure_ec2_exists()
ec2_inst = self.metadata.ec2_inst
logger.info(
f"create image {ami_name!r} from ec2 instance {self.metadata.ec2_inst.id}"
)
res = bsm.ec2_client.create_image(
InstanceId=ec2_inst.id,
Name=ami_name,
NoReboot=skip_reboot,
TagSpecifications=[
{
"ResourceType": "image",
"Tags": [{"Key": k, "Value": v} for k, v in ec2_inst.tags.items()],
}
],
)
ami_id = res["ImageId"]
if wait:
image = simple_aws_ec2.Image(id=ami_id)
image.wait_for_available(ec2_client=bsm.ec2_client)
else:
logger.info("skip waiting for available")
return res
[docs] @logger.emoji_block(
msg="🆕🛢📸Create new DB Snapshot",
emoji="📸",
)
def create_db_snapshot(
self: "Server",
bsm: "BotoSesManager",
snapshot_id: T.Optional[str] = None,
check: bool = True,
wait: bool = True,
auto_resolve: bool = True,
) -> "CreateDBSnapshotResultTypeDef":
"""
Create a new DB snapshot from existing RDS. Note that you can only
create a snapshot when DB instance is available.
:param bsm: Boto3 Session Manager.
:param snapshot_id: 是否要指定 snapshot_id, 如不指定则自动生成.
:param check: if True, check if it meets the prerequisites for this operation.
:param wait: if True, wait for the operation to complete.
"""
self.metadata.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if snapshot_id is None:
snapshot_id = self._get_db_snapshot_id()
if check:
self.metadata.ensure_rds_exists()
if self.metadata.rds_inst.is_available() is False:
if auto_resolve:
_rds_inst = self.metadata.rds_inst.wait_for_available(
rds_client=bsm.rds_client, timeout=900
)
self.metadata.rds_inst = _rds_inst
else:
self.metadata.ensure_rds_is_running()
rds_inst = self.metadata.rds_inst
logger.info(
f"create db snapshot {snapshot_id!r} from db instance {rds_inst.id}"
)
res = bsm.rds_client.create_db_snapshot(
DBSnapshotIdentifier=snapshot_id,
DBInstanceIdentifier=rds_inst.id,
)
if wait:
snap = simple_aws_rds.RDSDBSnapshot(db_snapshot_identifier=snapshot_id)
snap.wait_for_available(rds_client=bsm.rds_client)
else:
logger.info("skip waiting for available")
return res