以下为完整代码:
import hashlib
import time
class Transaction:
def __init__(self, sender, receiver, amount):
self.sender = sender
self.receiver = receiver
self.amount = amount
class Block:
def __init__(self, index, timestamp, transactions, previous_hash):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.hash = self.calculate_hash()
def calculate_hash(self):
sha = hashlib.sha256()
transactions_str = "".join([f"{tx.sender}{tx.receiver}{tx.amount}"
for tx in self.transactions])
sha.update
(f"{self.index}{self.timestamp}{transactions_str}{self.previous_hash}".encode('utf-8'))
return sha.hexdigest()
class Blockchain:
def __init__(self):
self.chain = [self.create_genesis_block()]
self.pending_transactions = []
self.mining_reward = 100
def create_genesis_block(self):
return Block(0, time.time(), [], "0")
def get_latest_block(self):
return self.chain[-1]
def mine_pending_transactions(self, mining_reward_address):
reward_tx = Transaction(None, mining_reward_address, self.mining_reward)
self.pending_transactions.append(reward_tx)
new_block = Block(len(self.chain), time.time(),
self.pending_transactions, self.get_latest_block().hash)
new_block.hash = new_block.calculate_hash()
self.chain.append(new_block)
self.pending_transactions = []
def create_transaction(self, transaction):
self.pending_transactions.append(transaction)
def get_balance(self, address):
balance = 0
for block in self.chain:
for tx in block.transactions:
if tx.sender == address:
balance -= tx.amount
if tx.receiver == address:
balance += tx.amount
return balance
blockchain = Blockchain()
blockchain.create_transaction(Transaction("Alice", "Bob", 50))
blockchain.create_transaction(Transaction("Bob", "Alice", 30))
blockchain.mine_pending_transactions("Miner1")
for block in blockchain.chain:
print(f"Index: {block.index}")
print(f"Timestamp: {block.timestamp}")
print(f"Transactions:
{[{'sender': tx.sender, 'receiver': tx.receiver, 'amount': tx.amount}
for tx in block.transactions]}")
print(f"Previous Hash: {block.previous_hash}")
print(f"Hash: {block.hash}")
print()
print(f"Balance of Miner1: {blockchain.get_balance('Miner1')}")
print(f"Balance of Alice: {blockchain.get_balance('Alice')}")
print(f"Balance of Bob: {blockchain.get_balance('Bob')}")
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
谁能挖矿成功?
在区块链中,挖矿是一个竞争的过程,只有第一个找到符合条件的哈希值的矿工才能获得奖励。这个过程是一个随机的过程,因此只能通过不断尝试来找到符合条件的哈希值。
决定谁可以挖矿成功的算法有很多种,比如工作量证明(Proof of Work)、权益证明(Proof of Stake)等等。其中工作量证明是最常见的一种算法,比特币就是使用工作量证明算法来决定谁可以挖矿成功。
这里我们实现一下工作量证明算法(POW)。工作量证明算法的核心思想是找到一个符合条件的哈希值,这个哈希值的前几位是 0。这个条件是可以调整的,比如前两位是 0,前三位是 0 等等。位数越多,实现起来越困难。我们可以将这个位数称为难度(Difficulty)。如果被哈希的字符串是固定的,那么哈希值一定也是固定的,因此被哈希的字符串不能是固定的(否则可能无法找到符合条件的哈希值),通常的做法是包含一个随机数 nonce,这个随机数就是我们需要不断尝试的值。
也就是说没有这个约束,我们很快就能计算出哈希,这个哈希有可能也不满足 difficult 条件,但是有了这个约束,我们就需要不断尝试,直到找到符合条件的哈希值。
为了实现这个目的,我们需要:
- 修改 Block 类:添加 nonce 属性和 proof_of_work 方法。
def proof_of_work(self, difficulty):
self.nonce = 0
computed_hash = self.calculate_hash()
while not computed_hash.startswith('0' * difficulty):
self.nonce += 1
computed_hash = self.calculate_hash()
return computed_hash
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
- 修改 Blockchain 类:在挖矿时调用 proof_of_work 方法。
import hashlib
import time
class Transaction:
def __init__(self, sender, receiver, amount):
self.sender = sender
self.receiver = receiver
self.amount = amount
class Block:
def __init__(self, index, timestamp, transactions, previous_hash):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.nonce = 0
self.hash = self.calculate_hash()
def calculate_hash(self):
sha = hashlib.sha256()
transactions_str = "".join([f"{tx.sender}{tx.receiver}{tx.amount}"
for tx in self.transactions])
sha.update
(f"{self.index}{self.timestamp}{transactions_str}{self.previous_hash}{self.nonce}".encode('utf-8'))
return sha.hexdigest()
def proof_of_work(self, difficulty):
self.nonce = 0
computed_hash = self.calculate_hash()
while not computed_hash.startswith('0' * difficulty):
self.nonce += 1
computed_hash = self.calculate_hash()
return computed_hash
class Blockchain:
def __init__(self):
self.chain = [self.create_genesis_block()]
self.pending_transactions = []
self.mining_reward = 100
self.difficulty = 4
def create_genesis_block(self):
return Block(0, time.time(), [], "0")
def get_latest_block(self):
return self.chain[-1]
def mine_pending_transactions(self, mining_reward_address):
reward_tx = Transaction(None, mining_reward_address, self.mining_reward)
self.pending_transactions.append(reward_tx)
new_block =
Block(len(self.chain), time.time(),
self.pending_transactions, self.get_latest_block().hash)
new_block.hash = new_block.proof_of_work(self.difficulty)
self.chain.append(new_block)
self.pending_transactions = []
def create_transaction(self, transaction):
self.pending_transactions.append(transaction)
def get_balance(self, address):
balance = 0
for block in self.chain:
for tx in block.transactions:
if tx.sender == address:
balance -= tx.amount
if tx.receiver == address:
balance += tx.amount
return balance
blockchain = Blockchain()
blockchain.create_transaction(Transaction("Alice", "Bob", 50))
blockchain.create_transaction(Transaction("Bob", "Alice", 30))
blockchain.mine_pending_transactions("Miner1")
for block in blockchain.chain:
print(f"Index: {block.index}")
print(f"Timestamp: {block.timestamp}")
print(f"Transactions: {[{'sender': tx.sender, 'receiver': tx.receiver, 'amount': tx.amount}
for tx in block.transactions]}")
print(f"Previous Hash: {block.previous_hash}")
print(f"Hash: {block.hash}")
print(f"Nonce: {block.nonce}")
print()
print(f"Balance of Miner1: {blockchain.get_balance('Miner1')}")
print(f"Balance of Alice: {blockchain.get_balance('Alice')}")
print(f"Balance of Bob: {blockchain.get_balance('Bob')}")
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
智能合约
智能合约是区块链中的另一个重要概念,它是一种自动执行的合约,不需要中间人,不需要信任。智能合约是区块链中的一种应用,它可以实现一些自动化的业务逻辑,比如数字货币、供应链金融等等。
本质上,智能合约是一段代码,这段代码会被部署到区块链上,然后通过交易来调用这段代码。智能合约的代码是不可篡改的,一旦部署到区块链上,就无法修改。
智能合约的本体就是代码,本质类似于状态机。
智能合约还有一个很重要的概念是 ABI。ABI(Application Binary Interface,应用二进制接口)是智能合约与外部应用程序之间的接口定义。它描述了智能合约的函数和事件,使得外部应用程序可以与智能合约进行交互。
智能合约代码是用 Solidity 等编程语言编写的,定义了合约的逻辑和功能。合约代码通常需要编译,在编译后会生成字节码(bytecode),部署到区块链上。
ABI 是合约编译后生成的 JSON 文件,描述了合约的接口。它不包含合约的逻辑实现,只包含函数和事件的定义。外部应用程序使用 ABI 来与部署在区块链上的合约进行交互。
也就是说 ABI 决定了如何调用合约,而合约代码决定了合约的逻辑
为了在区块链中添加智能合约功能,我们需要进行以下步骤:
-
- 定义智能合约类:创建一个 SmartContract 类,用于定义智能合约的基本结构和功能。
class SmartContract:
def __init__(self, code):
self.code = code
self.state = {}
def execute(self, sender, receiver, amount):
exec(self.code,
{'sender': sender, 'receiver': receiver, 'amount': amount, 'state': self.state})
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
-
- 修改 Blockchain 类:添加处理智能合约的功能。
def deploy_contract(self, contract_code):
contract = SmartContract(contract_code)
contract_address = hashlib.sha256(contract_code.encode('utf-8')).hexdigest()
self.contracts[contract_address] = contract
return contract_address
def call_contract(self, contract_address, sender, receiver, amount):
contract = self.contracts.get(contract_address)
if contract:
contract.execute(sender, receiver, amount)
tx = Transaction(sender, receiver, amount, contract_address)
self.create_transaction(tx)
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
合约有地址属性,合约的地址是合约代码的哈希值,这也说明了合约的本体就是代码本身。通常要调用合约就是指定合约地址,然后调用合约的方法,外加一些参数。本质上和调用函数是一样的。
完整代码:
import hashlib
import time
class Transaction:
def __init__(self, sender, receiver, amount, contract=None):
self.sender = sender
self.receiver = receiver
self.amount = amount
self.contract = contract
class SmartContract:
def __init__(self, code):
self.code = code
self.state = {}
def execute(self, sender, receiver, amount):
exec(self.code, {'sender': sender, 'receiver': receiver, 'amount': amount, 'state': self.state})
class Block:
def __init__(self, index, timestamp, transactions, previous_hash):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.nonce = 0
self.hash = self.calculate_hash()
def calculate_hash(self):
sha = hashlib.sha256()
transactions_str = "".join([f"{tx.sender}{tx.receiver}{tx.amount}{tx.contract}"
for tx in self.transactions])
sha.update
(f"{self.index}{self.timestamp}{transactions_str}{self.previous_hash}{self.nonce}".encode('utf-8'))
return sha.hexdigest()
def proof_of_work(self, difficulty):
self.nonce = 0
computed_hash = self.calculate_hash()
while not computed_hash.startswith('0' * difficulty):
self.nonce += 1
computed_hash = self.calculate_hash()
return computed_hash
class Blockchain:
def __init__(self):
self.chain = [self.create_genesis_block()]
self.pending_transactions = []
self.mining_reward = 100
self.difficulty = 4
self.contracts = {}
def create_genesis_block(self):
return Block(0, time.time(), [], "0")
def get_latest_block(self):
return self.chain[-1]
def mine_pending_transactions(self, mining_reward_address):
reward_tx = Transaction(None, mining_reward_address, self.mining_reward)
self.pending_transactions.append(reward_tx)
new_block =
Block(len(self.chain), time.time(), self.pending_transactions,
self.get_latest_block().hash)
new_block.hash = new_block.proof_of_work(self.difficulty)
self.chain.append(new_block)
self.pending_transactions = []
def create_transaction(self, transaction):
self.pending_transactions.append(transaction)
def deploy_contract(self, contract_code):
contract = SmartContract(contract_code)
contract_address = hashlib.sha256(contract_code.encode('utf-8')).hexdigest()
self.contracts[contract_address] = contract
return contract_address
def call_contract(self, contract_address, sender, receiver, amount):
contract = self.contracts.get(contract_address)
if contract:
contract.execute(sender, receiver, amount)
tx = Transaction(sender, receiver, amount, contract_address)
self.create_transaction(tx)
def get_balance(self, address):
balance = 0
for block in self.chain:
for tx in block.transactions:
if tx.sender == address:
balance -= tx.amount
if tx.receiver == address:
balance += tx.amount
return balance
blockchain = Blockchain()
contract_code = """ if amount > 10:
state['status'] = 'High value transaction' else:
state['status'] = 'Low value transaction' """
contract_address = blockchain.deploy_contract(contract_code)
blockchain.call_contract(contract_address, "Alice", "Bob", 50)
blockchain.mine_pending_transactions("Miner1")
for block in blockchain.chain:
print(f"Index: {block.index}")
print(f"Timestamp: {block.timestamp}")
print(f"Transactions:
{[{'sender': tx.sender, 'receiver': tx.receiver, 'amount': tx.amount, 'contract': tx.contract}
for tx in block.transactions]}")
print(f"Previous Hash: {block.previous_hash}")
print(f"Hash: {block.hash}")
print(f"Nonce: {block.nonce}")
print()
print(f"Contract State: {blockchain.contracts[contract_address].state}")
print(f"Balance of Miner1: {blockchain.get_balance('Miner1')}")
print(f"Balance of Alice: {blockchain.get_balance('Alice')}")
print(f"Balance of Bob: {blockchain.get_balance('Bob')}")
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
验证交易
交易不全是有效的,我们需要验证交易的有效性。比如余额不足、交易重复,签名等等。
为了实现验证交易的功能,我们需要以下步骤:
-
- 定义交易验证规则:确保交易的发送者有足够的余额,交易的格式正确等。
def validate_transaction(self, transaction):
if transaction.sender is None:
return True
sender_balance = self.get_balance(transaction.sender)
if sender_balance >= transaction.amount:
return True
return False
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class Transaction:
def __init__(self, sender, receiver, amount, signature=None, contract=None):
self.sender = sender
self.receiver = receiver
self.amount = amount
self.signature = signature
self.contract = contract
def to_dict(self):
return {'sender': self.sender,'receiver': self.receiver,'amount': self.amount,'contract': self.contrac}
def sign_transaction(self, private_key):
sk = SigningKey.from_string(bytes.fromhex(private_key), curve=SECP256k1)
message = str(self.to_dict()).encode('utf-8')
self.signature = sk.sign(message).hex()
def is_valid(self):
if self.sender is None:
return True
if not self.signature:
return False
vk = VerifyingKey.from_string(bytes.fromhex(self.sender), curve=SECP256k1)
message = str(self.to_dict()).encode('utf-8')
try:
return vk.verify(bytes.fromhex(self.signature), message)
except:
return False
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
-
- 实现交易验证方法:在区块链类中添加一个方法来验证交易。
-
- 在添加交易时进行验证:在创建交易时调用验证方法。
import hashlib
import time
import requests
from flask import Flask, jsonify, request
from ecdsa import SigningKey, VerifyingKey, SECP256k1
class Transaction:
def __init__(self, sender, receiver, amount, signature=None, contract=None):
self.sender = sender
self.receiver = receiver
self.amount = amount
self.signature = signature
self.contract = contract
def to_dict(self):
return {'sender': self.sender,'receiver': self.receiver,'amount': self.amount'contract': self.contract}
def sign_transaction(self, private_key):
sk = SigningKey.from_string(bytes.fromhex(private_key), curve=SECP256k1)
message = str(self.to_dict()).encode('utf-8')
self.signature = sk.sign(message).hex()
def is_valid(self):
if self.sender is None:
return True
if not self.signature:
return False
vk = VerifyingKey.from_string(bytes.fromhex(self.sender), curve=SECP256k1)
message = str(self.to_dict()).encode('utf-8')
try:
return vk.verify(bytes.fromhex(self.signature), message)
except:
return False
class Block:
def __init__(self, index, timestamp, transactions, previous_hash):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.nonce = 0
self.hash = self.calculate_hash()
def calculate_hash(self):
sha = hashlib.sha256()
transactions_str = "".join([f"{tx.sender}{tx.receiver}{tx.amount}{tx.contract}"
for tx in self.transactions])
sha.update
(f"{self.index}{self.timestamp}{transactions_str}{self.previous_hash}{self.nonce}".encode('utf-8'))
return sha.hexdigest()
def proof_of_work(self, difficulty):
self.nonce = 0
computed_hash = self.calculate_hash()
while not computed_hash.startswith('0' * difficulty):
self.nonce += 1
computed_hash = self.calculate_hash()
return computed_hash
class Blockchain:
def __init__(self):
self.chain = [self.create_genesis_block()]
self.pending_transactions = []
self.mining_reward = 100
self.difficulty = 4
self.contracts = {}
self.nodes = set()
def create_genesis_block(self):
return Block(0, time.time(), [], "0")
def get_latest_block(self):
return self.chain[-1]
def mine_pending_transactions(self, mining_reward_address):
reward_tx = Transaction(None, mining_reward_address, self.mining_reward)
self.pending_transactions.append(reward_tx)
new_block =
Block(len(self.chain), time.time(), self.pending_transactions, self.get_latest_block().hash)
new_block.hash = new_block.proof_of_work(self.difficulty)
self.chain.append(new_block)
self.pending_transactions = []
self.broadcast_block(new_block)
def create_transaction(self, transaction):
if self.validate_transaction(transaction):
self.pending_transactions.append(transaction)
self.broadcast_transaction(transaction)
else:
raise ValueError("Invalid transaction")
def validate_transaction(self, transaction):
if transaction.sender is None:
return True
sender_balance = self.get_balance(transaction.sender)
if sender_balance >= transaction.amount and transaction.is_valid():
return True
return False
def deploy_contract(self, contract_code):
contract = SmartContract(contract_code)
contract_address = hashlib.sha256(contract_code.encode('utf-8')).hexdigest()
self.contracts[contract_address] = contract
return contract_address
def call_contract(self, contract_address, sender, receiver, amount):
contract = self.contracts.get(contract_address)
if contract:
contract.execute(sender, receiver, amount)
tx = Transaction(sender, receiver, amount, contract_address)
self.create_transaction(tx)
def get_balance(self, address):
balance = 0
for block in self.chain:
for tx in block.transactions:
if tx.sender == address:
balance -= tx.amount
if tx.receiver == address:
balance += tx.amount
return balance
class Node:
def __init__(self, address):
self.address = address
self.blockchain = Blockchain()
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
如果两个矿工同时挖到了区块怎么办?
这涉及到一个共识算法,比如比特币使用的共识算法是最长链原则。
在区块链中,如果两个矿工同时挖到了区块,那么就会出现分叉的情况。这个时候需要选择一个分支作为主链,另一个分支作为孤块。选择主链的原则是选择最长的链作为主链。
至今我们的区块链都是单节点的,接下来我们要实现多节点的区块链来解决这个问题。
首先我们需要实现多节点、节点广播和节点同步的功能。为此我需要:
-
- 定义节点类:创建一个 Node 类,用于表示区块链网络中的节点。
class Node:
def __init__(self, address):
self.address = address
self.blockchain = Blockchain()
def connect_to_node(self, node_address):
self.blockchain.add_node(node_address)
def broadcast_transaction(self, transaction):
for node in self.blockchain.nodes:
requests.post(f"{node}/add_transaction", json=transaction.__dict__)
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
-
- 修改 Blockchain 类:添加处理节点和广播的功能。
def add_node(self, address):
self.nodes.add(address)
def broadcast_block(self, block):
for node in self.nodes:
requests.post(f"{node}/add_block", json=block.__dict__)
def sync_chain(self):
longest_chain = None
max_length = len(self.chain)
for node in self.nodes:
response = requests.get(f"{node}/chain")
length = response.json()['length']
chain = response.json()['chain']
if length > max_length:
max_length = length
longest_chain = chain
if longest_chain:
self.chain = [Block(**block) for block in longest_chain]
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
-
- 实现节点之间的通信:使用 HTTP 或 WebSocket 实现节点之间的通信。
app = Flask(__name__)
node = Node("http://localhost:5000")
@app.route('/chain', methods=['GET'])
def get_chain():
chain_data = [block.__dict__
for block in node.blockchain.chain]
return jsonify(length=len(chain_data), chain=chain_data)
@app.route('/add_block', methods=['POST'])
def add_block():
block_data = request.get_json()
block = Block(**block_data)
node.blockchain.chain.append(block)
return "Block added", 201
@app.route('/add_transaction', methods=['POST'])
def add_transaction():
tx_data = request.get_json()
transaction = Transaction(**tx_data)
node.blockchain.create_transaction(transaction)
return "Transaction added", 201
@app.route('/mine', methods=['GET'])
def mine():
node.blockchain.mine_pending_transactions(node.address)
return "Mining complete", 200
if __name__ == '__main__':
app.run(port=5000)
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
接下来,我们实现最长链原则,当两个矿工同时挖到了区块时,我们选择最长的链作为主链。
为了实现这个功能,我们先介绍下分叉如何实现。
- 创建一个新的链:从当前链的某个区块开始创建一个新的链。
- 添加新的区块到分叉链:在新的链上添加新的区块。
- 切换到分叉链:在需要的时候切换到分叉链。
整个过程类似于我们 git 上切换分支。
fork_chain 方法:从当前链的某个区块开始创建一个新的链,并将其添加到 forks 列表中。
switch_to_fork 方法:切换到指定的分叉链
Flask 路由:
- /fork 路由用于创建分叉链。
- /switch_fork 路由用于切换到指定的分叉链。
最后我们来加入最长链原则。
为了在区块链中实现最长链原则,我们需要在同步链时选择最长的链作为当前链。以下是详细步骤和代码实现:
详细步骤
- 同步链:从所有节点获取链数据。找到最长的链。如果最长的链比当前链长,则替换当前链。
- 广播新块:当有新块时,广播给所有节点。
- 验证链:验证链的有效性。
- sync_chain 方法:从所有节点获取链数据,找到最长的链并替换当前链
- is_valid_chain 方法:验证链的有效性,确保链中的每个块都有效。
Flask 路由:
以下为完整代码:
import hashlib
import time
import requests
from flask
import Flask, jsonify, request
from ecdsa
import SigningKey, VerifyingKey, SECP256k1
class Transaction:
def __init__(self, sender, receiver, amount, signature=None, contract=None):
self.sender = sender
self.receiver = receiver
self.amount = amount
self.signature = signature
self.contract = contract
def to_dict(self):
return
{'sender': self.sender,'receiver': self.receiver,'amount': self.amount,'contract': self.contract}
def sign_transaction(self, private_key):
sk = SigningKey.from_string(bytes.fromhex(private_key), curve=SECP256k1)
message = str(self.to_dict()).encode('utf-8')
self.signature = sk.sign(message).hex()
def is_valid(self):
if self.sender is None:
return True
if not self.signature:
return False
vk = VerifyingKey.from_string(bytes.fromhex(self.sender), curve=SECP256k1)
message = str(self.to_dict()).encode('utf-8')
try:
return vk.verify(bytes.fromhex(self.signature), message)
except:
return False
class Block:
def __init__(self, index, timestamp, transactions, previous_hash):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.nonce = 0
self.hash = self.calculate_hash()
def calculate_hash(self):
sha = hashlib.sha256()
transactions_str = "".join([f"{tx.sender}{tx.receiver}{tx.amount}{tx.contract}"
for tx in self.transactions])
sha.update
(f"{self.index}{self.timestamp}{transactions_str}{self.previous_hash}{self.nonce}".encode('utf-8'))
return sha.hexdigest()
def proof_of_work(self, difficulty):
self.nonce = 0
computed_hash = self.calculate_hash()
while not computed_hash.startswith('0' * difficulty):
self.nonce += 1
computed_hash = self.calculate_hash()
return computed_hash
class Blockchain:
def __init__(self):
self.chain = [self.create_genesis_block()]
self.pending_transactions = []
self.mining_reward = 100
self.difficulty = 4
self.contracts = {}
self.nodes = set()
self.forks = []
def create_genesis_block(self):
return Block(0, time.time(), [], "0")
def get_latest_block(self):
return self.chain[-1]
def mine_pending_transactions(self, mining_reward_address):
reward_tx = Transaction(None, mining_reward_address, self.mining_reward)
self.pending_transactions.append(reward_tx)
new_block =
Block(len(self.chain), time.time(), self.pending_transactions, self.get_latest_block().hash)
new_block.hash = new_block.proof_of_work(self.difficulty)
self.chain.append(new_block)
self.pending_transactions = []
self.broadcast_block(new_block)
def create_transaction(self, transaction):
if self.validate_transaction(transaction):
self.pending_transactions.append(transaction)
self.broadcast_transaction(transaction)
else:
raise ValueError("Invalid transaction")
def validate_transaction(self, transaction):
if transaction.sender is None:
return True
sender_balance = self.get_balance(transaction.sender)
if sender_balance >= transaction.amount and transaction.is_valid():
return True
return False
def deploy_contract(self, contract_code):
contract = SmartContract(contract_code)
contract_address = hashlib.sha256(contract_code.encode('utf-8')).hexdigest()
self.contracts[contract_address] = contract
return contract_address
def call_contract(self, contract_address, sender, receiver, amount):
contract = self.contracts.get(contract_address)
if contract:
contract.execute(sender, receiver, amount)
tx = Transaction(sender, receiver, amount, contract_address)
self.create_transaction(tx)
def get_balance(self, address):
balance = 0
for block in self.chain:
for tx in block.transactions:
if tx.sender == address:
balance -= tx.amount
if tx.receiver == address:
balance += tx.amount
return balance
def add_node(self, address):
self.nodes.add(address)
def broadcast_block(self, block):
for node in self.nodes:
requests.post(f"{node}/add_block", json=block.__dict__)
def broadcast_transaction(self, transaction):
for node in self.nodes:
requests.post(f"{node}/add_transaction", json=transaction.__dict__)
def sync_chain(self):
longest_chain = None
max_length = len(self.chain)
for node in self.nodes:
response = requests.get(f"{node}/chain")
length = response.json()['length']
chain = response.json()['chain']
if length > max_length and self.is_valid_chain(chain):
max_length = length
longest_chain = chain
if longest_chain:
self.chain = [Block(**block) for block in longest_chain]
def is_valid_chain(self, chain):
for i in range(1, len(chain)):
current_block = chain[i]
previous_block = chain[i - 1]
if current_block['previous_hash'] != previous_block['hash']:
return False
block = Block(**current_block)
if block.hash != block.calculate_hash():
return False
return True
def fork_chain(self, fork_point):
if fork_point < 0 or fork_point >= len(self.chain):
raise ValueError("Invalid fork point")
forked_chain = self.chain[:fork_point + 1]
self.forks.append(forked_chain)
return forked_chain
def switch_to_fork(self, fork_index):
if fork_index < 0 or fork_index >= len(self.forks):
raise ValueError("Invalid fork index")
self.chain = self.forks[fork_index] class Node:
def __init__(self, address): self.address = address
self.blockchain = Blockchain()
def connect_to_node(self, node_address):
self.blockchain.add_node(node_address)
def broadcast_transaction(self, transaction):
for node in self.blockchain.nodes:
requests.post(f"{node}/add_transaction", json=transaction.__dict__)
app = Flask(__name__) node = Node("http://localhost:5000")
@app.route('/chain', methods=['GET'])
def get_chain():
chain_data = [block.__dict__ for block in node.blockchain.chain]
return jsonify(length=len(chain_data), chain=chain_data)
@app.route('/add_block', methods=['POST']) def add_block():
block_data = request.get_json() block = Block(**block_data)
node.blockchain.chain.append(block)
return "Block added", 201
@app.route('/add_transaction', methods=['POST'])
def add_transaction():
tx_data = request.get_json()
transaction = Transaction(**tx_data)
try:
node.blockchain.create_transaction(transaction)
return "Transaction added", 201
except ValueError as e:
return str(e), 400
@app.route('/mine', methods=['GET'])
def mine():
node.blockchain.mine_pending_transactions(node.address)
return "Mining complete", 200
@app.route('/fork', methods=['POST'])
def fork():
fork_point = request.json.get('fork_point')
try:
forked_chain = node.blockchain.fork_chain(fork_point)
return jsonify([block.__dict__
for block in forked_chain]), 201
except ValueError as e:
return str(e), 400
@app.route('/switch_fork', methods=['POST'])
def switch_fork():
fork_index = request.json.get('fork_index')
try:
node.blockchain.switch_to_fork(fork_index)
return "Switched to fork", 200
except ValueError as e:
return str(e), 400
if __name__ == '__main__':
app.run(port=5000)
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}">
class="hide-preCode-box">
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 161
- 162
- 163
- 164
- 165
- 166
- 167
- 168
- 169
- 170
- 171
- 172
- 173
- 174
- 175
- 176
- 177
- 178
- 179
- 180
- 181
- 182
- 183
- 184
- 185
- 186
- 187
- 188
- 189
- 190
- 191
- 192
- 193
- 194
- 195
- 196
- 197
- 198
- 199
- 200
- 201
- 202
- 203
总结
在本文中,我们学习了如何使用 Python 实现一个简单的区块链。我们实现了区块链、区块、交易、智能合约、节点等核心概念。我们还实现了挖矿、交易验证、节点同步等功能,希望能帮助你快速入门区块链技术。

总结
- 最后希望你编程学习上不急不躁,按照计划有条不紊推进,把任何一件事做到极致,都是不容易的,加油,努力!相信自己!
文末福利
- 最后这里免费分享给大家一份Python全套学习资料,希望能帮到那些不满现状,想提升自己却又没有方向的朋友,也可以和我一起来学习交流呀。
包含编程资料、学习路线图、源代码、软件安装包等!【[点击这里]】领取!
- ① Python所有方向的学习路线图,清楚各个方向要学什么东西
- ② 100多节Python课程视频,涵盖必备基础、爬虫和数据分析
- ③ 100多个Python实战案例,学习不再是只会理论
- ④ 华为出品独家Python漫画教程,手机也能学习
可以扫描下方二维码领取【保证100%免费】
data-report-view="{"mod":"1585297308_001","spm":"1001.2101.3001.6548","dest":"https://blog.csdn.net/2301_78217634/article/details/145490984","extend1":"pc","ab":"new"}">>
评论记录:
回复评论: